[ARVADOS] updated: 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b
Git user
git at public.curoverse.com
Thu Feb 25 22:09:19 EST 2016
Summary of changes:
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 133 +++++++++++++++++----
.../crunch-dispatch-slurm/crunch-finish-slurm.sh | 7 ++
services/crunch-run/crunchrun.go | 2 +-
3 files changed, 116 insertions(+), 26 deletions(-)
create mode 100755 services/crunch-dispatch-slurm/crunch-finish-slurm.sh
via 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b (commit)
from 5af1462689a77d3222fa29e547f1f38a80c3610d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 25 22:09:16 2016 -0500
6518: Working on using strigger to update job records when crunch-run cannot.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 5133c9f..875eaa3 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -4,7 +4,7 @@ import (
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "io"
+ "io/ioutil"
"log"
"os"
"os/exec"
@@ -48,6 +48,11 @@ func doMain() error {
"/usr/bin/crunch-run",
"Crunch command to run container")
+ finishCommand := flags.String(
+ "finish-command",
+ "/usr/bin/crunch-finish-slurm.sh",
+ "Command to run from strigger when job is finished")
+
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
@@ -71,7 +76,7 @@ func doMain() error {
}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
@@ -85,13 +90,13 @@ func doMain() error {
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
for {
select {
case <-ticker.C:
- dispatchSlurm(priorityPollInterval, crunchRunCommand)
+ dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
case <-doneProcessing:
ticker.Stop()
return
@@ -112,7 +117,7 @@ type ContainerList struct {
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
@@ -127,7 +132,90 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to submit queued container %v", containers.Items[i].UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+ }
+}
+
+// submit queues the container with sbatch. The job script is fed to sbatch on
+// stdin and simply execs crunchRunCommand with the container UUID. On success
+// it returns the bare SLURM job id printed by `sbatch --parsable`.
+//
+// On any failure the container record is set to state "Complete" (see #8018)
+// so that the next poll, which only looks at Queued containers, does not
+// resubmit it; the error is returned to the caller.
+func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
+	defer func() {
+		if submiterr != nil {
+			// This really should be an "Error" state, see #8018
+			updateErr := arv.Update("containers", container.UUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": "Complete"}},
+				nil)
+			if updateErr != nil {
+				log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
+			}
+		}
+	}()
+
+	cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
+	stdinWriter, stdinerr := cmd.StdinPipe()
+	if stdinerr != nil {
+		submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+		return
+	}
+
+	stdoutReader, stdouterr := cmd.StdoutPipe()
+	if stdouterr != nil {
+		submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+		return
+	}
+
+	stderrReader, stderrerr := cmd.StderrPipe()
+	if stderrerr != nil {
+		submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+		return
+	}
+
+	err := cmd.Start()
+	if err != nil {
+		submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+		return
+	}
+
+	// Collect stdout and stderr concurrently; the readers are started before
+	// the job script is written so sbatch never stalls on a full pipe.
+	stdoutchan := make(chan []byte)
+	go func() {
+		b, _ := ioutil.ReadAll(stdoutReader)
+		stdoutchan <- b
+		close(stdoutchan)
+	}()
+
+	stderrchan := make(chan []byte)
+	go func() {
+		b, _ := ioutil.ReadAll(stderrReader)
+		stderrchan <- b
+		close(stderrchan)
+	}()
+
+	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
+	stdinWriter.Close()
+
+	// Drain both pipes BEFORE Wait: Wait closes them once the process exits,
+	// so calling it while the readers are still running can truncate output.
+	stdoutmsg := <-stdoutchan
+	stderrmsg := <-stderrchan
+
+	err = cmd.Wait()
+	if err != nil {
+		submiterr = fmt.Errorf("Container submission failed %v: %v %q", cmd.Args, err, stderrmsg)
+		return
+	}
+
+	// `sbatch --parsable` prints "jobid[;cluster]" followed by a newline;
+	// keep only the bare job id so it can be passed to strigger --jobid.
+	out := string(stdoutmsg)
+	end := len(out)
+	for i, c := range out {
+		if c == ';' || c == '\n' || c == '\r' || c == ' ' || c == '\t' {
+			end = i
+			break
+		}
+	}
+	if end == 0 {
+		submiterr = fmt.Errorf("Container submission failed %v: sbatch did not report a job id (stdout %q)", cmd.Args, stdoutmsg)
+		return
+	}
+	jobid = out[:end]
+
+	return
+}
+
+// strigger registers a SLURM `strigger --set --fini` trigger on jobid whose
+// program is finishCommand. Failures are logged (with the container UUID and
+// job id for context) rather than returned.
+func strigger(jobid, containerUUID, finishCommand string) {
+	cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini", "--program="+finishCommand)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		log.Printf("While setting up strigger for container %v (job %v): %v", containerUUID, jobid, err)
}
}
@@ -136,47 +224,42 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
- stdinReader, stdinWriter := io.Pipe()
+func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
- cmd := exec.Command("sbatch", "--job-name="+uuid, "--share")
- cmd.Stdin = stdinReader
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stderr
- if err := cmd.Start(); err != nil {
- log.Printf("Error running container for %v: %q", uuid, err)
+ jobid, err := submit(container, crunchRunCommand)
+ if err != nil {
+ log.Printf("Error queuing container run: %v", err)
return
}
- fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, uuid)
-
- stdinWriter.Close()
- cmd.Wait()
+ strigger(jobid, container.UUID, finishCommand)
// Update container status to Running
- err := arv.Update("containers", uuid,
+ err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
nil)
if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
}
- log.Printf("Submitted container run for %v", uuid)
+ log.Printf("Submitted container run for %v", container.UUID)
+
+ containerUUID := container.UUID
// A goroutine to terminate the runner if container priority becomes zero
priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
go func() {
for _ = range priorityTicker.C {
var container Container
- err := arv.Get("containers", uuid, nil, &container)
+ err := arv.Get("containers", containerUUID, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
+ log.Printf("Error getting container info for %v: %q", container.UUID, err)
} else {
if container.Priority == 0 {
- log.Printf("Canceling container %v", uuid)
+ log.Printf("Canceling container %v", container.UUID)
priorityTicker.Stop()
- cancelcmd := exec.Command("scancel", "--name="+uuid)
+ cancelcmd := exec.Command("scancel", "--name="+container.UUID)
cancelcmd.Run()
}
if container.State == "Complete" {
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
new file mode 100755
index 0000000..8be6fdd
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+set -e
+jobid="$1"
+# The dispatcher submits each job with --job-name=<container UUID>, so the job name is the UUID.
+uuid=$(squeue --jobs="$jobid" --states=all --format=%j --noheader)
+[ -n "$uuid" ] || { echo "crunch-finish-slurm: no job name found for job '$jobid'" >&2; exit 1; }
+arv containers update --uuid "$uuid" --container '{"state": "Complete"}'
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 64f0d77..039a649 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -604,7 +604,7 @@ func (runner *ContainerRunner) Run() (err error) {
if hosterr != nil {
runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
} else {
- runner.CrunchLog.Printf("Executing on host '%s'", runner.ContainerRecord.UUID, hostname)
+ runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
var runerr, waiterr error
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list