[ARVADOS] created: c405873d87e0764acf3855c559c85fa6d7a63cfb

Git user git at public.curoverse.com
Mon Jun 6 14:12:04 EDT 2016


commit c405873d87e0764acf3855c559c85fa6d7a63cfb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 10:44:11 2016 -0400

    9187: Remove "squeueError": CheckSqueue now blocks until a successful squeue run.  Refactor tests a bit and add a test for canceling containers.
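
The pattern at the heart of this change: CheckSqueue no longer surfaces squeue
errors to its callers; they simply block on a sync.Cond until the next
successful poll broadcasts a fresh snapshot.  A minimal, self-contained sketch
of that wait/broadcast idiom (hypothetical names, not the dispatcher code
itself):

package main

import (
	"fmt"
	"sync"
	"time"
)

// poller caches the latest successful poll and wakes waiting readers.
type poller struct {
	mu       sync.Mutex
	cond     *sync.Cond
	contents []string
}

func newPoller() *poller {
	p := &poller{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// update publishes a new snapshot and wakes all waiters.  On a failed
// poll, simply don't call update: waiters stay blocked until the next
// successful one, which is the behavior this commit adopts.
func (p *poller) update(snapshot []string) {
	p.mu.Lock()
	p.contents = snapshot
	p.cond.Broadcast()
	p.mu.Unlock()
}

// contains blocks until the next update, then reports whether name is
// present -- mirroring CheckSqueue's "wait for fresh data" contract.
func (p *poller) contains(name string) bool {
	p.mu.Lock()
	p.cond.Wait() // releases mu while blocked, reacquires before returning
	contents := p.contents
	p.mu.Unlock()
	for _, k := range contents {
		if k == name {
			return true
		}
	}
	return false
}

func main() {
	p := newPoller()
	go func() {
		time.Sleep(100 * time.Millisecond)
		p.update([]string{"job-a", "job-b"})
	}()
	fmt.Println(p.contains("job-a")) // true, printed once the update arrives
}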

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 1dada2f..f718fbc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -78,8 +78,14 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
+// scancelFunc returns a command to cancel the slurm job for a container.
+func scancelFunc(container dispatch.Container) *exec.Cmd {
+	return exec.Command("scancel", "--name="+container.UUID)
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
+var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
@@ -178,10 +184,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
 	submitted := false
 	for !*monitorDone {
-		if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-			// Most recent run of squeue had an error, so do nothing.
-			continue
-		} else if inQ {
+		if squeueUpdater.CheckSqueue(container.UUID) {
 			// Found in the queue, so continue monitoring
 			submitted = true
 		} else if container.State == dispatch.Locked && !submitted {
@@ -249,15 +252,13 @@ func run(dispatcher *dispatch.Dispatcher,
 
 				// Mutex between squeue sync and running sbatch or scancel.
 				squeueUpdater.SlurmLock.Lock()
-				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				err := scancelCmd(container).Run()
 				squeueUpdater.SlurmLock.Unlock()
 
 				if err != nil {
 					log.Printf("Error stopping container %s with scancel: %v",
 						container.UUID, err)
-					if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-						continue
-					} else if inQ {
+					if squeueUpdater.CheckSqueue(container.UUID) {
 						log.Printf("Container %s is still in squeue after scancel.",
 							container.UUID)
 						continue
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index be347e4..cddbe8c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,7 +8,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io"
 	"log"
-	"math"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -58,14 +57,60 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	s.integrationTest(c, false)
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+		[]string(nil),
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(3 * time.Second)
+			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+		})
+	c.Check(container.State, Equals, "Complete")
+}
+
+func (s *TestSuite) TestIntegrationCancel(c *C) {
+
+	// Override scancelCmd
+	var scancelCmdLine []string
+	defer func(orig func(dispatch.Container) *exec.Cmd) {
+		scancelCmd = orig
+	}(scancelCmd)
+	scancelCmd = func(container dispatch.Container) *exec.Cmd {
+		scancelCmdLine = scancelFunc(container).Args
+		return exec.Command("echo")
+	}
+
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+		[]string(nil),
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(1 * time.Second)
+			dispatcher.Arv.Update("containers", container.UUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"priority": 0}},
+				nil)
+		})
+	c.Check(container.State, Equals, "Cancelled")
+	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-	s.integrationTest(c, true)
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+		fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+		fmt.Sprintf("--mem-per-cpu=%d", 2862),
+		fmt.Sprintf("--cpus-per-task=%d", 4),
+		fmt.Sprintf("--priority=%d", 1)},
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(3 * time.Second)
+			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+		})
+	c.Check(container.State, Equals, "Cancelled")
 }
 
-func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+func (s *TestSuite) integrationTest(c *C,
+	newSqueueCmd func() *exec.Cmd,
+	sbatchCmdComps []string,
+	runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
 	arvadostest.ResetEnv()
 
 	arv, err := arvadosclient.MakeArvadosClient()
@@ -86,13 +131,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 	defer func(orig func() *exec.Cmd) {
 		squeueCmd = orig
 	}(squeueCmd)
-	squeueCmd = func() *exec.Cmd {
-		if missingFromSqueue {
-			return exec.Command("echo")
-		} else {
-			return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
-		}
-	}
+	squeueCmd = newSqueueCmd
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
@@ -113,11 +152,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container dispatch.Container,
 			status chan dispatch.Container) {
-			go func() {
-				dispatcher.UpdateState(container.UUID, dispatch.Running)
-				time.Sleep(3 * time.Second)
-				dispatcher.UpdateState(container.UUID, dispatch.Complete)
-			}()
+			go runContainer(dispatcher, container)
 			run(dispatcher, container, status)
 			doneProcessing <- struct{}{}
 		},
@@ -130,20 +165,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 
 	squeueUpdater.Done()
 
-	item := containers.Items[0]
-	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
-		fmt.Sprintf("--job-name=%s", item.UUID),
-		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
-		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
-		fmt.Sprintf("--priority=%d", item.Priority)}
-
-	if missingFromSqueue {
-		// not in squeue when run() started, so it will have called sbatch
-		c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
-	} else {
-		// already in squeue when run() started, will have just monitored it instead
-		c.Check(sbatchCmdLine, DeepEquals, []string(nil))
-	}
+	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -154,11 +176,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	if missingFromSqueue {
-		c.Check(container.State, Equals, "Cancelled")
-	} else {
-		c.Check(container.State, Equals, "Complete")
-	}
+	return container
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 3ee8b6f..34e6632 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -13,7 +13,6 @@ import (
 type Squeue struct {
 	squeueContents []string
 	squeueDone     chan struct{}
-	squeueError    error
 	squeueCond     *sync.Cond
 	SlurmLock      sync.Mutex
 }
@@ -25,10 +24,10 @@ func squeueFunc() *exec.Cmd {
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If there is an error,
-// set "squeueError".  If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting on squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output.  If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting on squeueCond in
+// CheckSqueue().  If there is an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
 	var newSqueueContents []string
 
 	// Mutex between squeue sync and running sbatch or scancel.  This
@@ -39,15 +38,12 @@ func (squeue *Squeue) RunSqueue() error {
 	defer squeue.SlurmLock.Unlock()
 
 	// Also ensure unlock on all return paths
-	defer squeue.squeueCond.L.Unlock()
 
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		log.Printf("Error creating stdout pipe for squeue: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 	cmd.Start()
 	scanner := bufio.NewScanner(sq)
@@ -57,55 +53,43 @@ func (squeue *Squeue) RunSqueue() error {
 	if err := scanner.Err(); err != nil {
 		cmd.Wait()
 		log.Printf("Error reading from squeue pipe: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 
 	err = cmd.Wait()
 	if err != nil {
 		log.Printf("Error running squeue: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 
 	squeue.squeueCond.L.Lock()
-	squeue.squeueError = nil
 	squeue.squeueContents = newSqueueContents
 	squeue.squeueCond.Broadcast()
-
-	return nil
+	squeue.squeueCond.L.Unlock()
 }
 
 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
 // does not run squeue directly, but instead blocks until woken up by the
 // next successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
 	squeue.squeueCond.L.Lock()
 	// block until next squeue broadcast signaling an update.
 	squeue.squeueCond.Wait()
-	if squeue.squeueError != nil {
-		e := squeue.squeueError
-		squeue.squeueCond.L.Unlock()
-		return false, e
-	}
 	contents := squeue.squeueContents
 	squeue.squeueCond.L.Unlock()
 
 	for _, k := range contents {
 		if k == uuid {
-			return true, nil
+			return true
 		}
 	}
-	return false, nil
+	return false
 }
 
 // StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
 	squeue.squeueCond = sync.NewCond(&sync.Mutex{})
 	squeue.squeueDone = make(chan struct{})
-	squeue.RunSqueue()
 	go squeue.SyncSqueue(pollInterval)
 }
 

commit d45be86b354adec485504bfc09f41e0e22241f34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 3 17:57:48 2016 -0400

    9187: Fix refactoring mix-up: Squeue methods used the package-level squeueUpdater instead of their receiver.
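
For reference, the bug class fixed here: a method that touches a package-level
singleton instead of its receiver behaves correctly only while exactly one
instance exists, and breaks as soon as a test constructs its own.  A contrived
sketch (hypothetical names):

package main

import "fmt"

var defaultCounter counter // package-level singleton

type counter struct{ n int }

// incBroken increments the package-level instance -- the shape of the
// bug this commit fixes in Squeue's methods.
func (c *counter) incBroken() { defaultCounter.n++ }

// inc uses the receiver, so any instance works.
func (c *counter) inc() { c.n++ }

func main() {
	var local counter
	local.incBroken()
	local.inc()
	fmt.Println(local.n, defaultCounter.n) // 1 1: incBroken leaked into the global
}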

diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index e157469..3ee8b6f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -35,18 +35,18 @@ func (squeue *Squeue) RunSqueue() error {
 	// establishes a sequence so that squeue doesn't run concurrently with
 	// sbatch or scancel; the next update of squeue will occur only after
 	// sbatch or scancel has completed.
-	squeueUpdater.SlurmLock.Lock()
-	defer squeueUpdater.SlurmLock.Unlock()
+	squeue.SlurmLock.Lock()
+	defer squeue.SlurmLock.Unlock()
 
 	// Also ensure unlock on all return paths
-	defer squeueUpdater.squeueCond.L.Unlock()
+	defer squeue.squeueCond.L.Unlock()
 
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		log.Printf("Error creating stdout pipe for squeue: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 	cmd.Start()
@@ -57,23 +57,23 @@ func (squeue *Squeue) RunSqueue() error {
 	if err := scanner.Err(); err != nil {
 		cmd.Wait()
 		log.Printf("Error reading from squeue pipe: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 
 	err = cmd.Wait()
 	if err != nil {
 		log.Printf("Error running squeue: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 
-	squeueUpdater.squeueCond.L.Lock()
-	squeueUpdater.squeueError = nil
-	squeueUpdater.squeueContents = newSqueueContents
-	squeueUpdater.squeueCond.Broadcast()
+	squeue.squeueCond.L.Lock()
+	squeue.squeueError = nil
+	squeue.squeueContents = newSqueueContents
+	squeue.squeueCond.Broadcast()
 
 	return nil
 }
@@ -82,16 +82,16 @@ func (squeue *Squeue) RunSqueue() error {
 // does not run squeue directly, but instead blocks until woken up by the
 // next successful update of squeue.
 func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
-	squeueUpdater.squeueCond.L.Lock()
+	squeue.squeueCond.L.Lock()
 	// block until next squeue broadcast signaling an update.
-	squeueUpdater.squeueCond.Wait()
-	if squeueUpdater.squeueError != nil {
-		e := squeueUpdater.squeueError
-		squeueUpdater.squeueCond.L.Unlock()
+	squeue.squeueCond.Wait()
+	if squeue.squeueError != nil {
+		e := squeue.squeueError
+		squeue.squeueCond.L.Unlock()
 		return false, e
 	}
-	contents := squeueUpdater.squeueContents
-	squeueUpdater.squeueCond.L.Unlock()
+	contents := squeue.squeueContents
+	squeue.squeueCond.L.Unlock()
 
 	for _, k := range contents {
 		if k == uuid {
@@ -103,16 +103,16 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
 
 // StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-	squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
-	squeueUpdater.squeueDone = make(chan struct{})
-	squeueUpdater.RunSqueue()
-	go squeueUpdater.SyncSqueue(pollInterval)
+	squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+	squeue.squeueDone = make(chan struct{})
+	squeue.RunSqueue()
+	go squeue.SyncSqueue(pollInterval)
 }
 
 // Done stops the squeue monitoring goroutine.
 func (squeue *Squeue) Done() {
-	squeueUpdater.squeueDone <- struct{}{}
-	close(squeueUpdater.squeueDone)
+	squeue.squeueDone <- struct{}{}
+	close(squeue.squeueDone)
 }
 
 // SyncSqueue periodically polls RunSqueue() at the given duration until
@@ -121,10 +121,10 @@ func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
 	ticker := time.NewTicker(pollInterval)
 	for {
 		select {
-		case <-squeueUpdater.squeueDone:
+		case <-squeue.squeueDone:
 			return
 		case <-ticker.C:
-			squeueUpdater.RunSqueue()
+			squeue.RunSqueue()
 		}
 	}
 }

commit bb10b7777ed6db229fbb35e6a829bec4e8efcd23
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 2 22:46:55 2016 -0400

    9187: Fix comment typo

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 785b6ec..fb7b5fb 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -176,7 +176,7 @@ func (dispatcher *Dispatcher) pollContainers() {
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
 	if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
 		// If container is Complete, Cancelled, or Queued, LockedByUUID
-		// will be nil.  If the container was formally Locked, moved
+		// will be nil.  If the container was formerly Locked, moved
 		// back to Queued and then locked by another dispatcher,
 		// LockedByUUID will be different.  In either case, we want
 		// to stop monitoring it.

commit 5cf8c18e735bb15da3f131e7ae57bb4b222bb4ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 2 22:18:55 2016 -0400

    9187: Add documentation comments to Squeue functions.

diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index b86a4d9..e157469 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -8,6 +8,8 @@ import (
 	"time"
 )
 
+// Squeue implements an asynchronous polling monitor of the SLURM queue using
+// the command 'squeue'.
 type Squeue struct {
 	squeueContents []string
 	squeueDone     chan struct{}
@@ -23,6 +25,9 @@ func squeueFunc() *exec.Cmd {
 
 var squeueCmd = squeueFunc
 
+// RunSqueue runs squeue once and captures the output.  If there is an error,
+// set "squeueError".  If it succeeds, set "squeueContents" and then wake up
+// any goroutines waiting on squeueCond in CheckSqueue().
 func (squeue *Squeue) RunSqueue() error {
 	var newSqueueContents []string
 
@@ -73,8 +78,9 @@ func (squeue *Squeue) RunSqueue() error {
 	return nil
 }
 
-// Check if a container UUID is in the slurm queue.  This will block until the
-// next successful update from SLURM.
+// CheckSqueue checks if a given container UUID is in the slurm queue.  This
+// does not run squeue directly, but instead blocks until woken up by the
+// next successful update of squeue.
 func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
 	squeueUpdater.squeueCond.L.Lock()
 	// block until next squeue broadcast signaling an update.
@@ -95,6 +101,7 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
 	return false, nil
 }
 
+// StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
 	squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
 	squeueUpdater.squeueDone = make(chan struct{})
@@ -102,11 +109,14 @@ func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
 	go squeueUpdater.SyncSqueue(pollInterval)
 }
 
+// Done stops the squeue monitoring goroutine.
 func (squeue *Squeue) Done() {
 	squeueUpdater.squeueDone <- struct{}{}
 	close(squeueUpdater.squeueDone)
 }
 
+// SyncSqueue periodically polls RunSqueue() at the given duration until
+// terminated by calling Done().
 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
 	ticker := time.NewTicker(pollInterval)
 	for {

commit d77c4cc58d393c48ce46b987f6eada7c7cc381c6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 2 17:59:20 2016 -0400

    9187: Improve squeue synchronization
    
    * Put squeue functions into separate file.
    
    * CheckSqueue() now blocks on a condition variable until the next successful
    update of squeue, which then wakes up all goroutines waiting on CheckSqueue().
    
    * Never do anything when squeue returns an error.
    
    * Merge submitting, monitoring, and cleanup behaviors into a single goroutine
    which updates based on CheckSqueue() instead of a ticker.
    
    * Introduce a lock on squeue, sbatch and scancel operations, so that on the
    next wakeup the queue is guaranteed to reflect the most recent sbatch/scancel
    operations.
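
The locking described in the last bullet reduces to: sbatch/scancel and the
squeue refresh share one mutex, so a snapshot observed by waiters can never
predate a submit or cancel that has already completed.  A rough sketch of that
sequencing (hypothetical names; the commands only succeed on a host with slurm
installed):

package main

import (
	"log"
	"os/exec"
	"strings"
	"sync"
)

// slurmOps plays the role of SlurmLock: queue mutations and queue reads
// are serialized by one mutex.
type slurmOps struct {
	mu sync.Mutex
}

// submit runs sbatch while holding the lock; a concurrent refresh must
// wait, so the next snapshot is guaranteed to include this job.
func (s *slurmOps) submit(jobName string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return exec.Command("sbatch", "--job-name="+jobName).Run()
}

// refresh runs squeue under the same lock and returns the job names.
func (s *slurmOps) refresh() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out, err := exec.Command("squeue", "--format=%j").Output()
	if err != nil {
		log.Printf("squeue: %v", err) // on error, callers keep the old view
		return nil
	}
	return strings.Fields(string(out))
}

func main() {
	var ops slurmOps
	if err := ops.submit("example-job"); err != nil {
		log.Printf("sbatch: %v", err) // expected off a slurm cluster
	}
	log.Printf("queue: %v", ops.refresh())
}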

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3e14820..1dada2f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,7 +3,6 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bufio"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -14,16 +13,9 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"sync"
 	"time"
 )
 
-type Squeue struct {
-	sync.Mutex
-	squeueContents []string
-	SqueueDone     chan struct{}
-}
-
 func main() {
 	err := doMain()
 	if err != nil {
@@ -59,23 +51,20 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
+	squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+	defer squeueUpdater.Done()
+
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
 		RunContainer:   run,
 		PollInterval:   time.Duration(*pollInterval) * time.Second,
 		DoneProcessing: make(chan struct{})}
 
-	squeueUpdater.SqueueDone = make(chan struct{})
-	go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
-
 	err = dispatcher.RunDispatcher()
 	if err != nil {
 		return err
 	}
 
-	squeueUpdater.SqueueDone <- struct{}{}
-	close(squeueUpdater.SqueueDone)
-
 	return nil
 }
 
@@ -89,14 +78,8 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-// squeueFunc
-func squeueFunc() *exec.Cmd {
-	return exec.Command("squeue", "--format=%j")
-}
-
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
-var squeueCmd = squeueFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
@@ -139,6 +122,10 @@ func submit(dispatcher *dispatch.Dispatcher,
 		return
 	}
 
+	// Mutex between squeue sync and running sbatch or scancel.
+	squeueUpdater.SlurmLock.Lock()
+	defer squeueUpdater.SlurmLock.Unlock()
+
 	err := cmd.Start()
 	if err != nil {
 		submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
@@ -183,90 +170,24 @@ func submit(dispatcher *dispatch.Dispatcher,
 	return
 }
 
-func (squeue *Squeue) runSqueue() ([]string, error) {
-	var newSqueueContents []string
-
-	cmd := squeueCmd()
-	sq, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-	cmd.Start()
-	scanner := bufio.NewScanner(sq)
-	for scanner.Scan() {
-		newSqueueContents = append(newSqueueContents, scanner.Text())
-	}
-	if err := scanner.Err(); err != nil {
-		cmd.Wait()
-		return nil, err
-	}
-
-	err = cmd.Wait()
-	if err != nil {
-		return nil, err
-	}
-
-	return newSqueueContents, nil
-}
-
-func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
-	if check {
-		n, err := squeue.runSqueue()
-		if err != nil {
-			return false, err
-		}
-		squeue.Lock()
-		squeue.squeueContents = n
-		squeue.Unlock()
-	}
-
-	if uuid != "" {
-		squeue.Lock()
-		defer squeue.Unlock()
-		for _, k := range squeue.squeueContents {
-			if k == uuid {
-				return true, nil
-			}
-		}
-	}
-	return false, nil
-}
-
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-	// TODO: consider using "squeue -i" instead of polling squeue.
-	ticker := time.NewTicker(pollInterval)
-	for {
-		select {
-		case <-squeueUpdater.SqueueDone:
-			return
-		case <-ticker.C:
-			squeue.CheckSqueue("", true)
-		}
-	}
-}
-
-// Run or monitor a container.
-//
 // If the container is marked as Locked, check if it is already in the slurm
 // queue.  If not, submit it.
 //
 // If the container is marked as Running, check if it is in the slurm queue.
 // If not, mark it as Cancelled.
-//
-// Monitor status updates.  If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+	submitted := false
+	for !*monitorDone {
+		if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
+			// Most recent run of squeue had an error, so do nothing.
+			continue
+		} else if inQ {
+			// Found in the queue, so continue monitoring
+			submitted = true
+		} else if container.State == dispatch.Locked && !submitted {
+			// Not in queue but in Locked state and we haven't
+			// submitted it yet, so submit it.
 
-	uuid := container.UUID
-
-	if container.State == dispatch.Locked {
-		if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
-			// maybe squeue is broken, put it back in the queue
-			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Queued)
-		} else if !inQ {
 			log.Printf("About to submit queued container %v", container.UUID)
 
 			if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
@@ -275,64 +196,66 @@ func run(dispatcher *dispatch.Dispatcher,
 				// maybe sbatch is broken, put it back to queued
 				dispatcher.UpdateState(container.UUID, dispatch.Queued)
 			}
+			submitted = true
+		} else {
+			// Not in queue and we are not going to submit it.
+			// Refresh the container state. If it is
+			// Complete/Cancelled, do nothing, if it is Locked then
+			// release it back to the Queue, if it is Running then
+			// clean up the record.
+
+			var con dispatch.Container
+			err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
+			if err != nil {
+				log.Printf("Error getting final container state: %v", err)
+			}
+
+			var st string
+			switch con.State {
+			case dispatch.Locked:
+				st = dispatch.Queued
+			case dispatch.Running:
+				st = dispatch.Cancelled
+			default:
+				// Container state is Queued, Complete or Cancelled so stop monitoring it.
+				return
+			}
+
+			log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+				container.UUID, con.State, st)
+			dispatcher.UpdateState(container.UUID, st)
 		}
 	}
+}
 
-	log.Printf("Monitoring container %v started", uuid)
-
-	// periodically check squeue
-	doneSqueue := make(chan struct{})
-	go func() {
-		squeueUpdater.CheckSqueue(container.UUID, true)
-		ticker := time.NewTicker(dispatcher.PollInterval)
-		for {
-			select {
-			case <-ticker.C:
-				if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
-					log.Printf("Error running squeue: %v", err)
-					// don't cancel, just leave it the way it is
-				} else if !inQ {
-					var con dispatch.Container
-					err := dispatcher.Arv.Get("containers", uuid, nil, &con)
-					if err != nil {
-						log.Printf("Error getting final container state: %v", err)
-					}
+// Run or monitor a container.
+//
+// Monitor status updates.  If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
-					var st string
-					switch con.State {
-					case dispatch.Locked:
-						st = dispatch.Queued
-					case dispatch.Running:
-						st = dispatch.Cancelled
-					default:
-						st = ""
-					}
+	log.Printf("Monitoring container %v started", container.UUID)
+	defer log.Printf("Monitoring container %v finished", container.UUID)
 
-					if st != "" {
-						log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-							uuid, con.State, st)
-						dispatcher.UpdateState(uuid, st)
-					}
-				}
-			case <-doneSqueue:
-				close(doneSqueue)
-				ticker.Stop()
-				return
-			}
-		}
-	}()
+	monitorDone := false
+	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
 	for container = range status {
 		if container.State == dispatch.Locked || container.State == dispatch.Running {
 			if container.Priority == 0 {
 				log.Printf("Canceling container %s", container.UUID)
 
+				// Mutex between squeue sync and running sbatch or scancel.
+				squeueUpdater.SlurmLock.Lock()
 				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				squeueUpdater.SlurmLock.Unlock()
+
 				if err != nil {
 					log.Printf("Error stopping container %s with scancel: %v",
 						container.UUID, err)
-					if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
-						log.Printf("Error running squeue: %v", err)
+					if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
 						continue
 					} else if inQ {
 						log.Printf("Container %s is still in squeue after scancel.",
@@ -345,8 +268,5 @@ func run(dispatcher *dispatch.Dispatcher,
 			}
 		}
 	}
-
-	doneSqueue <- struct{}{}
-
-	log.Printf("Monitoring container %v finished", uuid)
+	monitorDone = true
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index d30c5df..be347e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -123,14 +123,12 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 		},
 		DoneProcessing: doneProcessing}
 
-	squeueUpdater.SqueueDone = make(chan struct{})
-	go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+	squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
 
 	err = dispatcher.RunDispatcher()
 	c.Assert(err, IsNil)
 
-	squeueUpdater.SqueueDone <- struct{}{}
-	close(squeueUpdater.SqueueDone)
+	squeueUpdater.Done()
 
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
new file mode 100644
index 0000000..b86a4d9
--- /dev/null
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -0,0 +1,120 @@
+package main
+
+import (
+	"bufio"
+	"log"
+	"os/exec"
+	"sync"
+	"time"
+)
+
+type Squeue struct {
+	squeueContents []string
+	squeueDone     chan struct{}
+	squeueError    error
+	squeueCond     *sync.Cond
+	SlurmLock      sync.Mutex
+}
+
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+	return exec.Command("squeue", "--format=%j")
+}
+
+var squeueCmd = squeueFunc
+
+func (squeue *Squeue) RunSqueue() error {
+	var newSqueueContents []string
+
+	// Mutex between squeue sync and running sbatch or scancel.  This
+	// establishes a sequence so that squeue doesn't run concurrently with
+	// sbatch or scancel; the next update of squeue will occur only after
+	// sbatch or scancel has completed.
+	squeueUpdater.SlurmLock.Lock()
+	defer squeueUpdater.SlurmLock.Unlock()
+
+	// Also ensure unlock on all return paths
+	defer squeueUpdater.squeueCond.L.Unlock()
+
+	cmd := squeueCmd()
+	sq, err := cmd.StdoutPipe()
+	if err != nil {
+		log.Printf("Error creating stdout pipe for squeue: %v", err)
+		squeueUpdater.squeueCond.L.Lock()
+		squeueUpdater.squeueError = err
+		return err
+	}
+	cmd.Start()
+	scanner := bufio.NewScanner(sq)
+	for scanner.Scan() {
+		newSqueueContents = append(newSqueueContents, scanner.Text())
+	}
+	if err := scanner.Err(); err != nil {
+		cmd.Wait()
+		log.Printf("Error reading from squeue pipe: %v", err)
+		squeueUpdater.squeueCond.L.Lock()
+		squeueUpdater.squeueError = err
+		return err
+	}
+
+	err = cmd.Wait()
+	if err != nil {
+		log.Printf("Error running squeue: %v", err)
+		squeueUpdater.squeueCond.L.Lock()
+		squeueUpdater.squeueError = err
+		return err
+	}
+
+	squeueUpdater.squeueCond.L.Lock()
+	squeueUpdater.squeueError = nil
+	squeueUpdater.squeueContents = newSqueueContents
+	squeueUpdater.squeueCond.Broadcast()
+
+	return nil
+}
+
+// Check if a container UUID is in the slurm queue.  This will block until the
+// next successful update from SLURM.
+func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+	squeueUpdater.squeueCond.L.Lock()
+	// block until next squeue broadcast signaling an update.
+	squeueUpdater.squeueCond.Wait()
+	if squeueUpdater.squeueError != nil {
+		e := squeueUpdater.squeueError
+		squeueUpdater.squeueCond.L.Unlock()
+		return false, e
+	}
+	contents := squeueUpdater.squeueContents
+	squeueUpdater.squeueCond.L.Unlock()
+
+	for _, k := range contents {
+		if k == uuid {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
+	squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
+	squeueUpdater.squeueDone = make(chan struct{})
+	squeueUpdater.RunSqueue()
+	go squeueUpdater.SyncSqueue(pollInterval)
+}
+
+func (squeue *Squeue) Done() {
+	squeueUpdater.squeueDone <- struct{}{}
+	close(squeueUpdater.squeueDone)
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+	ticker := time.NewTicker(pollInterval)
+	for {
+		select {
+		case <-squeueUpdater.squeueDone:
+			return
+		case <-ticker.C:
+			squeueUpdater.RunSqueue()
+		}
+	}
+}

commit 3ae9a789410e93eeb31ca5670c17a6d03d77f608
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 1 16:06:26 2016 -0400

    9187: Slurm dispatcher improvements around squeue
    
    * Clarify that status updates are not guaranteed to be delivered on a
    heartbeat.
    * Refactor slurm dispatcher to monitor the container in squeue in a separate
    goroutine.
    * Refactor polling squeue to a single goroutine and cache the results so that
    monitoring 100 containers doesn't result in 100 calls to squeue.
    * No longer set up strigger to cancel the job on finish; instead, cancel
    running jobs that are not in squeue.
    * Test both cases where a job is/is not in squeue.
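
The "poll once, share the result" refactor in the third bullet comes down to a
single ticker goroutine refreshing a cached snapshot that any number of
container monitors read under a mutex.  A minimal sketch of that shape
(hypothetical names, independent of the dispatcher code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// queueCache is refreshed by one polling goroutine and read by many
// monitors, so N monitors cost one squeue invocation per interval.
type queueCache struct {
	mu   sync.Mutex
	jobs []string
	done chan struct{}
}

func (q *queueCache) poll(interval time.Duration, fetch func() []string) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-q.done:
			return
		case <-ticker.C:
			snapshot := fetch() // the single external call per tick
			q.mu.Lock()
			q.jobs = snapshot
			q.mu.Unlock()
		}
	}
}

// has reads the cached snapshot; it never runs squeue itself.
func (q *queueCache) has(name string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, j := range q.jobs {
		if j == name {
			return true
		}
	}
	return false
}

func main() {
	q := &queueCache{done: make(chan struct{})}
	go q.poll(50*time.Millisecond, func() []string {
		return []string{"zzzzz-dz642-queuedcontainer"}
	})
	time.Sleep(120 * time.Millisecond)
	fmt.Println(q.has("zzzzz-dz642-queuedcontainer")) // true
	close(q.done)
}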

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index a27971f..785b6ec 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,3 +1,6 @@
+// Package dispatch provides a framework for monitoring the Arvados container
+// queue, locking container records, and running goroutine callbacks which
+// implement execution and monitoring of the containers.
 package dispatch
 
 import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
 	Items []apiClientAuthorization `json:"items"`
 }
 
-// Container data
+// Represents an Arvados container record
 type Container struct {
 	UUID               string           `json:"uuid"`
 	State              string           `json:"state"`
@@ -45,9 +48,27 @@ type ContainerList struct {
 
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
-	Arv            arvadosclient.ArvadosClient
-	RunContainer   func(*Dispatcher, Container, chan Container)
-	PollInterval   time.Duration
+	// The Arvados client
+	Arv arvadosclient.ArvadosClient
+
+	// When a new queued container appears and is either already owned by
+	// this dispatcher or is successfully locked, the dispatcher will call
+	// go RunContainer().  The RunContainer() goroutine gets a channel over
+	// which it will receive updates to the container state.  The
+	// RunContainer() goroutine should only assume status updates come when
+	// the container record changes on the API server; if it needs to
+	// monitor the job submission to the underlying slurm/grid engine/etc
+	// queue it should spin up its own polling goroutines.  When the
+	// channel is closed, that means the container is no longer being
+	// handled by this dispatcher and the goroutine should terminate.  The
+	// goroutine is responsible for draining the 'status' channel, failure
+	// to do so may deadlock the dispatcher.
+	RunContainer func(*Dispatcher, Container, chan Container)
+
+	// Amount of time to wait between polling for updates.
+	PollInterval time.Duration
+
+	// Channel used to signal that RunDispatcher loop should exit.
 	DoneProcessing chan struct{}
 
 	mineMutex  sync.Mutex
@@ -159,7 +180,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 		// back to Queued and then locked by another dispatcher,
 		// LockedByUUID will be different.  In either case, we want
 		// to stop monitoring it.
-		log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+		log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
 		dispatcher.notMine(container.UUID)
 		return
 	}
@@ -191,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 			"container": arvadosclient.Dict{"state": newState}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+		log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
 	}
 	return err
 }
@@ -199,14 +220,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // RunDispatcher runs the main loop of the dispatcher until receiving a message
 // on the dispatcher.DoneProcessing channel.  It also installs a signal handler
 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer().  If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer().  The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
 	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index aca60e9..0248f18 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -115,7 +115,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
+	testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to state \"Locked\"")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -123,7 +123,7 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 	apiStubResponses["/arvados/v1/containers"] =
 		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
-		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
 
 	testWithServerStub(c, apiStubResponses, "echo",
 		`After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2".  Updating it to "Cancelled"`)
@@ -142,7 +142,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
-		arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+		arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
 
 	apiStub := arvadostest.ServerStub{apiStubResponses}
 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 641b4bc..3e14820 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -14,9 +14,16 @@ import (
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"time"
 )
 
+type Squeue struct {
+	sync.Mutex
+	squeueContents []string
+	SqueueDone     chan struct{}
+}
+
 func main() {
 	err := doMain()
 	if err != nil {
@@ -26,7 +33,7 @@ func main() {
 
 var (
 	crunchRunCommand *string
-	finishCommand    *string
+	squeueUpdater    Squeue
 )
 
 func doMain() error {
@@ -42,11 +49,6 @@ func doMain() error {
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
-	finishCommand = flags.String(
-		"finish-command",
-		"/usr/bin/crunch-finish-slurm.sh",
-		"Command to run from strigger when job is finished")
-
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
@@ -63,11 +65,17 @@ func doMain() error {
 		PollInterval:   time.Duration(*pollInterval) * time.Second,
 		DoneProcessing: make(chan struct{})}
 
+	squeueUpdater.SqueueDone = make(chan struct{})
+	go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
+
 	err = dispatcher.RunDispatcher()
 	if err != nil {
 		return err
 	}
 
+	squeueUpdater.SqueueDone <- struct{}{}
+	close(squeueUpdater.SqueueDone)
+
 	return nil
 }
 
@@ -81,19 +89,12 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
-		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
 // squeueFunc
 func squeueFunc() *exec.Cmd {
 	return exec.Command("squeue", "--format=%j")
 }
 
 // Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
 var sbatchCmd = sbatchFunc
 var squeueCmd = squeueFunc
 
@@ -182,44 +183,66 @@ func submit(dispatcher *dispatch.Dispatcher,
 	return
 }
 
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
-	insecure := "0"
-	if arv.ApiInsecure {
-		insecure = "1"
-	}
-	cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	err := cmd.Run()
-	if err != nil {
-		log.Printf("While setting up strigger: %v", err)
-		// BUG: we drop the error here and forget about it. A
-		// human has to notice the container is stuck in
-		// Running state, and fix it manually.
-	}
-}
+func (squeue *Squeue) runSqueue() ([]string, error) {
+	var newSqueueContents []string
 
-func checkSqueue(uuid string) (bool, error) {
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
-		return false, err
+		return nil, err
 	}
 	cmd.Start()
-	defer cmd.Wait()
 	scanner := bufio.NewScanner(sq)
-	found := false
 	for scanner.Scan() {
-		if scanner.Text() == uuid {
-			found = true
-		}
+		newSqueueContents = append(newSqueueContents, scanner.Text())
 	}
 	if err := scanner.Err(); err != nil {
-		return false, err
+		cmd.Wait()
+		return nil, err
+	}
+
+	err = cmd.Wait()
+	if err != nil {
+		return nil, err
+	}
+
+	return newSqueueContents, nil
+}
+
+func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
+	if check {
+		n, err := squeue.runSqueue()
+		if err != nil {
+			return false, err
+		}
+		squeue.Lock()
+		squeue.squeueContents = n
+		squeue.Unlock()
+	}
+
+	if uuid != "" {
+		squeue.Lock()
+		defer squeue.Unlock()
+		for _, k := range squeue.squeueContents {
+			if k == uuid {
+				return true, nil
+			}
+		}
+	}
+	return false, nil
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+	// TODO: consider using "squeue -i" instead of polling squeue.
+	ticker := time.NewTicker(pollInterval)
+	for {
+		select {
+		case <-squeueUpdater.SqueueDone:
+			return
+		case <-ticker.C:
+			squeue.CheckSqueue("", true)
+		}
 	}
-	return found, nil
 }
 
 // Run or monitor a container.
@@ -239,50 +262,91 @@ func run(dispatcher *dispatch.Dispatcher,
 	uuid := container.UUID
 
 	if container.State == dispatch.Locked {
-		if inQ, err := checkSqueue(container.UUID); err != nil {
+		if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+			// maybe squeue is broken, put it back in the queue
 			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+			dispatcher.UpdateState(container.UUID, dispatch.Queued)
 		} else if !inQ {
 			log.Printf("About to submit queued container %v", container.UUID)
 
-			jobid, err := submit(dispatcher, container, *crunchRunCommand)
-			if err != nil {
-				log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-			} else {
-				finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+			if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+				log.Printf("Error submitting container %s to slurm: %v",
+					container.UUID, err)
+				// maybe sbatch is broken, put it back to queued
+				dispatcher.UpdateState(container.UUID, dispatch.Queued)
 			}
 		}
-	} else if container.State == dispatch.Running {
-		if inQ, err := checkSqueue(container.UUID); err != nil {
-			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		} else if !inQ {
-			log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		}
 	}
 
 	log.Printf("Monitoring container %v started", uuid)
 
-	for container = range status {
-		if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
-			log.Printf("Canceling container %s", container.UUID)
-
-			err := exec.Command("scancel", "--name="+container.UUID).Run()
-			if err != nil {
-				log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-				if inQ, err := checkSqueue(container.UUID); err != nil {
+	// periodically check squeue
+	doneSqueue := make(chan struct{})
+	go func() {
+		squeueUpdater.CheckSqueue(container.UUID, true)
+		ticker := time.NewTicker(dispatcher.PollInterval)
+		for {
+			select {
+			case <-ticker.C:
+				if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
 					log.Printf("Error running squeue: %v", err)
-					continue
-				} else if inQ {
-					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-					continue
+					// don't cancel, just leave it the way it is
+				} else if !inQ {
+					var con dispatch.Container
+					err := dispatcher.Arv.Get("containers", uuid, nil, &con)
+					if err != nil {
+						log.Printf("Error getting final container state: %v", err)
+					}
+
+					var st string
+					switch con.State {
+					case dispatch.Locked:
+						st = dispatch.Queued
+					case dispatch.Running:
+						st = dispatch.Cancelled
+					default:
+						st = ""
+					}
+
+					if st != "" {
+						log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+							uuid, con.State, st)
+						dispatcher.UpdateState(uuid, st)
+					}
 				}
+			case <-doneSqueue:
+				close(doneSqueue)
+				ticker.Stop()
+				return
 			}
+		}
+	}()
 
-			err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+	for container = range status {
+		if container.State == dispatch.Locked || container.State == dispatch.Running {
+			if container.Priority == 0 {
+				log.Printf("Canceling container %s", container.UUID)
+
+				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				if err != nil {
+					log.Printf("Error stopping container %s with scancel: %v",
+						container.UUID, err)
+					if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+						log.Printf("Error running squeue: %v", err)
+						continue
+					} else if inQ {
+						log.Printf("Container %s is still in squeue after scancel.",
+							container.UUID)
+						continue
+					}
+				}
+
+				err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+			}
 		}
 	}
 
+	doneSqueue <- struct{}{}
+
 	log.Printf("Monitoring container %v finished", uuid)
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 348d5e4..d30c5df 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -1,12 +1,12 @@
 package main
 
 import (
+	"bytes"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
-
-	"bytes"
-	"fmt"
+	"io"
 	"log"
 	"math"
 	"net/http"
@@ -35,35 +35,43 @@ var initialArgs []string
 
 func (s *TestSuite) SetUpSuite(c *C) {
 	initialArgs = os.Args
-	arvadostest.StartAPI()
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
-	arvadostest.StopAPI()
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-slurm"}
 	os.Args = args
 
+	arvadostest.StartAPI()
 	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
-	arvadostest.ResetEnv()
 	os.Args = initialArgs
+	arvadostest.StopAPI()
 }
 
 func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) TestIntegration(c *C) {
+func (s *TestSuite) TestIntegrationNormal(c *C) {
+	s.integrationTest(c, false)
+}
+
+func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+	s.integrationTest(c, true)
+}
+
+func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+	arvadostest.ResetEnv()
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	c.Assert(err, IsNil)
 
 	var sbatchCmdLine []string
-	var striggerCmdLine []string
 
 	// Override sbatchCmd
 	defer func(orig func(dispatch.Container) *exec.Cmd) {
@@ -74,30 +82,16 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return exec.Command("sh")
 	}
 
-	// Override striggerCmd
-	defer func(orig func(jobid, containerUUID, finishCommand,
-		apiHost, apiToken, apiInsecure string) *exec.Cmd) {
-		striggerCmd = orig
-	}(striggerCmd)
-	striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
-		striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
-			apiHost, apiToken, apiInsecure).Args
-		go func() {
-			time.Sleep(5 * time.Second)
-			arv.Update("containers", containerUUID,
-				arvadosclient.Dict{
-					"container": arvadosclient.Dict{"state": dispatch.Complete}},
-				nil)
-		}()
-		return exec.Command("echo", striggerCmdLine...)
-	}
-
 	// Override squeueCmd
 	defer func(orig func() *exec.Cmd) {
 		squeueCmd = orig
 	}(squeueCmd)
 	squeueCmd = func() *exec.Cmd {
-		return exec.Command("echo")
+		if missingFromSqueue {
+			return exec.Command("echo")
+		} else {
+			return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+		}
 	}
 
 	// There should be no queued containers now
@@ -111,8 +105,6 @@ func (s *TestSuite) TestIntegration(c *C) {
 
 	echo := "echo"
 	crunchRunCommand = &echo
-	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-	finishCommand = &finishCmd
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -122,8 +114,8 @@ func (s *TestSuite) TestIntegration(c *C) {
 			container dispatch.Container,
 			status chan dispatch.Container) {
 			go func() {
-				time.Sleep(1)
 				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				time.Sleep(3 * time.Second)
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
 			run(dispatcher, container, status)
@@ -131,19 +123,29 @@ func (s *TestSuite) TestIntegration(c *C) {
 		},
 		DoneProcessing: doneProcessing}
 
+	squeueUpdater.SqueueDone = make(chan struct{})
+	go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+
 	err = dispatcher.RunDispatcher()
 	c.Assert(err, IsNil)
 
+	squeueUpdater.SqueueDone <- struct{}{}
+	close(squeueUpdater.SqueueDone)
+
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", item.UUID),
 		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
 		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
 		fmt.Sprintf("--priority=%d", item.Priority)}
-	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
-		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
+	if missingFromSqueue {
+		// not in squeue when run() started, so it will have called sbatch
+		c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+	} else {
+		// already in squeue when run() started, will have just monitored it instead
+		c.Check(sbatchCmdLine, DeepEquals, []string(nil))
+	}
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -154,7 +156,11 @@ func (s *TestSuite) TestIntegration(c *C) {
 	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	c.Check(container.State, Equals, "Complete")
+	if missingFromSqueue {
+		c.Check(container.State, Equals, "Cancelled")
+	} else {
+		c.Check(container.State, Equals, "Complete")
+	}
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -180,12 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(buf)
+	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
 	crunchRunCommand = &crunchCmd
-	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
-	finishCommand = &finishCmd
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -195,7 +199,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 			container dispatch.Container,
 			status chan dispatch.Container) {
 			go func() {
-				time.Sleep(1)
+				time.Sleep(1 * time.Second)
 				dispatcher.UpdateState(container.UUID, dispatch.Running)
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
deleted file mode 100755
index 95a37ba..0000000
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/sh
-
-# Script to be called by strigger when a job finishes.  This ensures the job
-# record has the correct state "Complete" even if the node running the job
-# failed.
-
-ARVADOS_API_HOST=$1
-ARVADOS_API_TOKEN=$2
-ARVADOS_API_HOST_INSECURE=$3
-uuid=$4
-jobid=$5
-
-# If it is possible to attach metadata to job records we could look up the
-# above information instead of getting it on the command line.  For example,
-# this is the recipe for getting the job name (container uuid) from the job id.
-#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
-
-export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
-
-exec arv container update --uuid $uuid --container '{"state": "Complete"}'

commit 3a3910fdc8a5003c182f68e3423c96327a136175
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 27 17:30:07 2016 -0400

    9187: Check LockedByUUID on container updates and terminate status updates
    if it is not equal to the dispatcher's token UUID.
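
The guard this commit adds boils down to: keep monitoring a container only if
we still hold its lock, or it has gone back to Queued.  A condensed, runnable
restatement of that predicate (ownedByMe is a hypothetical helper, not part of
the SDK):

package main

import "fmt"

// ownedByMe condenses the handleUpdate check: monitoring continues only
// if this dispatcher's auth UUID still holds the lock, or the container
// has returned to the queue.
func ownedByMe(lockedByUUID, myAuthUUID, state string) bool {
	return lockedByUUID == myAuthUUID || state == "Queued"
}

func main() {
	fmt.Println(ownedByMe("zzzzz-gj3su-me", "zzzzz-gj3su-me", "Running"))    // true
	fmt.Println(ownedByMe("zzzzz-gj3su-other", "zzzzz-gj3su-me", "Running")) // false: re-locked elsewhere
	fmt.Println(ownedByMe("", "zzzzz-gj3su-me", "Queued"))                   // true: back in the queue
}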

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 355ed7c..a27971f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -52,7 +52,7 @@ type Dispatcher struct {
 
 	mineMutex  sync.Mutex
 	mineMap    map[string]chan Container
-	auth       apiClientAuthorization
+	Auth       apiClientAuthorization
 	containers chan Container
 }
 
@@ -100,17 +100,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 	err := dispatcher.Arv.List("containers", params, &containers)
 	if err != nil {
 		log.Printf("Error getting list of containers: %q", err)
-	} else {
-		if containers.ItemsAvailable > len(containers.Items) {
-			// TODO: support paging
-			log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-				containers.ItemsAvailable,
-				len(containers.Items))
-		}
-		for _, container := range containers.Items {
-			touched[container.UUID] = true
-			dispatcher.containers <- container
-		}
+		return
+	}
+
+	if containers.ItemsAvailable > len(containers.Items) {
+		// TODO: support paging
+		log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+			containers.ItemsAvailable,
+			len(containers.Items))
+	}
+	for _, container := range containers.Items {
+		touched[container.UUID] = true
+		dispatcher.containers <- container
 	}
 }
 
@@ -122,7 +123,7 @@ func (dispatcher *Dispatcher) pollContainers() {
 		"order":   []string{"priority desc"},
 		"limit":   "1000"}
 	paramsP := arvadosclient.Dict{
-		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
 		"limit":   "1000"}
 
 	for {
@@ -152,11 +153,19 @@ func (dispatcher *Dispatcher) pollContainers() {
 }
 
 func (dispatcher *Dispatcher) handleUpdate(container Container) {
+	if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+		// If the container is Complete, Cancelled, or Queued, LockedByUUID
+		// will be empty.  If the container was formerly Locked, moved
+		// back to Queued and then locked by another dispatcher,
+		// LockedByUUID will be different.  In either case, we want
+		// to stop monitoring it.
+		log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+		dispatcher.notMine(container.UUID)
+		return
+	}
+
 	if dispatcher.updateMine(container) {
-		if container.State == Complete || container.State == Cancelled {
-			log.Printf("Container %v now in state %v", container.UUID, container.State)
-			dispatcher.notMine(container.UUID)
-		}
+		// Already being monitored; the status update has been delivered.
 		return
 	}
 
@@ -169,6 +178,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 	}
 
 	if container.State == Locked || container.State == Running {
+		// Not currently monitored but in Locked or Running state and
+		// owned by this dispatcher, so start monitoring.
 		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
 	}
 }
@@ -197,7 +208,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
 // The callback is responsible for draining the channel; if it fails to do so
 // it will deadlock the dispatcher.
 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
 		log.Printf("Error getting my token UUID: %v", err)
 		return
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index cc472a4..73a3895 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -159,7 +159,8 @@ func run(dispatcher *dispatch.Dispatcher,
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+	if container.LockedByUUID == dispatcher.Auth.UUID &&
+		(container.State == dispatch.Locked || container.State == dispatch.Running) {
 		log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
 			*crunchRunCommand, uuid, container.State, dispatch.Cancelled)
 		dispatcher.UpdateState(uuid, dispatch.Cancelled)

commit 4153cb6cfad920ed0b1a4b818d3bcc8de492d134
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 19 14:12:42 2016 -0400

    9187: Refactor dispatcher support into common library and update to use Locking API.
    
    The new dispatcher package in the Go SDK provides a framework for monitoring
    a list of queued/locked/running containers.  It tries to lock containers in
    the queue; locked or running containers are passed to a RunContainer
    goroutine supplied by the specific dispatcher.  The existing dispatchers
    (-local and -slurm) are refactored to use this framework.  Dispatchers have
    crash-recovery behavior and can put containers that are unaccounted for into
    the Cancelled state.

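As a usage sketch of the API introduced below (the poll interval, log text, and the trivial callback are placeholders, not part of the commit), a specific dispatcher wires into the framework roughly like this:

    package main

    import (
        "log"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
    )

    func main() {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
            log.Fatalf("Error making Arvados client: %v", err)
        }

        d := dispatch.Dispatcher{
            Arv:          arv,
            PollInterval: 10 * time.Second,
            // Called once per container this dispatcher owns; it must
            // drain the status channel until it is closed.
            RunContainer: func(dsp *dispatch.Dispatcher,
                ctr dispatch.Container,
                status chan dispatch.Container) {
                for ctr = range status {
                    log.Printf("%v is now %v", ctr.UUID, ctr.State)
                }
            },
            DoneProcessing: make(chan struct{}),
        }
        if err := d.RunDispatcher(); err != nil {
            log.Fatal(err)
        }
    }
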
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644
index 0000000..355ed7c
--- /dev/null
+++ b/sdk/go/dispatch/dispatch.go
@@ -0,0 +1,229 @@
+package dispatch
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+)
+
+// Constants for container states
+const (
+	Queued    = "Queued"
+	Locked    = "Locked"
+	Running   = "Running"
+	Complete  = "Complete"
+	Cancelled = "Cancelled"
+)
+
+type apiClientAuthorization struct {
+	UUID     string `json:"uuid"`
+	APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+	Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+	UUID               string           `json:"uuid"`
+	State              string           `json:"state"`
+	Priority           int              `json:"priority"`
+	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+	LockedByUUID       string           `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of the containers from api
+type ContainerList struct {
+	Items          []Container `json:"items"`
+	ItemsAvailable int         `json:"items_available"`
+}
+
+// Dispatcher holds the state of the dispatcher
+type Dispatcher struct {
+	Arv            arvadosclient.ArvadosClient
+	RunContainer   func(*Dispatcher, Container, chan Container)
+	PollInterval   time.Duration
+	DoneProcessing chan struct{}
+
+	mineMutex  sync.Mutex
+	mineMap    map[string]chan Container
+	auth       apiClientAuthorization
+	containers chan Container
+}
+
+// Goroutine-safely add uuid to the set of "my" containers, i.e., ones that
+// this process is actively starting or monitoring.  Returns the channel to
+// be used to send container status updates.
+func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		return ch
+	}
+
+	ch := make(chan Container)
+	dispatcher.mineMap[uuid] = ch
+	return ch
+}
+
+// Release a container which is no longer being monitored.
+func (dispatcher *Dispatcher) notMine(uuid string) {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		close(ch)
+		delete(dispatcher.mineMap, uuid)
+	}
+}
+
+// Check if there is a channel for updates associated with this container.  If
+// so send the container record on the channel and return true, if not return
+// false.
+func (dispatcher *Dispatcher) updateMine(c Container) bool {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	ch, ok := dispatcher.mineMap[c.UUID]
+	if ok {
+		ch <- c
+		return true
+	}
+	return false
+}
+
+func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+	var containers ContainerList
+	err := dispatcher.Arv.List("containers", params, &containers)
+	if err != nil {
+		log.Printf("Error getting list of containers: %q", err)
+	} else {
+		if containers.ItemsAvailable > len(containers.Items) {
+			// TODO: support paging
+			log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+				containers.ItemsAvailable,
+				len(containers.Items))
+		}
+		for _, container := range containers.Items {
+			touched[container.UUID] = true
+			dispatcher.containers <- container
+		}
+	}
+}
+
+func (dispatcher *Dispatcher) pollContainers() {
+	ticker := time.NewTicker(dispatcher.PollInterval)
+
+	paramsQ := arvadosclient.Dict{
+		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+		"order":   []string{"priority desc"},
+		"limit":   "1000"}
+	paramsP := arvadosclient.Dict{
+		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+		"limit":   "1000"}
+
+	for {
+		select {
+		case <-ticker.C:
+			touched := make(map[string]bool)
+			dispatcher.getContainers(paramsQ, touched)
+			dispatcher.getContainers(paramsP, touched)
+			dispatcher.mineMutex.Lock()
+			var monitored []string
+			for k := range dispatcher.mineMap {
+				if _, ok := touched[k]; !ok {
+					monitored = append(monitored, k)
+				}
+			}
+			dispatcher.mineMutex.Unlock()
+			if monitored != nil {
+				dispatcher.getContainers(arvadosclient.Dict{
+					"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+			}
+		case <-dispatcher.DoneProcessing:
+			close(dispatcher.containers)
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (dispatcher *Dispatcher) handleUpdate(container Container) {
+	if dispatcher.updateMine(container) {
+		if container.State == Complete || container.State == Cancelled {
+			log.Printf("Container %v now in state %v", container.UUID, container.State)
+			dispatcher.notMine(container.UUID)
+		}
+		return
+	}
+
+	if container.State == Queued {
+		// Try to take the lock
+		if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+			return
+		}
+		container.State = Locked
+	}
+
+	if container.State == Locked || container.State == Running {
+		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+	}
+}
+
+// UpdateState makes an API call to change the state of a container.
+func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+	err := dispatcher.Arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": newState}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+	}
+	return err
+}
+
+// RunDispatcher runs the main loop of the dispatcher until receiving a message
+// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
+// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
+//
+// When a new queued container appears and is successfully locked, or a
+// container appears that is already Locked or Running but not yet known to
+// this dispatcher, the dispatcher calls the RunContainer() callback, passing
+// a channel over which the callback will receive updates to the container
+// state.
+// The callback is responsible for draining the channel; if it fails to do so
+// it will deadlock the dispatcher.
+func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	if err != nil {
+		log.Printf("Error getting my token UUID: %v", err)
+		return
+	}
+
+	dispatcher.mineMap = make(map[string]chan Container)
+	dispatcher.containers = make(chan Container)
+
+	// Graceful shutdown on signal
+	sigChan := make(chan os.Signal, 1) // buffered, so a pending signal is not dropped
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	go func(sig <-chan os.Signal) {
+		for s := range sig {
+			log.Printf("Caught signal: %v", s)
+			dispatcher.DoneProcessing <- struct{}{}
+		}
+	}(sigChan)
+
+	defer close(sigChan)
+	defer signal.Stop(sigChan)
+
+	go dispatcher.pollContainers()
+	for container := range dispatcher.containers {
+		dispatcher.handleUpdate(container)
+	}
+
+	return nil
+}
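
The deadlock warning in the RunDispatcher comment is worth spelling out: updateMine() sends on an unbuffered per-container channel, so a RunContainer callback that stops receiving stalls the dispatcher's entire update loop. A minimal safe shape for a callback, where runContainer and doWork are hypothetical names rather than anything in this commit:

    package main

    import "git.curoverse.com/arvados.git/sdk/go/dispatch"

    // doWork stands in for the dispatcher-specific job; it should
    // return once the container has reached a final state.
    func doWork(ctr dispatch.Container) {}

    func runContainer(d *dispatch.Dispatcher,
        ctr dispatch.Container,
        status chan dispatch.Container) {

        done := make(chan struct{})
        go func() {
            doWork(ctr)
            close(done)
        }()

        for {
            select {
            case <-done:
                // Keep draining until notMine() closes the channel,
                // so handleUpdate() can never block sending to us.
                for range status {
                }
                return
            case c, ok := <-status:
                if !ok {
                    return // dispatcher has finalized the container
                }
                if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
                    // Priority dropped to zero: cancel the job here.
                }
            }
        }
    }
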
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 4023870..cc472a4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -1,14 +1,15 @@
 package main
 
+// Dispatcher service for Crunch that runs containers locally.
+
 import (
 	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
 	"os"
 	"os/exec"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 )
 
@@ -20,12 +21,10 @@ func main() {
 }
 
 var (
-	arv              arvadosclient.ArvadosClient
 	runningCmds      map[string]*exec.Cmd
 	runningCmdsMutex sync.Mutex
 	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
+	crunchRunCommand *string
 )
 
 func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
@@ -49,35 +43,32 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	runningCmds = make(map[string]*exec.Cmd)
+
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Map of running crunch jobs
-	runningCmds = make(map[string]*exec.Cmd)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
+	dispatcher := dispatch.Dispatcher{
+		Arv:            arv,
+		RunContainer:   run,
+		PollInterval:   time.Duration(*pollInterval) * time.Second,
+		DoneProcessing: make(chan struct{})}
 
-	// Run all queued containers
-	runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
+	err = dispatcher.RunDispatcher()
+	if err != nil {
+		return err
+	}
 
+	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
 		cmd.Process.Signal(os.Interrupt)
 	}
+	runningCmdsMutex.Unlock()
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -85,166 +76,98 @@ func doMain() error {
 	return nil
 }
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
-	ticker := time.NewTicker(pollInterval)
-
-	for {
-		select {
-		case <-ticker.C:
-			dispatchLocal(priorityPollInterval, crunchRunCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+	return cmd.Start()
 }
 
-// Container data
-type Container struct {
-	UUID         string `json:"uuid"`
-	State        string `json:"state"`
-	Priority     int    `json:"priority"`
-	LockedByUUID string `json:"locked_by_uuid"`
-}
+var startCmd = startFunc
 
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]string{[]string{"state", "=", "Queued"}},
-	}
+// Run a container.
+//
+// If the container is Locked, start a new crunch-run process and wait until
+// crunch-run completes.  If the priority is set to zero, send an interrupt
+// signal to the crunch-run process.
+//
+// If the container is in any other state, or is not Complete/Cancelled after
+// crunch-run terminates, mark the container as Cancelled.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
-	}
+	uuid := container.UUID
 
-	for _, c := range containers.Items {
-		log.Printf("About to run queued container %v", c.UUID)
-		// Run the container
+	if container.State == dispatch.Locked {
 		waitGroup.Add(1)
-		go func(c Container) {
-			run(c.UUID, crunchRunCommand, pollInterval)
-			waitGroup.Done()
-		}(c)
-	}
-}
-
-func updateState(uuid, newState string) error {
-	err := arv.Update("containers", uuid,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
-	}
-	return err
-}
-
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
-	if err := updateState(uuid, "Locked"); err != nil {
-		return
-	}
 
-	cmd := exec.Command(crunchRunCommand, uuid)
-	cmd.Stdin = nil
-	cmd.Stderr = os.Stderr
-	cmd.Stdout = os.Stderr
+		cmd := exec.Command(*crunchRunCommand, uuid)
+		cmd.Stdin = nil
+		cmd.Stderr = os.Stderr
+		cmd.Stdout = os.Stderr
 
-	// Add this crunch job to the list of runningCmds only if we
-	// succeed in starting crunch-run.
-	runningCmdsMutex.Lock()
-	if err := cmd.Start(); err != nil {
-		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
-		runningCmdsMutex.Unlock()
-		updateState(uuid, "Queued")
-		return
-	}
-	runningCmds[uuid] = cmd
-	runningCmdsMutex.Unlock()
+		log.Printf("Starting container %v", uuid)
 
-	defer func() {
-		setFinalState(uuid)
+		// Add this crunch job to the list of runningCmds only if we
+		// succeed in starting crunch-run.
 
-		// Remove the crunch job from runningCmds
 		runningCmdsMutex.Lock()
-		delete(runningCmds, uuid)
-		runningCmdsMutex.Unlock()
-	}()
-
-	log.Printf("Starting container %v", uuid)
-
-	updateState(uuid, "Running")
+		if err := startCmd(container, cmd); err != nil {
+			runningCmdsMutex.Unlock()
+			log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+			dispatcher.UpdateState(uuid, dispatch.Cancelled)
+		} else {
+			runningCmds[uuid] = cmd
+			runningCmdsMutex.Unlock()
+
+			// Need to wait for crunch-run to exit
+			done := make(chan struct{})
+
+			go func() {
+				if _, err := cmd.Process.Wait(); err != nil {
+					log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+				}
+				log.Printf("crunch-run process for %v exited", uuid)
+				done <- struct{}{}
+			}()
+
+		Loop:
+			for {
+				select {
+				case <-done:
+					break Loop
+				case c := <-status:
+					// Interrupt the child process if priority changes to 0
+					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+						log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+						cmd.Process.Signal(os.Interrupt)
+					}
+				}
+			}
+			close(done)
 
-	cmdExited := make(chan struct{})
+			log.Printf("Finished container run for %v", uuid)
 
-	// Kill the child process if container priority changes to zero
-	go func() {
-		ticker := time.NewTicker(pollInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-cmdExited:
-				return
-			case <-ticker.C:
-			}
-			var container Container
-			err := arv.Get("containers", uuid, nil, &container)
-			if err != nil {
-				log.Printf("Error getting container %v: %q", uuid, err)
-				continue
-			}
-			if container.Priority == 0 {
-				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-				cmd.Process.Signal(os.Interrupt)
-			}
+			// Remove the crunch job from runningCmds
+			runningCmdsMutex.Lock()
+			delete(runningCmds, uuid)
+			runningCmdsMutex.Unlock()
 		}
-	}()
-
-	// Wait for crunch-run to exit
-	if _, err := cmd.Process.Wait(); err != nil {
-		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+		waitGroup.Done()
 	}
-	close(cmdExited)
-
-	log.Printf("Finished container run for %v", uuid)
-}
 
-func setFinalState(uuid string) {
-	// The container state should now be 'Complete' if everything
-	// went well. If it started but crunch-run didn't change its
-	// final state to 'Running', fix that now. If it never even
-	// started, cancel it as unrunnable. (TODO: Requeue instead,
-	// and fix tests so they can tell something happened even if
-	// the final state is Queued.)
-	var container Container
-	err := arv.Get("containers", uuid, nil, &container)
+	// If the container is not finalized, then change it to "Cancelled".
+	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	fixState := map[string]string{
-		"Running": "Complete",
-		"Locked": "Cancelled",
+	if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+		log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
+			*crunchRunCommand, uuid, container.State, dispatch.Cancelled)
+		dispatcher.UpdateState(uuid, dispatch.Cancelled)
 	}
-	if newState, ok := fixState[container.State]; ok {
-		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
-		updateState(uuid, newState)
+
+	// Drain any subsequent status updates so the dispatcher doesn't block.
+	for range status {
 	}
+
+	log.Printf("Finalized container %v", uuid)
 }
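
The "var startCmd = startFunc" indirection above is a test seam: the test diff that follows swaps it out so the container appears to run without a real crunch-run. A hypothetical variant that also records the command line it was asked to start (withRecordedStart and recorded are illustrative names only):

    // Swap in a recording startCmd, restoring the original afterwards.
    func withRecordedStart(body func(recorded *[]string)) {
        orig := startCmd
        defer func() { startCmd = orig }()

        var recorded []string
        startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
            recorded = cmd.Args
            // Still start the (harmless) command so run() has a
            // process to wait on.
            return cmd.Start()
        }
        body(&recorded)
    }
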
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..aca60e9 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,20 @@
 package main
 
 import (
+	"bytes"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
-	"bytes"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	. "gopkg.in/check.v1"
+	"io"
 	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"syscall"
+	"os/exec"
+	"strings"
 	"testing"
 	"time"
-
-	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -32,6 +33,7 @@ var initialArgs []string
 func (s *TestSuite) SetUpSuite(c *C) {
 	initialArgs = os.Args
 	arvadostest.StartAPI()
+	runningCmds = make(map[string]*exec.Cmd)
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +43,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
 func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-local"}
 	os.Args = args
-
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +54,48 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
+
+	echo := "echo"
+	crunchRunCommand = &echo
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
 
-	go func() {
-		time.Sleep(5 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
+	err = dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
-	err := doMain()
-	c.Check(err, IsNil)
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
+	var containers dispatch.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -90,13 +105,13 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
@@ -106,31 +121,35 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
 
 	testWithServerStub(c, apiStubResponses, "echo",
-		"After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+		`After echo process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is "Running".  Updating it to "Cancelled"`)
 }
 
 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
-	testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
+	testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+		arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
 	apiStub := arvadostest.ServerStub{apiStubResponses}
 
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -139,15 +158,38 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(buf)
+	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
+	*crunchRunCommand = crunchCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
+
 	go func() {
-		time.Sleep(2 * time.Second)
-		sigChan <- syscall.SIGTERM
+		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+			time.Sleep(100 * time.Millisecond)
+		}
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(time.Second, time.Second, crunchCmd)
+	err := dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 53e4705..641b4bc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,19 +1,19 @@
 package main
 
+// Dispatcher service for Crunch that submits containers to the slurm queue.
+
 import (
 	"bufio"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io/ioutil"
 	"log"
 	"math"
 	"os"
 	"os/exec"
-	"os/signal"
-	"strconv"
-	"sync"
-	"syscall"
+	"strings"
 	"time"
 )
 
@@ -25,12 +25,8 @@ func main() {
 }
 
 var (
-	arv              arvadosclient.ArvadosClient
-	runningCmds      map[string]*exec.Cmd
-	runningCmdsMutex sync.Mutex
-	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
+	crunchRunCommand *string
+	finishCommand    *string
 )
 
 func doMain() error {
@@ -41,17 +37,12 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
-	finishCommand := flags.String(
+	finishCommand = flags.String(
 		"finish-command",
 		"/usr/bin/crunch-finish-slurm.sh",
 		"Command to run from strigger when job is finished")
@@ -59,142 +50,56 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
-
-	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
-	// Wait for all running crunch jobs to complete / terminate
-	waitGroup.Wait()
-
-	return nil
-}
-
-type apiClientAuthorization struct {
-	UUID     string `json:"uuid"`
-	APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-	Items []apiClientAuthorization `json:"items"`
-}
+	dispatcher := dispatch.Dispatcher{
+		Arv:            arv,
+		RunContainer:   run,
+		PollInterval:   time.Duration(*pollInterval) * time.Second,
+		DoneProcessing: make(chan struct{})}
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-	var auth apiClientAuthorization
-	err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+	err = dispatcher.RunDispatcher()
 	if err != nil {
-		log.Printf("Error getting my token UUID: %v", err)
-		return
-	}
-
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
-	for {
-		select {
-		case <-ticker.C:
-			dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
-}
-
-// Container data
-type Container struct {
-	UUID               string           `json:"uuid"`
-	State              string           `json:"state"`
-	Priority           int              `json:"priority"`
-	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-	LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
-	}
-
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
+		return err
 	}
 
-	for _, container := range containers.Items {
-		if container.State == "Locked" {
-			if container.LockedByUUID != auth.UUID {
-				// Locked by a different dispatcher
-				continue
-			} else if checkMine(container.UUID) {
-				// I already have a goroutine running
-				// for this container: it just hasn't
-				// gotten past Locked state yet.
-				continue
-			}
-			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-				container.UUID, auth.UUID)
-			setMine(container.UUID, true)
-			go func() {
-				waitContainer(container, pollInterval)
-				setMine(container.UUID, false)
-			}()
-		}
-		go run(container, crunchRunCommand, finishCommand, pollInterval)
-	}
+	return nil
 }
 
 // sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
 	memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
 	return exec.Command("sbatch", "--share", "--parsable",
-		"--job-name="+container.UUID,
-		"--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
-		"--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
+		fmt.Sprintf("--job-name=%s", container.UUID),
+		fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
+		fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-var sbatchCmd = sbatchFunc
-
 // striggerCmd
 func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
 	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
 		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+	return exec.Command("squeue", "--format=%j")
+}
+
+// Wrap these so that they can be overridden by tests
 var striggerCmd = striggerFunc
+var sbatchCmd = sbatchFunc
+var squeueCmd = squeueFunc
 
 // Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
 	submitErr = nil
 
 	defer func() {
@@ -204,7 +109,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 			// OK, no cleanup needed
 			return
 		}
-		err := arv.Update("containers", container.UUID,
+		err := dispatcher.Arv.Update("containers", container.UUID,
 			arvadosclient.Dict{
 				"container": arvadosclient.Dict{"state": "Queued"}},
 			nil)
@@ -244,7 +149,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 		b, _ := ioutil.ReadAll(stdoutReader)
 		stdoutReader.Close()
 		stdoutChan <- b
-		close(stdoutChan)
 	}()
 
 	stderrChan := make(chan []byte)
@@ -252,7 +156,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 		b, _ := ioutil.ReadAll(stderrReader)
 		stderrReader.Close()
 		stderrChan <- b
-		close(stderrChan)
 	}()
 
 	// Send a tiny script on stdin to execute the crunch-run command
@@ -265,21 +168,28 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 	stdoutMsg := <-stdoutChan
 	stderrmsg := <-stderrChan
 
+	close(stdoutChan)
+	close(stderrChan)
+
 	if err != nil {
 		submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
 		return
 	}
 
 	// If everything worked out, got the jobid on stdout
-	jobid = string(stdoutMsg)
+	jobid = strings.TrimSpace(string(stdoutMsg))
 
 	return
 }
 
 // finalizeRecordOnFinish uses the 'strigger' command to register a script that
 // will run on the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
-	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+	insecure := "0"
+	if arv.ApiInsecure {
+		insecure = "1"
+	}
+	cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	err := cmd.Run()
@@ -291,104 +201,8 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
 	}
 }
 
-// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
-	setMine(container.UUID, true)
-	defer setMine(container.UUID, false)
-
-	// Update container status to Locked. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err := arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Locked"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
-		return
-	}
-
-	log.Printf("About to submit queued container %v", container.UUID)
-
-	jobid, err := submit(container, crunchRunCommand)
-	if err != nil {
-		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-		return
-	}
-
-	insecure := "0"
-	if arv.ApiInsecure {
-		insecure = "1"
-	}
-	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-
-	// Update container status to Running. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err = arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
-	}
-	log.Printf("Submitted container %v to slurm", container.UUID)
-	waitContainer(container, pollInterval)
-}
-
-// Wait for a container to finish. Cancel the slurm job if the
-// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
-	log.Printf("Monitoring container %v started", container.UUID)
-	defer log.Printf("Monitoring container %v finished", container.UUID)
-
-	pollTicker := time.NewTicker(pollInterval)
-	defer pollTicker.Stop()
-	for _ = range pollTicker.C {
-		var updated Container
-		err := arv.Get("containers", container.UUID, nil, &updated)
-		if err != nil {
-			log.Printf("Error getting container %s: %q", container.UUID, err)
-			continue
-		}
-		if updated.State == "Complete" || updated.State == "Cancelled" {
-			return
-		}
-		if updated.Priority != 0 {
-			continue
-		}
-
-		// Priority is zero, but state is Running or Locked
-		log.Printf("Canceling container %s", container.UUID)
-
-		err = exec.Command("scancel", "--name="+container.UUID).Run()
-		if err != nil {
-			log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-			if inQ, err := checkSqueue(container.UUID); err != nil {
-				log.Printf("Error running squeue: %v", err)
-				continue
-			} else if inQ {
-				log.Printf("Container %s is still in squeue; will retry", container.UUID)
-				continue
-			}
-		}
-
-		err = arv.Update("containers", container.UUID,
-			arvadosclient.Dict{
-				"container": arvadosclient.Dict{"state": "Cancelled"}},
-			nil)
-		if err != nil {
-			log.Printf("Error updating state for container %s: %s", container.UUID, err)
-			continue
-		}
-
-		return
-	}
-}
-
 func checkSqueue(uuid string) (bool, error) {
-	cmd := exec.Command("squeue", "--format=%j")
+	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		return false, err
@@ -408,25 +222,67 @@ func checkSqueue(uuid string) (bool, error) {
 	return found, nil
 }
 
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
-	mineMutex.Lock()
-	if t {
-		mineMap[uuid] = true
-	} else {
-		delete(mineMap, uuid)
+// Run or monitor a container.
+//
+// If the container is marked as Locked, check if it is already in the slurm
+// queue.  If not, submit it.
+//
+// If the container is marked as Running, check if it is in the slurm queue.
+// If not, mark it as Cancelled.
+//
+// Monitor status updates.  If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
+
+	uuid := container.UUID
+
+	if container.State == dispatch.Locked {
+		if inQ, err := checkSqueue(container.UUID); err != nil {
+			log.Printf("Error running squeue: %v", err)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		} else if !inQ {
+			log.Printf("About to submit queued container %v", container.UUID)
+
+			jobid, err := submit(dispatcher, container, *crunchRunCommand)
+			if err != nil {
+				log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
+			} else {
+				finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+			}
+		}
+	} else if container.State == dispatch.Running {
+		if inQ, err := checkSqueue(container.UUID); err != nil {
+			log.Printf("Error running squeue: %v", err)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		} else if !inQ {
+			log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		}
+	}
+
+	log.Printf("Monitoring container %v started", uuid)
+
+	for container = range status {
+		if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
+			log.Printf("Canceling container %s", container.UUID)
+
+			err := exec.Command("scancel", "--name="+container.UUID).Run()
+			if err != nil {
+				log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+				if inQ, err := checkSqueue(container.UUID); err != nil {
+					log.Printf("Error running squeue: %v", err)
+					continue
+				} else if inQ {
+					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+					continue
+				}
+			}
+
+			err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		}
 	}
-	mineMutex.Unlock()
-}
 
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
-	mineMutex.RLocker().Lock()
-	defer mineMutex.RLocker().Unlock()
-	return mineMap[uuid]
+	log.Printf("Monitoring container %v finished", uuid)
 }
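
To make the --mem-per-cpu arithmetic in sbatchFunc concrete: the "ram" runtime constraint is in bytes and slurm wants MiB per CPU, so the flag value is ceil(ram / (vcpus * 1048576)). A worked example with assumed constraints of 8 GiB and 4 vcpus:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        ram := float64(8 << 30) // 8 GiB in bytes (assumed value)
        vcpus := float64(4)     // assumed value
        memPerCPU := math.Ceil(ram / (vcpus * 1048576))
        fmt.Printf("--mem-per-cpu=%d\n", int(memPerCPU)) // prints --mem-per-cpu=2048
    }
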
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..348d5e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 
 	"bytes"
 	"fmt"
@@ -12,9 +13,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"os/exec"
-	"strconv"
 	"strings"
-	"syscall"
 	"testing"
 	"time"
 
@@ -47,11 +46,6 @@ func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-slurm"}
 	os.Args = args
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
@@ -64,18 +58,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
 
 	var sbatchCmdLine []string
 	var striggerCmdLine []string
 
 	// Override sbatchCmd
-	defer func(orig func(Container) *exec.Cmd) {
+	defer func(orig func(dispatch.Container) *exec.Cmd) {
 		sbatchCmd = orig
 	}(sbatchCmd)
-	sbatchCmd = func(container Container) *exec.Cmd {
+	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
 		sbatchCmdLine = sbatchFunc(container).Args
 		return exec.Command("sh")
 	}
@@ -90,41 +84,65 @@ func (s *TestSuite) Test_doMain(c *C) {
 			apiHost, apiToken, apiInsecure).Args
 		go func() {
 			time.Sleep(5 * time.Second)
-			for _, state := range []string{"Running", "Complete"} {
-				arv.Update("containers", containerUUID,
-					arvadosclient.Dict{
-						"container": arvadosclient.Dict{"state": state}},
-					nil)
-			}
+			arv.Update("containers", containerUUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": dispatch.Complete}},
+				nil)
 		}()
-		return exec.Command("echo", "strigger")
+		return exec.Command("echo", striggerCmdLine...)
 	}
 
-	go func() {
-		time.Sleep(8 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
+	// Override squeueCmd
+	defer func(orig func() *exec.Cmd) {
+		squeueCmd = orig
+	}(squeueCmd)
+	squeueCmd = func() *exec.Cmd {
+		return exec.Command("echo")
+	}
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
+	var containers dispatch.ContainerList
+	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Check(len(containers.Items), Equals, 1)
 
-	err = doMain()
-	c.Check(err, IsNil)
+	echo := "echo"
+	crunchRunCommand = &echo
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			go func() {
+				time.Sleep(1)
+				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				dispatcher.UpdateState(container.UUID, dispatch.Complete)
+			}()
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	err = dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", item.UUID),
-		fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
-		fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
+		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
+		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--priority=%d", item.Priority)}
 	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
 		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
 	// There should be no queued containers now
@@ -133,7 +151,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 	c.Check(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -144,7 +162,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -153,7 +171,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -165,14 +183,36 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(buf)
 	defer log.SetOutput(os.Stderr)
 
+	crunchRunCommand = &crunchCmd
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			go func() {
+				time.Sleep(1)
+				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				dispatcher.UpdateState(container.UUID, dispatch.Complete)
+			}()
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
 	go func() {
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		sigChan <- syscall.SIGTERM
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+	err := dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
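
Both test suites above synchronize on log output rather than on fixed timers: they poll the captured buffer for an expected line, then nudge DoneProcessing. Factored out, that pattern would look roughly like this hypothetical helper (waitForLog is an illustrative name; the 80 x 100 ms budget matches the loops in the tests, which likewise read the buffer while the logger writes to it):

    // waitForLog polls buf for expected, for up to ~8 seconds, then
    // asks the dispatcher to shut down regardless.
    func waitForLog(buf *bytes.Buffer, expected string, done chan struct{}) {
        for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
            time.Sleep(100 * time.Millisecond)
        }
        done <- struct{}{}
    }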

-----------------------------------------------------------------------

