[ARVADOS] updated: 11d484489c21487074ecb6d15b086c02b13e326f

Git user git at public.curoverse.com
Wed May 25 09:29:45 EDT 2016


Summary of changes:
 sdk/go/dispatch/dispatch.go                        | 136 ++++++++++++---------
 services/api/app/models/container.rb               |   7 +-
 .../crunch-dispatch-local/crunch-dispatch-local.go | 100 +++++++--------
 .../crunch-dispatch-local_test.go                  |  22 ++--
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |  29 +++--
 .../crunch-dispatch-slurm_test.go                  |  33 ++---
 6 files changed, 185 insertions(+), 142 deletions(-)

       via  11d484489c21487074ecb6d15b086c02b13e326f (commit)
      from  1032d5957a67aa706079917f3c20bdb1a5c91ad0 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 11d484489c21487074ecb6d15b086c02b13e326f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed May 25 08:54:23 2016 -0400

    9187: Explicitly query for monitored containers that are not listed (usually complete).  Tests pass.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 926a550..9ce7bf8 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -10,6 +10,14 @@ import (
 	"time"
 )
 
+const (
+	Queued    = "Queued"
+	Locked    = "Locked"
+	Running   = "Running"
+	Complete  = "Complete"
+	Cancelled = "Cancelled"
+)
+
 type apiClientAuthorization struct {
 	UUID     string `json:"uuid"`
 	APIToken string `json:"api_token"`
@@ -34,15 +42,15 @@ type ContainerList struct {
 }
 
 type DispatcherState struct {
-	mineMutex      sync.Mutex
-	mineMap        map[string]chan Container
-	pollInterval   time.Duration
-	Arv            arvadosclient.ArvadosClient
-	auth           apiClientAuthorization
-	containers     chan []Container
-	DoneProcessing chan struct{}
-	runContainer   func(*DispatcherState, Container, chan Container)
-	waitContainer  func(*DispatcherState, Container, chan Container)
+	mineMutex        sync.Mutex
+	mineMap          map[string]chan Container
+	pollInterval     time.Duration
+	Arv              arvadosclient.ArvadosClient
+	auth             apiClientAuthorization
+	containers       chan Container
+	DoneProcessing   chan struct{}
+	runContainer     func(*DispatcherState, Container)
+	monitorContainer func(*DispatcherState, string, chan Container)
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -82,37 +90,46 @@ func (dispatcher *DispatcherState) updateMine(c Container) bool {
 	return false
 }
 
+// getContainers lists containers matching params, marks each UUID in touched, and sends each one on dispatcher.containers; list errors are only logged.
+func (dispatcher *DispatcherState) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+	// XXX needs to handle paging
+	var containers ContainerList
+	if err := dispatcher.Arv.List("containers", params, &containers); err != nil {
+		log.Printf("Error getting list of containers: %q", err)
+		return
+	}
+	for _, container := range containers.Items {
+		touched[container.UUID] = true
+		dispatcher.containers <- container
+	}
+}
+
 func (dispatcher *DispatcherState) pollContainers() {
 	ticker := time.NewTicker(dispatcher.pollInterval)
 
 	paramsQ := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "=", "Queued"},
-			{"priority", ">", "0"}},
-		"order": []string{"priority desc"}}
+		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+		"order":   []string{"priority desc"}}
 	paramsP := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
-			{"locked_by_uuid", "=", dispatcher.auth.UUID}}}
+		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}}}
 
 	for {
 		select {
 		case <-ticker.C:
-			{
-				var containers ContainerList
-				err := dispatcher.Arv.List("containers", paramsQ, &containers)
-				if err != nil {
-					log.Printf("Error getting list of containers: %q", err)
-				} else {
-					dispatcher.containers <- containers.Items
+			touched := make(map[string]bool)
+			dispatcher.getContainers(paramsQ, touched)
+			dispatcher.getContainers(paramsP, touched)
+			dispatcher.mineMutex.Lock()
+			var monitored []string
+			for k := range dispatcher.mineMap {
+				if !touched[k] {
+					monitored = append(monitored, k)
 				}
 			}
-			{
-				var containers ContainerList
-				err := dispatcher.Arv.List("containers", paramsP, &containers)
-				if err != nil {
-					log.Printf("Error getting list of containers: %q", err)
-				} else {
-					dispatcher.containers <- containers.Items
-				}
+			dispatcher.mineMutex.Unlock()
+			if len(monitored) > 0 {
+				dispatcher.getContainers(arvadosclient.Dict{
+					"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
 			}
 		case <-dispatcher.DoneProcessing:
 			close(dispatcher.containers)
@@ -123,35 +140,40 @@ func (dispatcher *DispatcherState) pollContainers() {
 }
 
 func (dispatcher *DispatcherState) handleContainers() {
-	for containerlist := range dispatcher.containers {
-		for _, container := range containerlist {
-			if dispatcher.updateMine(container) {
-				continue
+	for container := range dispatcher.containers {
+		if dispatcher.updateMine(container) {
+			if container.State == Complete || container.State == Cancelled {
+				log.Printf("Container %v now in state %v", container.UUID, container.State)
+				dispatcher.notMine(container.UUID)
 			}
+			continue
+		}
 
-			if container.State == "Locked" || container.State == "Running" {
-				log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-					"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-					container.UUID, dispatcher.auth.UUID)
-				go func(c Container, ch chan Container) {
-					defer dispatcher.notMine(c.UUID)
-					dispatcher.waitContainer(dispatcher, c, ch)
-				}(container, dispatcher.setMine(container.UUID))
-				continue
-			}
+		if container.State == Locked || container.State == Running {
+			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+				container.UUID, dispatcher.auth.UUID)
+			go func(uuid string, ch chan Container) {
+				dispatcher.monitorContainer(dispatcher, uuid, ch)
+			}(container.UUID, dispatcher.setMine(container.UUID))
+			continue
+		}
 
-			// Lock container to this dispatcher
-			if err := dispatcher.UpdateState(container.UUID, "Locked"); err != nil {
-				continue
-			}
+		if container.State != Queued {
+			// Not queued, pass
+			continue
+		}
 
-			// Run it
-			go func(c Container, ch chan Container) {
-				defer dispatcher.notMine(c.UUID)
-				dispatcher.runContainer(dispatcher, c, ch)
-				dispatcher.waitContainer(dispatcher, c, ch)
-			}(container, dispatcher.setMine(container.UUID))
+		// Must be queued, so try to lock container to this dispatcher
+		if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+			continue
 		}
+
+		// Run it
+		go func(c Container, ch chan Container) {
+			dispatcher.runContainer(dispatcher, c)
+			dispatcher.monitorContainer(dispatcher, c.UUID, ch)
+		}(container, dispatcher.setMine(container.UUID))
 	}
 }
 
@@ -187,8 +209,8 @@ func (dispatcher *DispatcherState) RunDispatcher() {
 
 func MakeDispatcher(arv arvadosclient.ArvadosClient,
 	pollInterval time.Duration,
-	runContainer func(*DispatcherState, Container, chan Container),
-	waitContainer func(*DispatcherState, Container, chan Container)) (*DispatcherState, error) {
+	runContainer func(*DispatcherState, Container),
+	monitorContainer func(*DispatcherState, string, chan Container)) (*DispatcherState, error) {
 
 	dispatcher := &DispatcherState{}
 	dispatcher.Arv = arv
@@ -201,10 +223,10 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
 
 	dispatcher.pollInterval = pollInterval
 	dispatcher.runContainer = runContainer
-	dispatcher.waitContainer = waitContainer
+	dispatcher.monitorContainer = monitorContainer
 	dispatcher.mineMap = make(map[string]chan Container)
 	dispatcher.DoneProcessing = make(chan struct{})
-	dispatcher.containers = make(chan []Container)
+	dispatcher.containers = make(chan Container)
 
 	return dispatcher, nil
 }
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 4c77008..94cb0d3 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -56,7 +56,7 @@ class Container < ArvadosModel
   State_transitions = {
     nil => [Queued],
     Queued => [Locked, Cancelled],
-    Locked => [Queued, Running, Cancelled],
+    Locked => [Queued, Running, Cancelled, Complete],
     Running => [Complete, Cancelled]
   }
 
@@ -125,8 +125,11 @@ class Container < ArvadosModel
       end
 
     when Complete
-      if self.state_was == Running
+      case self.state_was
+      when Running
         permitted.push :finished_at, :output, :log, :exit_code
+      when Locked
+        permitted.push :finished_at
       end
 
     when Cancelled
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 1f19a29..5e2e34e 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -48,10 +48,11 @@ func doMain() error {
 		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
 	var dispatcher *dispatch.DispatcherState
 	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
-		run, setFinalState)
+		run, monitorRun)
 	if err != nil {
 		return err
 	}
@@ -76,19 +77,12 @@ func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
 
 var startCmd = startFunc
 
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
 func run(dispatcher *dispatch.DispatcherState,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+	container dispatch.Container) {
 
 	uuid := container.UUID
 
 	waitGroup.Add(1)
-	defer waitGroup.Done()
 
 	cmd := exec.Command(*crunchRunCommand, uuid)
 	cmd.Stdin = nil
@@ -103,63 +97,71 @@ func run(dispatcher *dispatch.DispatcherState,
 	if err := startCmd(container, cmd); err != nil {
 		runningCmdsMutex.Unlock()
 		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
-		dispatcher.UpdateState(uuid, "Cancelled")
-		go func() {
-			for _ = range status {
-			}
-		}()
+		dispatcher.UpdateState(uuid, dispatch.Complete)
+		waitGroup.Done()
 		return
 	}
 	runningCmds[uuid] = cmd
 	runningCmdsMutex.Unlock()
+}
+
+func monitorRun(dispatcher *dispatch.DispatcherState,
+	uuid string,
+	status chan dispatch.Container) {
 
-	// Interrupt the child process if priority changes to 0
-	go func() {
-		for c := range status {
-			if (c.State == "Locked" || c.State == "Running") && c.Priority == 0 {
-				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-				cmd.Process.Signal(os.Interrupt)
+	runningCmdsMutex.Lock()
+	cmd, ok := runningCmds[uuid]
+	runningCmdsMutex.Unlock()
+
+	if ok {
+		// Interrupt the child process if priority changes to 0
+		go func() {
+			for c := range status {
+				if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+					log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+					cmd.Process.Signal(os.Interrupt)
+				}
 			}
+		}()
+
+		// Wait for crunch-run to exit
+		if _, err := cmd.Process.Wait(); err != nil {
+			log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 		}
-	}()
 
-	// Wait for crunch-run to exit
-	if _, err := cmd.Process.Wait(); err != nil {
-		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
-	}
+		waitGroup.Done()
 
-	log.Printf("Finished container run for %v", uuid)
-}
+		log.Printf("Finished container run for %v", uuid)
 
-func setFinalState(dispatcher *dispatch.DispatcherState,
-	container dispatch.Container,
-	status chan dispatch.Container) {
-
-	uuid := container.UUID
+		// Remove the crunch job from runningCmds
+		runningCmdsMutex.Lock()
+		delete(runningCmds, uuid)
+		runningCmdsMutex.Unlock()
+	} else {
+		go func() {
+			for _ = range status {
+			}
+		}()
+	}
 
-	// The container state should be 'Complete' if everything went well.
-	// If the container is "Running" or "Locked", that's an error, so
-	// change it to "Cancelled".  TODO: perhaps this should be "Error"
-	// state instead?
+	// The container state should be 'Complete' by now.  If it is still
+	// "Running" or "Locked", this dispatcher has no live crunch-run for it, so
+	// set it to "Cancelled" if its priority is 0, otherwise to "Complete".
+	var container dispatch.Container
 	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	fixState := map[string]string{
-		"Running": "Cancelled",
-		"Locked":  "Cancelled",
-	}
-	if newState, ok := fixState[container.State]; ok {
+	if container.State == dispatch.Running || container.State == dispatch.Locked {
+		var newState string
+		if container.Priority == 0 {
+			newState = dispatch.Cancelled
+		} else {
+			newState = dispatch.Complete
+		}
 		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
-		dispatcher.UpdateState(uuid, newState)
-	}
-
-	// Remove the crunch job from runningCmds
-	runningCmdsMutex.Lock()
-	if _, ok := runningCmds[uuid]; ok {
-		delete(runningCmds, uuid)
+		dispatcher.UpdateState(uuid, newState)
 	}
-	runningCmdsMutex.Unlock()
 
 	log.Printf("Finalized container %v", uuid)
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 9a727bc..b169575 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -12,6 +12,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"os/exec"
+	"strings"
 	"testing"
 	"time"
 )
@@ -63,9 +64,9 @@ func (s *TestSuite) TestIntegration(c *C) {
 	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
 		run,
 		func(dispatcher *dispatch.DispatcherState,
-			container dispatch.Container,
+			container string,
 			status chan dispatch.Container) {
-			setFinalState(dispatcher, container, status)
+			monitorRun(dispatcher, container, status)
 			dispatcher.DoneProcessing <- struct{}{}
 		})
 	c.Assert(err, IsNil)
@@ -107,7 +108,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
@@ -117,7 +118,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
 
@@ -128,7 +129,14 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
+	// seq := &arvadostest.ResponseSequence{0,
+	// 	[]arvadostest.StubResponse{{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`), nil},
+	// 		{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Running"}]}`), nil},
+	// 		{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Complete"}]}`), nil}}}
+	//apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{0, "", seq.Respond}
+
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
@@ -161,9 +169,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
 		run,
 		func(dispatcher *dispatch.DispatcherState,
-			container dispatch.Container,
+			container string,
 			status chan dispatch.Container) {
-			setFinalState(dispatcher, container, status)
+			monitorRun(dispatcher, container, status)
 			dispatcher.DoneProcessing <- struct{}{}
 		})
 	c.Assert(err, IsNil)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 649b784..e0b7ce8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"os/exec"
 	"strconv"
+	"strings"
 	"time"
 )
 
@@ -53,10 +54,11 @@ func doMain() error {
 		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
 	var dispatcher *dispatch.DispatcherState
 	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
-		run, waitContainer)
+		run, monitorContainer)
 	if err != nil {
 		return err
 	}
@@ -163,7 +165,7 @@ func submit(dispatcher *dispatch.DispatcherState,
 	}
 
 	// If everything worked out, got the jobid on stdout
-	jobid = string(stdoutMsg)
+	jobid = strings.TrimSpace(string(stdoutMsg))
 
 	return
 }
@@ -188,10 +190,9 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arva
 }
 
 // Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
+// Execute crunch-run as a slurm batch job. [3] monitorContainer().
 func run(dispatcher *dispatch.DispatcherState,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+	container dispatch.Container) {
 
 	log.Printf("About to submit queued container %v", container.UUID)
 
@@ -208,15 +209,18 @@ func run(dispatcher *dispatch.DispatcherState,
 
 // Wait for a container to finish. Cancel the slurm job if the
 // container priority changes to zero before it ends.
-func waitContainer(dispatcher *dispatch.DispatcherState,
-	container dispatch.Container,
+func monitorContainer(dispatcher *dispatch.DispatcherState,
+	uuid string,
 	status chan dispatch.Container) {
 
-	log.Printf("Monitoring container %v started", container.UUID)
-	defer log.Printf("Monitoring container %v finished", container.UUID)
+	log.Printf("Monitoring container %v started", uuid)
+	defer log.Printf("Monitoring container %v finished", uuid)
+
+	var container dispatch.Container
+	dispatcher.Arv.Get("containers", uuid, nil, &container)
 
-	for container = range status {
-		if (container.State == "Locked" || container.State == "Running") && container.Priority == 0 {
+	for container := range status {
+		if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
 			log.Printf("Canceling container %s", container.UUID)
 
 			err := exec.Command("scancel", "--name="+container.UUID).Run()
@@ -233,11 +237,10 @@ func waitContainer(dispatcher *dispatch.DispatcherState,
 
 			err = dispatcher.Arv.Update("containers", container.UUID,
 				arvadosclient.Dict{
-					"container": arvadosclient.Dict{"state": "Cancelled"}},
+					"container": arvadosclient.Dict{"state": dispatch.Cancelled}},
 				nil)
 			if err != nil {
 				log.Printf("Error updating state for container %s: %s", container.UUID, err)
-				continue
 			}
 		}
 	}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3658737..34f497c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -85,14 +85,12 @@ func (s *TestSuite) TestIntegration(c *C) {
 			apiHost, apiToken, apiInsecure).Args
 		go func() {
 			time.Sleep(5 * time.Second)
-			for _, state := range []string{"Running", "Complete"} {
-				arv.Update("containers", containerUUID,
-					arvadosclient.Dict{
-						"container": arvadosclient.Dict{"state": state}},
-					nil)
-			}
+			arv.Update("containers", containerUUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": dispatch.Complete}},
+				nil)
 		}()
-		return exec.Command("echo", "strigger")
+		return exec.Command("echo", striggerCmdLine...)
 	}
 
 	// There should be no queued containers now
@@ -106,14 +104,19 @@ func (s *TestSuite) TestIntegration(c *C) {
 
 	echo := "echo"
 	crunchRunCommand = &echo
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
 
 	var dispatcher *dispatch.DispatcherState
 	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
 		run,
 		func(dispatcher *dispatch.DispatcherState,
-			container dispatch.Container,
+			container string,
 			status chan dispatch.Container) {
-			waitContainer(dispatcher, container, status)
+			go func() {
+				dispatcher.UpdateState(container, dispatch.Complete)
+			}()
+			monitorContainer(dispatcher, container, status)
 			dispatcher.DoneProcessing <- struct{}{}
 		})
 	c.Assert(err, IsNil)
@@ -127,7 +130,7 @@ func (s *TestSuite) TestIntegration(c *C) {
 		fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
 	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
 		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
 	// There should be no queued containers now
@@ -147,7 +150,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -168,14 +171,16 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(buf)
 	defer log.SetOutput(os.Stderr)
 
-	*crunchRunCommand = crunchCmd
+	crunchRunCommand = &crunchCmd
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
 
 	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
 		run,
 		func(dispatcher *dispatch.DispatcherState,
-			container dispatch.Container,
+			container string,
 			status chan dispatch.Container) {
-			waitContainer(dispatcher, container, status)
+			monitorContainer(dispatcher, container, status)
 			dispatcher.DoneProcessing <- struct{}{}
 		})
 	c.Assert(err, IsNil)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list