[ARVADOS] updated: 62dff431b7fbe97479cf98ac38e529388e16494f
Git user
git at public.curoverse.com
Tue Mar 15 20:01:24 EDT 2016
Summary of changes:
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 83 +++++++++++++---------
.../crunch-dispatch-slurm_test.go} | 75 +++++++++----------
.../crunch-dispatch-slurm/crunch-finish-slurm.sh | 8 ++-
3 files changed, 92 insertions(+), 74 deletions(-)
copy services/{crunch-dispatch-local/crunch-dispatch-local_test.go => crunch-dispatch-slurm/crunch-dispatch-slurm_test.go} (59%)
via 62dff431b7fbe97479cf98ac38e529388e16494f (commit)
via ad58141530559c7e0c7849811e83efe65ec33306 (commit)
from bf3a2814843a8f7a78592e3fb4c629fc9f4819b9 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 62dff431b7fbe97479cf98ac38e529388e16494f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Mar 15 20:01:19 2016 -0400
6518: Merge in tests. Code cleanup around variable naming and comments.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index c945269..bc1b0a5 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -85,7 +85,7 @@ func doMain() error {
}
// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
@@ -147,11 +147,13 @@ var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, a
fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
}
-func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
- submiterr = nil
+// Submit job to slurm using sbatch.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+ submitErr = nil
+ // Mark record as complete if anything errors out.
defer func() {
- if submiterr != nil {
+ if submitErr != nil {
// This really should be an "Error" state, see #8018
updateErr := arv.Update("containers", container.UUID,
arvadosclient.Dict{
@@ -163,64 +165,70 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
}
}()
+ // Create the command and attach to stdin/stdout
cmd := sbatchCmd(container.UUID)
stdinWriter, stdinerr := cmd.StdinPipe()
if stdinerr != nil {
- submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+ submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
return
}
- stdoutReader, stdouterr := cmd.StdoutPipe()
- if stdouterr != nil {
- submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+ stdoutReader, stdoutErr := cmd.StdoutPipe()
+ if stdoutErr != nil {
+ submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
return
}
- stderrReader, stderrerr := cmd.StderrPipe()
- if stderrerr != nil {
- submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+ stderrReader, stderrErr := cmd.StderrPipe()
+ if stderrErr != nil {
+ submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
return
}
err := cmd.Start()
if err != nil {
- submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+ submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
return
}
- stdoutchan := make(chan []byte)
+ stdoutChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stdoutReader)
- stdoutchan <- b
- close(stdoutchan)
+ stdoutChan <- b
+ close(stdoutChan)
}()
- stderrchan := make(chan []byte)
+ stderrChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stderrReader)
- stderrchan <- b
- close(stderrchan)
+ stderrChan <- b
+ close(stderrChan)
}()
+ // Send a tiny script on stdin to execute the crunch-run command;
+ // slurm enforces that the submitted script must be a #! script.
fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
stdinWriter.Close()
err = cmd.Wait()
- stdoutmsg := <-stdoutchan
- stderrmsg := <-stderrchan
+ stdoutMsg := <-stdoutChan
+ stderrmsg := <-stderrChan
if err != nil {
- submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+ submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
return
}
- jobid = string(stdoutmsg)
+ // If everything worked out, we got the jobid on stdout.
+ jobid = string(stdoutMsg)
return
}
-func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -230,11 +238,10 @@ func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecur
}
}
-// Run queued container:
+// Run a queued container.
// Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
jobid, err := submit(container, crunchRunCommand)
@@ -247,9 +254,11 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
if arv.ApiInsecure {
insecure = "1"
}
- strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+ finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- // Update container status to Running
+ // Update container status to Running. This is a temporary workaround
+ // to avoid resubmitting queued containers, because record locking isn't
+ // implemented yet.
err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 9b13f00..56dd3ce 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -92,14 +92,19 @@ func (s *TestSuite) Test_doMain(c *C) {
sigChan <- syscall.SIGINT
}()
- err := doMain()
- c.Check(err, IsNil)
-
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
var containers ContainerList
+ err := arv.List("containers", params, &containers)
+ c.Check(err, IsNil)
+ c.Assert(len(containers.Items), Equals, 1)
+
+ err = doMain()
+ c.Check(err, IsNil)
+
+ // There should be no queued containers now
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
index 2977e1c..95a37ba 100755
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
@@ -1,7 +1,8 @@
#!/bin/sh
-# I wonder if it is possible to attach metadata to job records to look these
-# things up instead of having to provide it on the command line.
+# Script to be called by strigger when a job finishes. This ensures the job
+# record has the correct state "Complete" even if the node running the job
+# failed.
ARVADOS_API_HOST=$1
ARVADOS_API_TOKEN=$2
@@ -9,6 +10,9 @@ ARVADOS_API_HOST_INSECURE=$3
uuid=$4
jobid=$5
+# If it is possible to attach metadata to job records we could look up the
+# above information instead of getting it on the command line. For example,
+# this is the recipe for getting the job name (container uuid) from the job id.
#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
commit ad58141530559c7e0c7849811e83efe65ec33306
Author: radhika <radhika at curoverse.com>
Date: Thu Mar 10 22:24:06 2016 -0500
6518: create sbatch and strigger commands using func declarations so that they can be overridden in tests.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 7f41a0a..c945269 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -136,6 +136,17 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
}
}
+// sbatchCmd builds the sbatch command for a container; it is a variable so that tests can override it.
+var sbatchCmd = func(uuid string) *exec.Cmd {
+ return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
+}
+
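+// striggerCmd builds the strigger command that registers finishCommand for a job; it is a variable so that tests can override it.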
+var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+ return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
+ fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+}
+
func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
submiterr = nil
@@ -152,7 +163,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
}
}()
- cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
+ cmd := sbatchCmd(container.UUID)
stdinWriter, stdinerr := cmd.StdinPipe()
if stdinerr != nil {
submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
@@ -210,8 +221,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
}
func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
- cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
- fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+ cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
new file mode 100644
index 0000000..9b13f00
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -0,0 +1,149 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "os/exec"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+var _ = Suite(&TestSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
+
+type TestSuite struct{}
+type MockArvadosServerSuite struct{}
+
+var initialArgs []string
+
+func (s *TestSuite) SetUpSuite(c *C) {
+ initialArgs = os.Args
+ arvadostest.StartAPI()
+}
+
+func (s *TestSuite) TearDownSuite(c *C) {
+ arvadostest.StopAPI()
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+ args := []string{"crunch-dispatch-slurm"}
+ os.Args = args
+
+ var err error
+ arv, err = arvadosclient.MakeArvadosClient()
+ if err != nil {
+ c.Fatalf("Error making arvados client: %s", err)
+ }
+}
+
+func (s *TestSuite) TearDownTest(c *C) {
+ arvadostest.ResetEnv()
+ os.Args = initialArgs
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+func (s *TestSuite) Test_doMain(c *C) {
+ args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
+ os.Args = append(os.Args, args...)
+
+ // Override sbatchCmd
+ defer func(orig func(string) *exec.Cmd) {
+ sbatchCmd = orig
+ }(sbatchCmd)
+ sbatchCmd = func(uuid string) *exec.Cmd {
+ return exec.Command("echo", uuid)
+ }
+
+ // Override striggerCmd
+ defer func(orig func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd) {
+ striggerCmd = orig
+ }(striggerCmd)
+ striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+ go func() {
+ time.Sleep(5 * time.Second)
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Complete"}},
+ nil)
+ }()
+ return exec.Command("echo", "strigger")
+ }
+
+ go func() {
+ time.Sleep(8 * time.Second)
+ sigChan <- syscall.SIGINT
+ }()
+
+ err := doMain()
+ c.Check(err, IsNil)
+
+ // There should be no queued containers now
+ params := arvadosclient.Dict{
+ "filters": [][]string{[]string{"state", "=", "Queued"}},
+ }
+ var containers ContainerList
+ err = arv.List("containers", params, &containers)
+ c.Check(err, IsNil)
+ c.Assert(len(containers.Items), Equals, 0)
+
+ // Previously "Queued" container should now be in "Complete" state
+ var container Container
+ err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+ c.Check(err, IsNil)
+ c.Check(container.State, Equals, "Complete")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+ apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
+
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+}
+
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+ apiStub := arvadostest.ServerStub{apiStubResponses}
+
+ api := httptest.NewServer(&apiStub)
+ defer api.Close()
+
+ arv = arvadosclient.ArvadosClient{
+ Scheme: "http",
+ ApiServer: api.URL[7:],
+ ApiToken: "abc123",
+ Client: &http.Client{Transport: &http.Transport{}},
+ Retries: 0,
+ }
+
+ tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+ c.Check(err, IsNil)
+ defer os.Remove(tempfile.Name())
+ log.SetOutput(tempfile)
+
+ go func() {
+ time.Sleep(2 * time.Second)
+ sigChan <- syscall.SIGTERM
+ }()
+
+ runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+
+ buf, _ := ioutil.ReadFile(tempfile.Name())
+ c.Check(strings.Contains(string(buf), expected), Equals, true)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list