[ARVADOS] created: e39e6831a61af3b889a78b2380bf978c48a20fb9

Git user git at public.curoverse.com
Thu Feb 18 21:50:54 EST 2016


        at  e39e6831a61af3b889a78b2380bf978c48a20fb9 (commit)


commit e39e6831a61af3b889a78b2380bf978c48a20fb9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 18 21:50:46 2016 -0500

    6518: Log when container is cancelled.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 29d58c5..e053c21 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -174,6 +174,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 				log.Printf("Error getting container info for %v: %q", uuid, err)
 			} else {
 				if container.Priority == 0 {
+					log.Printf("Canceling container %v", uuid)
 					priorityTicker.Stop()
 					cancelcmd := exec.Command("scancel", "--name="+uuid)
 					cancelcmd.Run()

commit df5453354edc2e6a3db198884b9c5bd0f86fed7c
Merge: 5762dc9 a56f0f6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 18 21:42:31 2016 -0500

    Merge branch '8015-crunch2-mount' into 6518-crunch2-dispatch-slurm


commit 5762dc9c58290f3c760b9183f6735d056c895daf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 18 21:42:10 2016 -0500

    6518: Dispatch to slurm using sbatch

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
new file mode 100644
index 0000000..29d58c5
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -0,0 +1,185 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"io"
+	"log"
+	"os"
+	"os/exec"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+)
+
+func main() {
+	err := doMain()
+	if err != nil {
+		log.Fatalf("%q", err)
+	}
+}
+
+var (
+	arv              arvadosclient.ArvadosClient
+	runningCmds      map[string]*exec.Cmd
+	runningCmdsMutex sync.Mutex
+	waitGroup        sync.WaitGroup
+	doneProcessing   chan bool
+	sigChan          chan os.Signal
+)
+
+func doMain() error {
+	flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+
+	pollInterval := flags.Int(
+		"poll-interval",
+		10,
+		"Interval in seconds to poll for queued containers")
+
+	priorityPollInterval := flags.Int(
+		"container-priority-poll-interval",
+		60,
+		"Interval in seconds to check priority of a dispatched container")
+
+	crunchRunCommand := flags.String(
+		"crunch-run-command",
+		"/usr/bin/crunch-run",
+		"Crunch command to run container")
+
+	// Parse args; omit the first arg which is the command name
+	flags.Parse(os.Args[1:])
+
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		return err
+	}
+
+	// Channel to terminate
+	doneProcessing = make(chan bool)
+
+	// Graceful shutdown
+	sigChan = make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func(sig <-chan os.Signal) {
+		for sig := range sig {
+			log.Printf("Caught signal: %v", sig)
+			doneProcessing <- true
+		}
+	}(sigChan)
+
+	// Run all queued containers
+	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
+
+	return nil
+}
+
+// Poll for queued containers using pollInterval.
+// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+//
+// Any errors encountered are logged but the program would continue to run (not exit).
+// This is because, once one or more crunch jobs are running,
+// we would need to wait for them complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+
+	for {
+		select {
+		case <-ticker.C:
+			dispatchSlurm(priorityPollInterval, crunchRunCommand)
+		case <-doneProcessing:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
type (
	// Container holds the fields of an Arvados container record that the
	// dispatcher reads: its UUID, state and priority.
	Container struct {
		UUID     string `json:"uuid"`
		State    string `json:"state"`
		Priority int    `json:"priority"`
	}

	// ContainerList is the decoded "items" array of a containers list
	// response from the API server.
	ContainerList struct {
		Items []Container `json:"items"`
	}
)
+
+// Get the list of queued containers from API server and invoke run for each container.
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+	params := arvadosclient.Dict{
+		"filters": [][]string{[]string{"state", "=", "Queued"}},
+	}
+
+	var containers ContainerList
+	err := arv.List("containers", params, &containers)
+	if err != nil {
+		log.Printf("Error getting list of queued containers: %q", err)
+		return
+	}
+
+	for i := 0; i < len(containers.Items); i++ {
+		log.Printf("About to submit queued container %v", containers.Items[i].UUID)
+		// Run the container
+		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+	}
+}
+
+// Run queued container:
+// Set container state to locked (TBD)
+// Run container using the given crunch-run command
+// Set the container state to Running
+// If the container priority becomes zero while crunch job is still running, terminate it.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+	stdinReader, stdinWriter := io.Pipe()
+
+	cmd := exec.Command("sbatch", "--job-name="+uuid)
+	cmd.Stdin = stdinReader
+	cmd.Stderr = os.Stderr
+	cmd.Stdout = os.Stderr
+	if err := cmd.Start(); err != nil {
+		log.Printf("Error running container for %v: %q", uuid, err)
+		return
+	}
+
+	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
+
+	stdinWriter.Close()
+	cmd.Wait()
+
+	// Update container status to Running
+	err := arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": "Running"}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+	}
+
+	log.Printf("Submitted container run for %v", uuid)
+
+	// A goroutine to terminate the runner if container priority becomes zero
+	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+	go func() {
+		for _ = range priorityTicker.C {
+			var container Container
+			err := arv.Get("containers", uuid, nil, &container)
+			if err != nil {
+				log.Printf("Error getting container info for %v: %q", uuid, err)
+			} else {
+				if container.Priority == 0 {
+					priorityTicker.Stop()
+					cancelcmd := exec.Command("scancel", "--name="+uuid)
+					cancelcmd.Run()
+				}
+			}
+		}
+	}()
+
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list