[ARVADOS] updated: 3ae9a789410e93eeb31ca5670c17a6d03d77f608

Git user git at public.curoverse.com
Wed Jun 1 16:06:33 EDT 2016


Summary of changes:
 sdk/go/dispatch/dispatch.go                        |  86 +++++----
 .../crunch-dispatch-local/crunch-dispatch-local.go |   3 +-
 .../crunch-dispatch-local_test.go                  |   6 +-
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 200 ++++++++++++++-------
 .../crunch-dispatch-slurm_test.go                  |  80 +++++----
 .../crunch-dispatch-slurm/crunch-finish-slurm.sh   |  20 ---
 6 files changed, 234 insertions(+), 161 deletions(-)
 delete mode 100755 services/crunch-dispatch-slurm/crunch-finish-slurm.sh

       via  3ae9a789410e93eeb31ca5670c17a6d03d77f608 (commit)
       via  3a3910fdc8a5003c182f68e3423c96327a136175 (commit)
      from  4153cb6cfad920ed0b1a4b818d3bcc8de492d134 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 3ae9a789410e93eeb31ca5670c17a6d03d77f608
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 1 16:06:26 2016 -0400

    9187: Slurm dispatcher improvements around squeue
    
    * Clarify that status updates are not guaranteed to be delivered on a
    heartbeat.
    * Refactor the slurm dispatcher to monitor each container's presence in
    squeue in a separate goroutine.
    * Refactor squeue polling into a single goroutine and cache the results, so
    that monitoring 100 containers doesn't result in 100 calls to squeue.
    * No longer set up strigger to finalize the container record when the slurm
    job finishes; instead, cancel running containers that are missing from squeue.
    * Test both cases where a job is/is not in squeue.
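
The cached-squeue approach in the bullets above boils down to the pattern
sketched below: a single goroutine refreshes a mutex-protected list of job
names on a ticker, and every per-container monitor reads that cached list
instead of invoking squeue itself.  This is a condensed, illustrative sketch
(the type and function names here are made up); the real implementation is
the Squeue type in the crunch-dispatch-slurm diff further down.

    package main

    import (
    	"os/exec"
    	"strings"
    	"sync"
    	"time"
    )

    // squeueCache is an illustrative stand-in for the patch's Squeue type.
    type squeueCache struct {
    	sync.Mutex
    	names []string
    	done  chan struct{}
    }

    // poll refreshes the cached job names until done is signalled.
    func (q *squeueCache) poll(interval time.Duration) {
    	ticker := time.NewTicker(interval)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-q.done:
    			return
    		case <-ticker.C:
    			out, err := exec.Command("squeue", "--format=%j").Output()
    			if err != nil {
    				continue // keep the previous snapshot if squeue fails
    			}
    			q.Lock()
    			q.names = strings.Fields(string(out))
    			q.Unlock()
    		}
    	}
    }

    // contains reports whether a job name is in the cached squeue output.
    func (q *squeueCache) contains(name string) bool {
    	q.Lock()
    	defer q.Unlock()
    	for _, n := range q.names {
    		if n == name {
    			return true
    		}
    	}
    	return false
    }

    func main() {
    	q := &squeueCache{done: make(chan struct{})}
    	go q.poll(10 * time.Second)
    	// Monitors consult the cache instead of running squeue themselves:
    	_ = q.contains("zzzzz-dz642-queuedcontainer")
    }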

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index a27971f..785b6ec 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,3 +1,6 @@
+// Package dispatch provides a framework for monitoring the Arvados container
+// queue, locking container records, and running goroutine callbacks which
+// implement execution and monitoring of the containers.
 package dispatch
 
 import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
 	Items []apiClientAuthorization `json:"items"`
 }
 
-// Container data
+// Container represents an Arvados container record
 type Container struct {
 	UUID               string           `json:"uuid"`
 	State              string           `json:"state"`
@@ -45,9 +48,27 @@ type ContainerList struct {
 
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
-	Arv            arvadosclient.ArvadosClient
-	RunContainer   func(*Dispatcher, Container, chan Container)
-	PollInterval   time.Duration
+	// The Arvados client
+	Arv arvadosclient.ArvadosClient
+
+	// When a new queued container appears and is either already owned by
+	// this dispatcher or is successfully locked, the dispatcher calls
+	// RunContainer() in a new goroutine.  That goroutine receives a
+	// channel over which it gets updates to the container state.  Status
+	// updates arrive only when the container record changes on the API
+	// server, so they are not guaranteed to come on a regular heartbeat;
+	// if the goroutine needs to monitor the job submission to the
+	// underlying slurm/grid engine/etc queue, it should spin up its own
+	// polling goroutines.  When the channel is closed, the container is
+	// no longer being handled by this dispatcher and the goroutine should
+	// terminate.  The goroutine is responsible for draining the 'status'
+	// channel; failure to do so may deadlock the dispatcher.
+	RunContainer func(*Dispatcher, Container, chan Container)
+
+	// Amount of time to wait between successive polls for updates.
+	PollInterval time.Duration
+
+	// Channel used to signal that the RunDispatcher loop should exit.
 	DoneProcessing chan struct{}
 
 	mineMutex  sync.Mutex
@@ -159,7 +180,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 		// back to Queued and then locked by another dispatcher,
 		// LockedByUUID will be different.  In either case, we want
 		// to stop monitoring it.
-		log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+		log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
 		dispatcher.notMine(container.UUID)
 		return
 	}
@@ -191,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 			"container": arvadosclient.Dict{"state": newState}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+		log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
 	}
 	return err
 }
@@ -199,14 +220,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // RunDispatcher runs the main loop of the dispatcher until receiving a message
 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer().  If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer().  The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
 	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
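
As a reader's aid for the RunContainer contract documented in the new
Dispatcher comments above, a callback that honors it looks roughly like the
sketch below.  This is illustrative only (the package name and function body
are made up); the real callbacks are the run() functions in
crunch-dispatch-local and crunch-dispatch-slurm.

    package dispatchexample

    import "git.curoverse.com/arvados.git/sdk/go/dispatch"

    // runContainer starts the real work elsewhere, then drains the status
    // channel until the dispatcher closes it.  Failing to drain the channel
    // can deadlock the dispatcher.
    func runContainer(dispatcher *dispatch.Dispatcher,
    	container dispatch.Container,
    	status chan dispatch.Container) {

    	// Kick off the actual execution (sbatch, a local crunch-run process,
    	// ...) in its own goroutine; that part is service-specific.

    	for container = range status {
    		// Each value is a fresh copy of the container record from the
    		// API server; updates do not arrive on a regular heartbeat.
    		if container.Priority == 0 &&
    			(container.State == dispatch.Locked || container.State == dispatch.Running) {
    			// Cancel the underlying job here, then update the record.
    			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
    		}
    	}
    	// Channel closed: this dispatcher no longer owns the container, so
    	// the goroutine returns.
    }
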
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index aca60e9..0248f18 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -115,7 +115,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
+	testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to state \"Locked\"")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -123,7 +123,7 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 	apiStubResponses["/arvados/v1/containers"] =
 		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
-		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
 
 	testWithServerStub(c, apiStubResponses, "echo",
 		`After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2".  Updating it to "Cancelled"`)
@@ -142,7 +142,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
-		arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+		arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
 
 	apiStub := arvadostest.ServerStub{apiStubResponses}
 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 641b4bc..3e14820 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -14,9 +14,16 @@ import (
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"time"
 )
 
+type Squeue struct {
+	sync.Mutex
+	squeueContents []string
+	SqueueDone     chan struct{}
+}
+
 func main() {
 	err := doMain()
 	if err != nil {
@@ -26,7 +33,7 @@ func main() {
 
 var (
 	crunchRunCommand *string
-	finishCommand    *string
+	squeueUpdater    Squeue
 )
 
 func doMain() error {
@@ -42,11 +49,6 @@ func doMain() error {
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
-	finishCommand = flags.String(
-		"finish-command",
-		"/usr/bin/crunch-finish-slurm.sh",
-		"Command to run from strigger when job is finished")
-
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
@@ -63,11 +65,17 @@ func doMain() error {
 		PollInterval:   time.Duration(*pollInterval) * time.Second,
 		DoneProcessing: make(chan struct{})}
 
+	squeueUpdater.SqueueDone = make(chan struct{})
+	go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
+
 	err = dispatcher.RunDispatcher()
 	if err != nil {
 		return err
 	}
 
+	squeueUpdater.SqueueDone <- struct{}{}
+	close(squeueUpdater.SqueueDone)
+
 	return nil
 }
 
@@ -81,19 +89,12 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
-		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
 // squeueFunc
 func squeueFunc() *exec.Cmd {
 	return exec.Command("squeue", "--format=%j")
 }
 
 // Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
 var sbatchCmd = sbatchFunc
 var squeueCmd = squeueFunc
 
@@ -182,44 +183,66 @@ func submit(dispatcher *dispatch.Dispatcher,
 	return
 }
 
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
-	insecure := "0"
-	if arv.ApiInsecure {
-		insecure = "1"
-	}
-	cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	err := cmd.Run()
-	if err != nil {
-		log.Printf("While setting up strigger: %v", err)
-		// BUG: we drop the error here and forget about it. A
-		// human has to notice the container is stuck in
-		// Running state, and fix it manually.
-	}
-}
+func (squeue *Squeue) runSqueue() ([]string, error) {
+	var newSqueueContents []string
 
-func checkSqueue(uuid string) (bool, error) {
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
-		return false, err
+		return nil, err
 	}
 	cmd.Start()
-	defer cmd.Wait()
 	scanner := bufio.NewScanner(sq)
-	found := false
 	for scanner.Scan() {
-		if scanner.Text() == uuid {
-			found = true
-		}
+		newSqueueContents = append(newSqueueContents, scanner.Text())
 	}
 	if err := scanner.Err(); err != nil {
-		return false, err
+		cmd.Wait()
+		return nil, err
+	}
+
+	err = cmd.Wait()
+	if err != nil {
+		return nil, err
+	}
+
+	return newSqueueContents, nil
+}
+
+func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
+	if check {
+		n, err := squeue.runSqueue()
+		if err != nil {
+			return false, err
+		}
+		squeue.Lock()
+		squeue.squeueContents = n
+		squeue.Unlock()
+	}
+
+	if uuid != "" {
+		squeue.Lock()
+		defer squeue.Unlock()
+		for _, k := range squeue.squeueContents {
+			if k == uuid {
+				return true, nil
+			}
+		}
+	}
+	return false, nil
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+	// TODO: consider using "squeue -i" instead of polling squeue.
+	ticker := time.NewTicker(pollInterval)
+	for {
+		select {
+		case <-squeueUpdater.SqueueDone:
+			return
+		case <-ticker.C:
+			squeue.CheckSqueue("", true)
+		}
 	}
-	return found, nil
 }
 
 // Run or monitor a container.
@@ -239,50 +262,91 @@ func run(dispatcher *dispatch.Dispatcher,
 	uuid := container.UUID
 
 	if container.State == dispatch.Locked {
-		if inQ, err := checkSqueue(container.UUID); err != nil {
+		if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+			// maybe squeue is broken, put it back in the queue
 			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+			dispatcher.UpdateState(container.UUID, dispatch.Queued)
 		} else if !inQ {
 			log.Printf("About to submit queued container %v", container.UUID)
 
-			jobid, err := submit(dispatcher, container, *crunchRunCommand)
-			if err != nil {
-				log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-			} else {
-				finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+			if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+				log.Printf("Error submitting container %s to slurm: %v",
+					container.UUID, err)
+				// maybe sbatch is broken, put it back to queued
+				dispatcher.UpdateState(container.UUID, dispatch.Queued)
 			}
 		}
-	} else if container.State == dispatch.Running {
-		if inQ, err := checkSqueue(container.UUID); err != nil {
-			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		} else if !inQ {
-			log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		}
 	}
 
 	log.Printf("Monitoring container %v started", uuid)
 
-	for container = range status {
-		if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
-			log.Printf("Canceling container %s", container.UUID)
-
-			err := exec.Command("scancel", "--name="+container.UUID).Run()
-			if err != nil {
-				log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-				if inQ, err := checkSqueue(container.UUID); err != nil {
+	// periodically check squeue
+	doneSqueue := make(chan struct{})
+	go func() {
+		squeueUpdater.CheckSqueue(container.UUID, true)
+		ticker := time.NewTicker(dispatcher.PollInterval)
+		for {
+			select {
+			case <-ticker.C:
+				if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
 					log.Printf("Error running squeue: %v", err)
-					continue
-				} else if inQ {
-					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-					continue
+					// don't cancel, just leave it the way it is
+				} else if !inQ {
+					var con dispatch.Container
+					err := dispatcher.Arv.Get("containers", uuid, nil, &con)
+					if err != nil {
+						log.Printf("Error getting final container state: %v", err)
+					}
+
+					var st string
+					switch con.State {
+					case dispatch.Locked:
+						st = dispatch.Queued
+					case dispatch.Running:
+						st = dispatch.Cancelled
+					default:
+						st = ""
+					}
+
+					if st != "" {
+						log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+							uuid, con.State, st)
+						dispatcher.UpdateState(uuid, st)
+					}
 				}
+			case <-doneSqueue:
+				close(doneSqueue)
+				ticker.Stop()
+				return
 			}
+		}
+	}()
 
-			err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+	for container = range status {
+		if container.State == dispatch.Locked || container.State == dispatch.Running {
+			if container.Priority == 0 {
+				log.Printf("Canceling container %s", container.UUID)
+
+				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				if err != nil {
+					log.Printf("Error stopping container %s with scancel: %v",
+						container.UUID, err)
+					if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+						log.Printf("Error running squeue: %v", err)
+						continue
+					} else if inQ {
+						log.Printf("Container %s is still in squeue after scancel.",
+							container.UUID)
+						continue
+					}
+				}
+
+				err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+			}
 		}
 	}
 
+	doneSqueue <- struct{}{}
+
 	log.Printf("Monitoring container %v finished", uuid)
 }
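
One detail of the new CheckSqueue(uuid, check) helper that is easy to miss in
the diff above: the second argument decides whether the call re-runs squeue
and refreshes the shared cache, or only consults the cache maintained by the
background SyncSqueue() poller.  The snippet below is illustrative only,
written as if it were inside the crunch-dispatch-slurm package with
squeueUpdater already running:

    // Hypothetical helper, not part of the patch.
    func squeueUsageExample(uuid string) {
    	// check=true: run squeue now, update the shared cache, then look for
    	// uuid.  Used when an up-to-date answer matters, e.g. before deciding
    	// whether a newly Locked container still needs an sbatch.
    	inQ, err := squeueUpdater.CheckSqueue(uuid, true)

    	// check=false: read only the cached results from the background
    	// poller.  Used by each container's monitor goroutine on every tick,
    	// so monitoring 100 containers doesn't mean 100 squeue invocations.
    	inQ, err = squeueUpdater.CheckSqueue(uuid, false)

    	_, _ = inQ, err
    }

When the cached check says a job has vanished from squeue, the monitor
goroutine added to run() re-reads the container record and moves Locked
containers back to Queued and Running containers to Cancelled.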
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 348d5e4..d30c5df 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -1,12 +1,12 @@
 package main
 
 import (
+	"bytes"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
-
-	"bytes"
-	"fmt"
+	"io"
 	"log"
 	"math"
 	"net/http"
@@ -35,35 +35,43 @@ var initialArgs []string
 
 func (s *TestSuite) SetUpSuite(c *C) {
 	initialArgs = os.Args
-	arvadostest.StartAPI()
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
-	arvadostest.StopAPI()
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-slurm"}
 	os.Args = args
 
+	arvadostest.StartAPI()
 	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
-	arvadostest.ResetEnv()
 	os.Args = initialArgs
+	arvadostest.StopAPI()
 }
 
 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) TestIntegration(c *C) {
+func (s *TestSuite) TestIntegrationNormal(c *C) {
+	s.integrationTest(c, false)
+}
+
+func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+	s.integrationTest(c, true)
+}
+
+func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+	arvadostest.ResetEnv()
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	c.Assert(err, IsNil)
 
 	var sbatchCmdLine []string
-	var striggerCmdLine []string
 
 	// Override sbatchCmd
 	defer func(orig func(dispatch.Container) *exec.Cmd) {
@@ -74,30 +82,16 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return exec.Command("sh")
 	}
 
-	// Override striggerCmd
-	defer func(orig func(jobid, containerUUID, finishCommand,
-		apiHost, apiToken, apiInsecure string) *exec.Cmd) {
-		striggerCmd = orig
-	}(striggerCmd)
-	striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-		striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
-			apiHost, apiToken, apiInsecure).Args
-		go func() {
-			time.Sleep(5 * time.Second)
-			arv.Update("containers", containerUUID,
-				arvadosclient.Dict{
-					"container": arvadosclient.Dict{"state": dispatch.Complete}},
-				nil)
-		}()
-		return exec.Command("echo", striggerCmdLine...)
-	}
-
 	// Override squeueCmd
 	defer func(orig func() *exec.Cmd) {
 		squeueCmd = orig
 	}(squeueCmd)
 	squeueCmd = func() *exec.Cmd {
-		return exec.Command("echo")
+		if missingFromSqueue {
+			return exec.Command("echo")
+		} else {
+			return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+		}
 	}
 
 	// There should be no queued containers now
@@ -111,8 +105,6 @@ func (s *TestSuite) TestIntegration(c *C) {
 
 	echo := "echo"
 	crunchRunCommand = &echo
-	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-	finishCommand = &finishCmd
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -122,8 +114,8 @@ func (s *TestSuite) TestIntegration(c *C) {
 			container dispatch.Container,
 			status chan dispatch.Container) {
 			go func() {
-				time.Sleep(1)
 				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				time.Sleep(3 * time.Second)
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
 			run(dispatcher, container, status)
@@ -131,19 +123,29 @@ func (s *TestSuite) TestIntegration(c *C) {
 		},
 		DoneProcessing: doneProcessing}
 
+	squeueUpdater.SqueueDone = make(chan struct{})
+	go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+
 	err = dispatcher.RunDispatcher()
 	c.Assert(err, IsNil)
 
+	squeueUpdater.SqueueDone <- struct{}{}
+	close(squeueUpdater.SqueueDone)
+
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", item.UUID),
 		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
 		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
 		fmt.Sprintf("--priority=%d", item.Priority)}
-	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
-		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
+	if missingFromSqueue {
+		// not in squeue when run() started, so it will have called sbatch
+		c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+	} else {
+		// already in squeue when run() started, will have just monitored it instead
+		c.Check(sbatchCmdLine, DeepEquals, []string(nil))
+	}
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -154,7 +156,11 @@ func (s *TestSuite) TestIntegration(c *C) {
 	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	c.Check(container.State, Equals, "Complete")
+	if missingFromSqueue {
+		c.Check(container.State, Equals, "Cancelled")
+	} else {
+		c.Check(container.State, Equals, "Complete")
+	}
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -180,12 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(buf)
+	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
 	crunchRunCommand = &crunchCmd
-	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-	finishCommand = &finishCmd
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -195,7 +199,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 			container dispatch.Container,
 			status chan dispatch.Container) {
 			go func() {
-				time.Sleep(1)
+				time.Sleep(1 * time.Second)
 				dispatcher.UpdateState(container.UUID, dispatch.Running)
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
deleted file mode 100755
index 95a37ba..0000000
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/sh
-
-# Script to be called by strigger when a job finishes.  This ensures the job
-# record has the correct state "Complete" even if the node running the job
-# failed.
-
-ARVADOS_API_HOST=$1
-ARVADOS_API_TOKEN=$2
-ARVADOS_API_HOST_INSECURE=$3
-uuid=$4
-jobid=$5
-
-# If it is possible to attach metadata to job records we could look up the
-# above information instead of getting it on the command line.  For example,
-# this is the recipe for getting the job name (container uuid) from the job id.
-#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
-
-export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
-
-exec arv container update --uuid $uuid --container '{"state": "Complete"}'

commit 3a3910fdc8a5003c182f68e3423c96327a136175
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 27 17:30:07 2016 -0400

    9187: Check LockedByUUID on container updates and stop delivering status
    updates if it no longer matches this dispatcher's token UUID.
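
In practice, "stop delivering status updates" means handleUpdate() stops
monitoring the container and closes its status channel via notMine(), which
in turn tells the RunContainer goroutine to exit.  Condensed from the diff
below:

    if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
    	// Lock released or taken over by another dispatcher: stop
    	// monitoring and close the container's status channel.
    	dispatcher.notMine(container.UUID)
    	return
    }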

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 355ed7c..a27971f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -52,7 +52,7 @@ type Dispatcher struct {
 
 	mineMutex  sync.Mutex
 	mineMap    map[string]chan Container
-	auth       apiClientAuthorization
+	Auth       apiClientAuthorization
 	containers chan Container
 }
 
@@ -100,17 +100,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 	err := dispatcher.Arv.List("containers", params, &containers)
 	if err != nil {
 		log.Printf("Error getting list of containers: %q", err)
-	} else {
-		if containers.ItemsAvailable > len(containers.Items) {
-			// TODO: support paging
-			log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-				containers.ItemsAvailable,
-				len(containers.Items))
-		}
-		for _, container := range containers.Items {
-			touched[container.UUID] = true
-			dispatcher.containers <- container
-		}
+		return
+	}
+
+	if containers.ItemsAvailable > len(containers.Items) {
+		// TODO: support paging
+		log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+			containers.ItemsAvailable,
+			len(containers.Items))
+	}
+	for _, container := range containers.Items {
+		touched[container.UUID] = true
+		dispatcher.containers <- container
 	}
 }
 
@@ -122,7 +123,7 @@ func (dispatcher *Dispatcher) pollContainers() {
 		"order":   []string{"priority desc"},
 		"limit":   "1000"}
 	paramsP := arvadosclient.Dict{
-		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
 		"limit":   "1000"}
 
 	for {
@@ -152,11 +153,19 @@ func (dispatcher *Dispatcher) pollContainers() {
 }
 
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
+	if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+		// will be nil.  If the container was formerly Locked, moved
+		// will be nil.  If the container was formally Locked, moved
+		// back to Queued and then locked by another dispatcher,
+		// LockedByUUID will be different.  In either case, we want
+		// to stop monitoring it.
+		log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+		dispatcher.notMine(container.UUID)
+		return
+	}
+
 	if dispatcher.updateMine(container) {
-		if container.State == Complete || container.State == Cancelled {
-			log.Printf("Container %v now in state %v", container.UUID, container.State)
-			dispatcher.notMine(container.UUID)
-		}
+		// Already being monitored; status update has been sent.
 		return
 	}
 
@@ -169,6 +178,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 	}
 
 	if container.State == Locked || container.State == Running {
+		// Not currently monitored but in Locked or Running state and
+		// owned by this dispatcher, so start monitoring.
 		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
 	}
 }
@@ -197,7 +208,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // The callback is responsible for draining the channel, if it fails to do so
 // it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
 		log.Printf("Error getting my token UUID: %v", err)
 		return
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index cc472a4..73a3895 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -159,7 +159,8 @@ func run(dispatcher *dispatch.Dispatcher,
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+	if container.LockedByUUID == dispatcher.Auth.UUID &&
+		(container.State == dispatch.Locked || container.State == dispatch.Running) {
 		log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
 			*crunchRunCommand, container.State, uuid, dispatch.Cancelled)
 		dispatcher.UpdateState(uuid, dispatch.Cancelled)

-----------------------------------------------------------------------


hooks/post-receive
-- 



