[ARVADOS] updated: 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b

Git user git at public.curoverse.com
Thu Feb 25 22:09:19 EST 2016


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 133 +++++++++++++++++----
 .../crunch-dispatch-slurm/crunch-finish-slurm.sh   |   7 ++
 services/crunch-run/crunchrun.go                   |   2 +-
 3 files changed, 116 insertions(+), 26 deletions(-)
 create mode 100755 services/crunch-dispatch-slurm/crunch-finish-slurm.sh

       via  1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b (commit)
      from  5af1462689a77d3222fa29e547f1f38a80c3610d (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 25 22:09:16 2016 -0500

    6518: Working on using strigger to update job records when crunch-run cannot.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 5133c9f..875eaa3 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -4,7 +4,7 @@ import (
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"io"
+	"io/ioutil"
 	"log"
 	"os"
 	"os/exec"
@@ -48,6 +48,11 @@ func doMain() error {
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
+	finishCommand := flags.String(
+		"finish-command",
+		"/usr/bin/crunch-finish-slurm.sh",
+		"Command to run from strigger when job is finished")
+
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
@@ -71,7 +76,7 @@ func doMain() error {
 	}(sigChan)
 
 	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -85,13 +90,13 @@ func doMain() error {
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
 	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
 
 	for {
 		select {
 		case <-ticker.C:
-			dispatchSlurm(priorityPollInterval, crunchRunCommand)
+			dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
 		case <-doneProcessing:
 			ticker.Stop()
 			return
@@ -112,7 +117,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
@@ -127,7 +132,90 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
 	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to submit queued container %v", containers.Items[i].UUID)
 		// Run the container
-		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+		go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+	}
+}
+
+// submit queues crunch-run for the container via sbatch and returns the slurm
+// job id. On any failure the container state is set to "Complete" (see #8018).
+func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
+	defer func() {
+		if submiterr != nil {
+			// This really should be an "Error" state, see #8018
+			updateErr := arv.Update("containers", container.UUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": "Complete"}},
+				nil)
+			if updateErr != nil {
+				log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
+			}
+		}
+	}()
+
+	cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
+	stdinWriter, stdinerr := cmd.StdinPipe()
+	if stdinerr != nil {
+		submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+		return
+	}
+	stdoutReader, stdouterr := cmd.StdoutPipe()
+	if stdouterr != nil {
+		submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+		return
+	}
+	stderrReader, stderrerr := cmd.StderrPipe()
+	if stderrerr != nil {
+		submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+		return
+	}
+
+	err := cmd.Start()
+	if err != nil {
+		submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+		return
+	}
+
+	stdoutchan := make(chan []byte)
+	go func() {
+		b, _ := ioutil.ReadAll(stdoutReader)
+		stdoutchan <- b
+		close(stdoutchan)
+	}()
+
+	stderrchan := make(chan []byte)
+	go func() {
+		b, _ := ioutil.ReadAll(stderrReader)
+		stderrchan <- b
+		close(stderrchan)
+	}()
+
+	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
+	stdinWriter.Close()
+
+	// Wait closes the pipes once sbatch exits, so drain them first.
+	stdoutmsg := <-stdoutchan
+	stderrmsg := <-stderrchan
+	err = cmd.Wait()
+
+	if err != nil {
+		submiterr = fmt.Errorf("Container submission failed %v: %v %s", cmd.Args, err, stderrmsg)
+		return
+	}
+
+	// sbatch --parsable prints the job id and a newline; Sscan drops the newline.
+	if _, err = fmt.Sscan(string(stdoutmsg), &jobid); err != nil {
+		submiterr = fmt.Errorf("Could not read job id from sbatch output %q: %v", stdoutmsg, err)
+	}
+	return
+}
+
+// strigger asks slurm to run finishCommand when job jobid finishes; errors are only logged.
+func strigger(jobid, containerUUID, finishCommand string) {
+	cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini", "--program="+finishCommand)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		log.Printf("While setting up strigger for container %v (slurm job %v): %v", containerUUID, jobid, err)
 	}
 }
 
@@ -136,47 +224,42 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-	stdinReader, stdinWriter := io.Pipe()
+func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
 
-	cmd := exec.Command("sbatch", "--job-name="+uuid, "--share")
-	cmd.Stdin = stdinReader
-	cmd.Stderr = os.Stderr
-	cmd.Stdout = os.Stderr
-	if err := cmd.Start(); err != nil {
-		log.Printf("Error running container for %v: %q", uuid, err)
+	jobid, err := submit(container, crunchRunCommand)
+	if err != nil {
+		log.Printf("Error queuing container run: %v", err)
 		return
 	}
 
-	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, uuid)
-
-	stdinWriter.Close()
-	cmd.Wait()
+	strigger(jobid, container.UUID, finishCommand)
 
 	// Update container status to Running
-	err := arv.Update("containers", uuid,
+	err = arv.Update("containers", container.UUID,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": "Running"}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
 	}
 
-	log.Printf("Submitted container run for %v", uuid)
+	log.Printf("Submitted container run for %v", container.UUID)
+
+	containerUUID := container.UUID
 
 	// A goroutine to terminate the runner if container priority becomes zero
 	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
 	go func() {
 		for _ = range priorityTicker.C {
 			var container Container
-			err := arv.Get("containers", uuid, nil, &container)
+			err := arv.Get("containers", containerUUID, nil, &container)
 			if err != nil {
-				log.Printf("Error getting container info for %v: %q", uuid, err)
+				log.Printf("Error getting container info for %v: %q", container.UUID, err)
 			} else {
 				if container.Priority == 0 {
-					log.Printf("Canceling container %v", uuid)
+					log.Printf("Canceling container %v", container.UUID)
 					priorityTicker.Stop()
-					cancelcmd := exec.Command("scancel", "--name="+uuid)
+					cancelcmd := exec.Command("scancel", "--name="+container.UUID)
 					cancelcmd.Run()
 				}
 				if container.State == "Complete" {
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
new file mode 100755
index 0000000..8be6fdd
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+# Run via strigger --fini; $1 is expected to be the finished slurm job id.
+set -eu
+jobid="$1"
+uuid=$(squeue --jobs="$jobid" --states=all --format=%j --noheader)
+# The dispatcher submits each job with --job-name=<container UUID>.
+arv containers update --uuid "$uuid" --container '{"state": "Complete"}'
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 64f0d77..039a649 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -604,7 +604,7 @@ func (runner *ContainerRunner) Run() (err error) {
 	if hosterr != nil {
 		runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
 	} else {
-		runner.CrunchLog.Printf("Executing on host '%s'", runner.ContainerRecord.UUID, hostname)
+		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
 	}
 
 	var runerr, waiterr error

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list