[ARVADOS] updated: 62dff431b7fbe97479cf98ac38e529388e16494f

Git user git at public.curoverse.com
Tue Mar 15 20:01:24 EDT 2016


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 83 +++++++++++++---------
 .../crunch-dispatch-slurm_test.go}                 | 75 +++++++++----------
 .../crunch-dispatch-slurm/crunch-finish-slurm.sh   |  8 ++-
 3 files changed, 92 insertions(+), 74 deletions(-)
 copy services/{crunch-dispatch-local/crunch-dispatch-local_test.go => crunch-dispatch-slurm/crunch-dispatch-slurm_test.go} (59%)

       via  62dff431b7fbe97479cf98ac38e529388e16494f (commit)
       via  ad58141530559c7e0c7849811e83efe65ec33306 (commit)
      from  bf3a2814843a8f7a78592e3fb4c629fc9f4819b9 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 62dff431b7fbe97479cf98ac38e529388e16494f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Mar 15 20:01:19 2016 -0400

    6518: Merge in tests.  Code cleanup around variable naming and comments.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index c945269..bc1b0a5 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -85,7 +85,7 @@ func doMain() error {
 }
 
 // Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
@@ -147,11 +147,13 @@ var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, a
 		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
-func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
-	submiterr = nil
+// Submit job to slurm using sbatch.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+	submitErr = nil
 
+	// Mark record as complete if anything errors out.
 	defer func() {
-		if submiterr != nil {
+		if submitErr != nil {
 			// This really should be an "Error" state, see #8018
 			updateErr := arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
@@ -163,64 +165,70 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
 		}
 	}()
 
+	// Create the command and attach to stdin/stdout
 	cmd := sbatchCmd(container.UUID)
 	stdinWriter, stdinerr := cmd.StdinPipe()
 	if stdinerr != nil {
-		submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+		submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
 		return
 	}
 
-	stdoutReader, stdouterr := cmd.StdoutPipe()
-	if stdouterr != nil {
-		submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+	stdoutReader, stdoutErr := cmd.StdoutPipe()
+	if stdoutErr != nil {
+		submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
 		return
 	}
 
-	stderrReader, stderrerr := cmd.StderrPipe()
-	if stderrerr != nil {
-		submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+	stderrReader, stderrErr := cmd.StderrPipe()
+	if stderrErr != nil {
+		submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
 		return
 	}
 
 	err := cmd.Start()
 	if err != nil {
-		submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+		submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
 		return
 	}
 
-	stdoutchan := make(chan []byte)
+	stdoutChan := make(chan []byte)
 	go func() {
 		b, _ := ioutil.ReadAll(stdoutReader)
-		stdoutchan <- b
-		close(stdoutchan)
+		stdoutChan <- b
+		close(stdoutChan)
 	}()
 
-	stderrchan := make(chan []byte)
+	stderrChan := make(chan []byte)
 	go func() {
 		b, _ := ioutil.ReadAll(stderrReader)
-		stderrchan <- b
-		close(stderrchan)
+		stderrChan <- b
+		close(stderrChan)
 	}()
 
+	// Send a tiny script on stdin to execute the crunch-run command;
+	// slurm requires that this be a #! script.
 	fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
 	stdinWriter.Close()
 
 	err = cmd.Wait()
 
-	stdoutmsg := <-stdoutchan
-	stderrmsg := <-stderrchan
+	stdoutMsg := <-stdoutChan
+	stderrmsg := <-stderrChan
 
 	if err != nil {
-		submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+		submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
 		return
 	}
 
-	jobid = string(stdoutmsg)
+	// If everything worked out, we got the jobid on stdout
+	jobid = string(stdoutMsg)
 
 	return
 }
 
-func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
 	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
@@ -230,11 +238,10 @@ func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecur
 	}
 }
 
-// Run queued container:
+// Run a queued container.
 // Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
 
 	jobid, err := submit(container, crunchRunCommand)
@@ -247,9 +254,11 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
 	if arv.ApiInsecure {
 		insecure = "1"
 	}
-	strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-	// Update container status to Running
+	// Update container status to Running. This is a temporary workaround
+	// to avoid resubmitting queued containers, because record locking isn't
+	// implemented yet.
 	err = arv.Update("containers", container.UUID,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": "Running"}},
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 9b13f00..56dd3ce 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -92,14 +92,19 @@ func (s *TestSuite) Test_doMain(c *C) {
 		sigChan <- syscall.SIGINT
 	}()
 
-	err := doMain()
-	c.Check(err, IsNil)
-
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
 	var containers ContainerList
+	err := arv.List("containers", params, &containers)
+	c.Check(err, IsNil)
+	c.Assert(len(containers.Items), Equals, 1)
+
+	err = doMain()
+	c.Check(err, IsNil)
+
+	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
index 2977e1c..95a37ba 100755
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
@@ -1,7 +1,8 @@
 #!/bin/sh
 
-# I wonder if it is possible to attach metadata to job records to look these
-# things up instead of having to provide it on the command line.
+# Script to be called by strigger when a job finishes.  This ensures the job
+# record has the correct state "Complete" even if the node running the job
+# failed.
 
 ARVADOS_API_HOST=$1
 ARVADOS_API_TOKEN=$2
@@ -9,6 +10,9 @@ ARVADOS_API_HOST_INSECURE=$3
 uuid=$4
 jobid=$5
 
+# If it is possible to attach metadata to job records we could look up the
+# above information instead of getting it on the command line.  For example,
+# this is the recipe for getting the job name (container uuid) from the job id.
 #uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
 
 export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE

commit ad58141530559c7e0c7849811e83efe65ec33306
Author: radhika <radhika at curoverse.com>
Date:   Thu Mar 10 22:24:06 2016 -0500

    6518: create sbatch and strigger commands using func declarations so that they can be overridden in tests.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 7f41a0a..c945269 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -136,6 +136,17 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
 	}
 }
 
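+// sbatchCmd builds the sbatch command for a container, using its UUID as the job name; it is a var so tests can override it.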
+var sbatchCmd = func(uuid string) *exec.Cmd {
+	return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
+}
+
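+// striggerCmd builds the strigger command that runs finishCommand when the given job finishes; it is a var so tests can override it.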
+var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
+		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+}
+
 func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
 	submiterr = nil
 
@@ -152,7 +163,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
 		}
 	}()
 
-	cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable")
+	cmd := sbatchCmd(container.UUID)
 	stdinWriter, stdinerr := cmd.StdinPipe()
 	if stdinerr != nil {
 		submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
@@ -210,8 +221,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
 }
 
 func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
-	cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
-		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	err := cmd.Run()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
new file mode 100644
index 0000000..9b13f00
--- /dev/null
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -0,0 +1,149 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"os/exec"
+	"strings"
+	"syscall"
+	"testing"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&TestSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
+
+type TestSuite struct{}
+type MockArvadosServerSuite struct{}
+
+var initialArgs []string
+
+func (s *TestSuite) SetUpSuite(c *C) {
+	initialArgs = os.Args
+	arvadostest.StartAPI()
+}
+
+func (s *TestSuite) TearDownSuite(c *C) {
+	arvadostest.StopAPI()
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+	args := []string{"crunch-dispatch-slurm"}
+	os.Args = args
+
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		c.Fatalf("Error making arvados client: %s", err)
+	}
+}
+
+func (s *TestSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+	os.Args = initialArgs
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+}
+
+func (s *TestSuite) Test_doMain(c *C) {
+	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
+	os.Args = append(os.Args, args...)
+
+	// Override sbatchCmd
+	defer func(orig func(string) *exec.Cmd) {
+		sbatchCmd = orig
+	}(sbatchCmd)
+	sbatchCmd = func(uuid string) *exec.Cmd {
+		return exec.Command("echo", uuid)
+	}
+
+	// Override striggerCmd
+	defer func(orig func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd) {
+		striggerCmd = orig
+	}(striggerCmd)
+	striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+		go func() {
+			time.Sleep(5 * time.Second)
+			arv.Update("containers", containerUUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": "Complete"}},
+				nil)
+		}()
+		return exec.Command("echo", "strigger")
+	}
+
+	go func() {
+		time.Sleep(8 * time.Second)
+		sigChan <- syscall.SIGINT
+	}()
+
+	err := doMain()
+	c.Check(err, IsNil)
+
+	// There should be no queued containers now
+	params := arvadosclient.Dict{
+		"filters": [][]string{[]string{"state", "=", "Queued"}},
+	}
+	var containers ContainerList
+	err = arv.List("containers", params, &containers)
+	c.Check(err, IsNil)
+	c.Assert(len(containers.Items), Equals, 0)
+
+	// Previously "Queued" container should now be in "Complete" state
+	var container Container
+	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+	c.Check(err, IsNil)
+	c.Check(container.State, Equals, "Complete")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
+
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+}
+
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStub := arvadostest.ServerStub{apiStubResponses}
+
+	api := httptest.NewServer(&apiStub)
+	defer api.Close()
+
+	arv = arvadosclient.ArvadosClient{
+		Scheme:    "http",
+		ApiServer: api.URL[7:],
+		ApiToken:  "abc123",
+		Client:    &http.Client{Transport: &http.Transport{}},
+		Retries:   0,
+	}
+
+	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+	c.Check(err, IsNil)
+	defer os.Remove(tempfile.Name())
+	log.SetOutput(tempfile)
+
+	go func() {
+		time.Sleep(2 * time.Second)
+		sigChan <- syscall.SIGTERM
+	}()
+
+	runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+
+	buf, _ := ioutil.ReadFile(tempfile.Name())
+	c.Check(strings.Contains(string(buf), expected), Equals, true)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list