[ARVADOS] created: e39e6831a61af3b889a78b2380bf978c48a20fb9
Git user
git at public.curoverse.com
Thu Feb 18 21:50:54 EST 2016
at e39e6831a61af3b889a78b2380bf978c48a20fb9 (commit)
commit e39e6831a61af3b889a78b2380bf978c48a20fb9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 18 21:50:46 2016 -0500
6518: Log when container is cancelled.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 29d58c5..e053c21 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -174,6 +174,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
log.Printf("Error getting container info for %v: %q", uuid, err)
} else {
if container.Priority == 0 {
+ log.Printf("Canceling container %v", uuid)
priorityTicker.Stop()
cancelcmd := exec.Command("scancel", "--name="+uuid)
cancelcmd.Run()
commit df5453354edc2e6a3db198884b9c5bd0f86fed7c
Merge: 5762dc9 a56f0f6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 18 21:42:31 2016 -0500
Merge branch '8015-crunch2-mount' into 6518-crunch2-dispatch-slurm
commit 5762dc9c58290f3c760b9183f6735d056c895daf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 18 21:42:10 2016 -0500
6518: Dispatch to slurm using sbatch
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
new file mode 100644
index 0000000..29d58c5
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -0,0 +1,185 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "io"
+ "log"
+ "os"
+ "os/exec"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+)
+
+func main() {
+ err := doMain()
+ if err != nil {
+ log.Fatalf("%q", err)
+ }
+}
+
+var (
+ arv arvadosclient.ArvadosClient
+ runningCmds map[string]*exec.Cmd
+ runningCmdsMutex sync.Mutex
+ waitGroup sync.WaitGroup
+ doneProcessing chan bool
+ sigChan chan os.Signal
+)
+
+func doMain() error {
+ flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+
+ pollInterval := flags.Int(
+ "poll-interval",
+ 10,
+ "Interval in seconds to poll for queued containers")
+
+ priorityPollInterval := flags.Int(
+ "container-priority-poll-interval",
+ 60,
+ "Interval in seconds to check priority of a dispatched container")
+
+ crunchRunCommand := flags.String(
+ "crunch-run-command",
+ "/usr/bin/crunch-run",
+ "Crunch command to run container")
+
+ // Parse args; omit the first arg which is the command name
+ flags.Parse(os.Args[1:])
+
+ var err error
+ arv, err = arvadosclient.MakeArvadosClient()
+ if err != nil {
+ return err
+ }
+
+ // Channel to terminate
+ doneProcessing = make(chan bool)
+
+ // Graceful shutdown
+ sigChan = make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ log.Printf("Caught signal: %v", sig)
+ doneProcessing <- true
+ }
+ }(sigChan)
+
+ // Run all queued containers
+ runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+ // Wait for all running crunch jobs to complete / terminate
+ waitGroup.Wait()
+
+ return nil
+}
+
+// Poll for queued containers using pollInterval.
+// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+//
+// Any errors encountered are logged but the program would continue to run (not exit).
+// This is because, once one or more crunch jobs are running,
+// we would need to wait for them complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+ ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+
+ for {
+ select {
+ case <-ticker.C:
+ dispatchSlurm(priorityPollInterval, crunchRunCommand)
+ case <-doneProcessing:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
// Container holds the fields of an Arvados container record that this
// dispatcher reads. It is decoded from the API server's JSON using the tags
// below.
type Container struct {
	UUID     string `json:"uuid"`
	State    string `json:"state"`
	Priority int    `json:"priority"`
}
+
// ContainerList is a list of the containers returned by the API server. Only
// the "items" field of the list response is decoded.
type ContainerList struct {
	Items []Container `json:"items"`
}
+
+// Get the list of queued containers from API server and invoke run for each container.
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+ params := arvadosclient.Dict{
+ "filters": [][]string{[]string{"state", "=", "Queued"}},
+ }
+
+ var containers ContainerList
+ err := arv.List("containers", params, &containers)
+ if err != nil {
+ log.Printf("Error getting list of queued containers: %q", err)
+ return
+ }
+
+ for i := 0; i < len(containers.Items); i++ {
+ log.Printf("About to submit queued container %v", containers.Items[i].UUID)
+ // Run the container
+ go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ }
+}
+
+// Run queued container:
+// Set container state to locked (TBD)
+// Run container using the given crunch-run command
+// Set the container state to Running
+// If the container priority becomes zero while crunch job is still running, terminate it.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+ stdinReader, stdinWriter := io.Pipe()
+
+ cmd := exec.Command("sbatch", "--job-name="+uuid)
+ cmd.Stdin = stdinReader
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
+ if err := cmd.Start(); err != nil {
+ log.Printf("Error running container for %v: %q", uuid, err)
+ return
+ }
+
+ fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
+
+ stdinWriter.Close()
+ cmd.Wait()
+
+ // Update container status to Running
+ err := arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Running"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ }
+
+ log.Printf("Submitted container run for %v", uuid)
+
+ // A goroutine to terminate the runner if container priority becomes zero
+ priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ go func() {
+ for _ = range priorityTicker.C {
+ var container Container
+ err := arv.Get("containers", uuid, nil, &container)
+ if err != nil {
+ log.Printf("Error getting container info for %v: %q", uuid, err)
+ } else {
+ if container.Priority == 0 {
+ priorityTicker.Stop()
+ cancelcmd := exec.Command("scancel", "--name="+uuid)
+ cancelcmd.Run()
+ }
+ }
+ }
+ }()
+
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list