[ARVADOS] created: 4153cb6cfad920ed0b1a4b818d3bcc8de492d134

Git user git at public.curoverse.com
Thu May 26 16:11:55 EDT 2016


        at  4153cb6cfad920ed0b1a4b818d3bcc8de492d134 (commit)


commit 4153cb6cfad920ed0b1a4b818d3bcc8de492d134
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 19 14:12:42 2016 -0400

    9187: Refactor dispatcher support into a common library and update to use the Locking API.
    
    The new dispatch package in the Go SDK provides a framework for monitoring
    the list of queued/locked/running containers.  The dispatcher tries to lock
    containers in the queue; locked or running containers are passed to a
    RunContainer goroutine supplied by the specific dispatcher.  The existing
    dispatchers (-local and -slurm) are refactored to use this framework.
    Dispatchers now have crash recovery behavior and can put containers that
    are unaccounted for into the Cancelled state.
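
A minimal sketch of a dispatcher built on the new package (based only on the
API introduced in sdk/go/dispatch below; the placeholder callback is
illustrative and not part of this commit):

    package main

    import (
    	"log"
    	"time"

    	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
    	"git.curoverse.com/arvados.git/sdk/go/dispatch"
    )

    // runContainer is a placeholder RunContainer callback.  It must drain
    // the status channel until the dispatcher closes it, or the dispatcher
    // will deadlock.
    func runContainer(d *dispatch.Dispatcher, c dispatch.Container, status chan dispatch.Container) {
    	for c = range status {
    		log.Printf("container %v is now %v", c.UUID, c.State)
    	}
    }

    func main() {
    	arv, err := arvadosclient.MakeArvadosClient()
    	if err != nil {
    		log.Fatal(err)
    	}
    	d := dispatch.Dispatcher{
    		Arv:            arv,
    		RunContainer:   runContainer,
    		PollInterval:   10 * time.Second,
    		DoneProcessing: make(chan struct{}),
    	}
    	if err := d.RunDispatcher(); err != nil {
    		log.Fatal(err)
    	}
    }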

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644
index 0000000..355ed7c
--- /dev/null
+++ b/sdk/go/dispatch/dispatch.go
@@ -0,0 +1,229 @@
+package dispatch
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+)
+
+// Constants for container states
+const (
+	Queued    = "Queued"
+	Locked    = "Locked"
+	Running   = "Running"
+	Complete  = "Complete"
+	Cancelled = "Cancelled"
+)
+
+type apiClientAuthorization struct {
+	UUID     string `json:"uuid"`
+	APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+	Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+	UUID               string           `json:"uuid"`
+	State              string           `json:"state"`
+	Priority           int              `json:"priority"`
+	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+	LockedByUUID       string           `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of containers returned by the API server.
+type ContainerList struct {
+	Items          []Container `json:"items"`
+	ItemsAvailable int         `json:"items_available"`
+}
+
+// Dispatcher holds the state of the dispatcher
+type Dispatcher struct {
+	Arv            arvadosclient.ArvadosClient
+	RunContainer   func(*Dispatcher, Container, chan Container)
+	PollInterval   time.Duration
+	DoneProcessing chan struct{}
+
+	mineMutex  sync.Mutex
+	mineMap    map[string]chan Container
+	auth       apiClientAuthorization
+	containers chan Container
+}
+
+// Goroutine-safely add uuid to the set of "my" containers, i.e., ones for
+// which this process is actively starting/monitoring.  Returns the channel
+// to be used to send container status updates.
+func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		return ch
+	}
+
+	ch := make(chan Container)
+	dispatcher.mineMap[uuid] = ch
+	return ch
+}
+
+// Release a container which is no longer being monitored.
+func (dispatcher *Dispatcher) notMine(uuid string) {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		close(ch)
+		delete(dispatcher.mineMap, uuid)
+	}
+}
+
+// Check whether there is a channel for updates associated with this
+// container.  If so, send the container record on the channel and return
+// true; otherwise return false.
+func (dispatcher *Dispatcher) updateMine(c Container) bool {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	ch, ok := dispatcher.mineMap[c.UUID]
+	if ok {
+		ch <- c
+		return true
+	}
+	return false
+}
+
+func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+	var containers ContainerList
+	err := dispatcher.Arv.List("containers", params, &containers)
+	if err != nil {
+		log.Printf("Error getting list of containers: %q", err)
+	} else {
+		if containers.ItemsAvailable > len(containers.Items) {
+			// TODO: support paging
+			log.Printf("Warning!  %d containers are available but only %d were received; paged requests are not yet supported, so some containers may be ignored.",
+				containers.ItemsAvailable,
+				len(containers.Items))
+		}
+		for _, container := range containers.Items {
+			touched[container.UUID] = true
+			dispatcher.containers <- container
+		}
+	}
+}
+
+func (dispatcher *Dispatcher) pollContainers() {
+	ticker := time.NewTicker(dispatcher.PollInterval)
+
+	paramsQ := arvadosclient.Dict{
+		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+		"order":   []string{"priority desc"},
+		"limit":   "1000"}
+	paramsP := arvadosclient.Dict{
+		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+		"limit":   "1000"}
+
+	for {
+		select {
+		case <-ticker.C:
+			touched := make(map[string]bool)
+			dispatcher.getContainers(paramsQ, touched)
+			dispatcher.getContainers(paramsP, touched)
+			dispatcher.mineMutex.Lock()
+			var monitored []string
+			for k := range dispatcher.mineMap {
+				if _, ok := touched[k]; !ok {
+					monitored = append(monitored, k)
+				}
+			}
+			dispatcher.mineMutex.Unlock()
+			if monitored != nil {
+				dispatcher.getContainers(arvadosclient.Dict{
+					"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+			}
+		case <-dispatcher.DoneProcessing:
+			close(dispatcher.containers)
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (dispatcher *Dispatcher) handleUpdate(container Container) {
+	if dispatcher.updateMine(container) {
+		if container.State == Complete || container.State == Cancelled {
+			log.Printf("Container %v now in state %v", container.UUID, container.State)
+			dispatcher.notMine(container.UUID)
+		}
+		return
+	}
+
+	if container.State == Queued {
+		// Try to take the lock
+		if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+			return
+		}
+		container.State = Locked
+	}
+
+	if container.State == Locked || container.State == Running {
+		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+	}
+}
+
+// UpdateState makes an API call to change the state of a container.
+func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+	err := dispatcher.Arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": newState}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+	}
+	return err
+}
+
+// RunDispatcher runs the main loop of the dispatcher until receiving a
+// message on the dispatcher.DoneProcessing channel.  It also installs a
+// signal handler to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
+//
+// When a new queued container appears and is successfully locked, or when a
+// Locked or Running container appears that is not already known to this
+// dispatcher (e.g. one left behind by a previous dispatch process), the
+// RunContainer() callback is started in a new goroutine.  The callback is
+// passed a channel over which it will receive updates to the container
+// state.  The callback is responsible for draining the channel; if it fails
+// to do so, it will deadlock the dispatcher.
+func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	if err != nil {
+		log.Printf("Error getting my token UUID: %v", err)
+		return
+	}
+
+	dispatcher.mineMap = make(map[string]chan Container)
+	dispatcher.containers = make(chan Container)
+
+	// Graceful shutdown on signal
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+	go func(sig <-chan os.Signal) {
+		for sig := range sig {
+			log.Printf("Caught signal: %v", sig)
+			dispatcher.DoneProcessing <- struct{}{}
+		}
+	}(sigChan)
+
+	defer close(sigChan)
+	defer signal.Stop(sigChan)
+
+	go dispatcher.pollContainers()
+	for container := range dispatcher.containers {
+		dispatcher.handleUpdate(container)
+	}
+
+	return nil
+}
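
To make the RunContainer contract concrete: the callback receives the
container once it is Locked (or when an already Locked/Running container is
rediscovered after a crash), does its work, and must drain the status channel
until the dispatcher closes it on Complete/Cancelled.  A sketch of that
shape, where startWork and stopWork are hypothetical helpers:

    func exampleRun(d *dispatch.Dispatcher, c dispatch.Container, status chan dispatch.Container) {
    	if c.State == dispatch.Locked {
    		startWork(c) // hypothetical: begin executing the container
    		d.UpdateState(c.UUID, dispatch.Running)
    	}
    	// Drain status updates until the dispatcher closes the channel,
    	// which it does once the container is Complete or Cancelled.
    	for update := range status {
    		if update.Priority == 0 {
    			stopWork(update) // hypothetical: interrupt the work
    			d.UpdateState(update.UUID, dispatch.Cancelled)
    		}
    	}
    }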
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 4023870..cc472a4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -1,14 +1,15 @@
 package main
 
+// Dispatcher service for Crunch that runs containers locally.
+
 import (
 	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
 	"os"
 	"os/exec"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 )
 
@@ -20,12 +21,10 @@ func main() {
 }
 
 var (
-	arv              arvadosclient.ArvadosClient
 	runningCmds      map[string]*exec.Cmd
 	runningCmdsMutex sync.Mutex
 	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
+	crunchRunCommand *string
 )
 
 func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
@@ -49,35 +43,32 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	runningCmds = make(map[string]*exec.Cmd)
+
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Map of running crunch jobs
-	runningCmds = make(map[string]*exec.Cmd)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
+	dispatcher := dispatch.Dispatcher{
+		Arv:            arv,
+		RunContainer:   run,
+		PollInterval:   time.Duration(*pollInterval) * time.Second,
+		DoneProcessing: make(chan struct{})}
 
-	// Run all queued containers
-	runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
+	err = dispatcher.RunDispatcher()
+	if err != nil {
+		return err
+	}
 
+	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
 		cmd.Process.Signal(os.Interrupt)
 	}
+	runningCmdsMutex.Unlock()
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -85,166 +76,98 @@ func doMain() error {
 	return nil
 }
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
-	ticker := time.NewTicker(pollInterval)
-
-	for {
-		select {
-		case <-ticker.C:
-			dispatchLocal(priorityPollInterval, crunchRunCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+	return cmd.Start()
 }
 
-// Container data
-type Container struct {
-	UUID         string `json:"uuid"`
-	State        string `json:"state"`
-	Priority     int    `json:"priority"`
-	LockedByUUID string `json:"locked_by_uuid"`
-}
+var startCmd = startFunc
 
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]string{[]string{"state", "=", "Queued"}},
-	}
+// Run a container.
+//
+// If the container is Locked, start a new crunch-run process and wait until
+// crunch-run completes.  If the priority changes to zero while crunch-run is
+// running, send an interrupt signal to the crunch-run process.
+//
+// If the container is in any other state, or is not Complete/Cancelled after
+// crunch-run terminates, mark the container as Cancelled.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
-	}
+	uuid := container.UUID
 
-	for _, c := range containers.Items {
-		log.Printf("About to run queued container %v", c.UUID)
-		// Run the container
+	if container.State == dispatch.Locked {
 		waitGroup.Add(1)
-		go func(c Container) {
-			run(c.UUID, crunchRunCommand, pollInterval)
-			waitGroup.Done()
-		}(c)
-	}
-}
-
-func updateState(uuid, newState string) error {
-	err := arv.Update("containers", uuid,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
-	}
-	return err
-}
-
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
-	if err := updateState(uuid, "Locked"); err != nil {
-		return
-	}
 
-	cmd := exec.Command(crunchRunCommand, uuid)
-	cmd.Stdin = nil
-	cmd.Stderr = os.Stderr
-	cmd.Stdout = os.Stderr
+		cmd := exec.Command(*crunchRunCommand, uuid)
+		cmd.Stdin = nil
+		cmd.Stderr = os.Stderr
+		cmd.Stdout = os.Stderr
 
-	// Add this crunch job to the list of runningCmds only if we
-	// succeed in starting crunch-run.
-	runningCmdsMutex.Lock()
-	if err := cmd.Start(); err != nil {
-		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
-		runningCmdsMutex.Unlock()
-		updateState(uuid, "Queued")
-		return
-	}
-	runningCmds[uuid] = cmd
-	runningCmdsMutex.Unlock()
+		log.Printf("Starting container %v", uuid)
 
-	defer func() {
-		setFinalState(uuid)
+		// Add this crunch job to the list of runningCmds only if we
+		// succeed in starting crunch-run.
 
-		// Remove the crunch job from runningCmds
 		runningCmdsMutex.Lock()
-		delete(runningCmds, uuid)
-		runningCmdsMutex.Unlock()
-	}()
-
-	log.Printf("Starting container %v", uuid)
-
-	updateState(uuid, "Running")
+		if err := startCmd(container, cmd); err != nil {
+			runningCmdsMutex.Unlock()
+			log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+			dispatcher.UpdateState(uuid, dispatch.Cancelled)
+		} else {
+			runningCmds[uuid] = cmd
+			runningCmdsMutex.Unlock()
+
+			// Need to wait for crunch-run to exit
+			done := make(chan struct{})
+
+			go func() {
+				if _, err := cmd.Process.Wait(); err != nil {
+					log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+				}
+				log.Printf("crunch-run process for %v has exited", uuid)
+				done <- struct{}{}
+			}()
+
+		Loop:
+			for {
+				select {
+				case <-done:
+					break Loop
+				case c := <-status:
+					// Interrupt the child process if priority changes to 0
+					if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+						log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+						cmd.Process.Signal(os.Interrupt)
+					}
+				}
+			}
+			close(done)
 
-	cmdExited := make(chan struct{})
+			log.Printf("Finished container run for %v", uuid)
 
-	// Kill the child process if container priority changes to zero
-	go func() {
-		ticker := time.NewTicker(pollInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-cmdExited:
-				return
-			case <-ticker.C:
-			}
-			var container Container
-			err := arv.Get("containers", uuid, nil, &container)
-			if err != nil {
-				log.Printf("Error getting container %v: %q", uuid, err)
-				continue
-			}
-			if container.Priority == 0 {
-				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-				cmd.Process.Signal(os.Interrupt)
-			}
+			// Remove the crunch job from runningCmds
+			runningCmdsMutex.Lock()
+			delete(runningCmds, uuid)
+			runningCmdsMutex.Unlock()
 		}
-	}()
-
-	// Wait for crunch-run to exit
-	if _, err := cmd.Process.Wait(); err != nil {
-		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+		waitGroup.Done()
 	}
-	close(cmdExited)
-
-	log.Printf("Finished container run for %v", uuid)
-}
 
-func setFinalState(uuid string) {
-	// The container state should now be 'Complete' if everything
-	// went well. If it started but crunch-run didn't change its
-	// final state to 'Running', fix that now. If it never even
-	// started, cancel it as unrunnable. (TODO: Requeue instead,
-	// and fix tests so they can tell something happened even if
-	// the final state is Queued.)
-	var container Container
-	err := arv.Get("containers", uuid, nil, &container)
+	// If the container is not finalized, then change it to "Cancelled".
+	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	fixState := map[string]string{
-		"Running": "Complete",
-		"Locked": "Cancelled",
+	if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+		log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
+			*crunchRunCommand, uuid, container.State, dispatch.Cancelled)
+		dispatcher.UpdateState(uuid, dispatch.Cancelled)
 	}
-	if newState, ok := fixState[container.State]; ok {
-		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
-		updateState(uuid, newState)
+
+	// Drain any subsequent status changes.
+	for range status {
+	}
+
+	log.Printf("Finalized container %v", uuid)
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..aca60e9 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,20 @@
 package main
 
 import (
+	"bytes"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
-	"bytes"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	. "gopkg.in/check.v1"
+	"io"
 	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"syscall"
+	"os/exec"
+	"strings"
 	"testing"
 	"time"
-
-	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -32,6 +33,7 @@ var initialArgs []string
 func (s *TestSuite) SetUpSuite(c *C) {
 	initialArgs = os.Args
 	arvadostest.StartAPI()
+	runningCmds = make(map[string]*exec.Cmd)
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +43,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
 func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-local"}
 	os.Args = args
-
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +54,48 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
+
+	echo := "echo"
+	crunchRunCommand = &echo
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
 
-	go func() {
-		time.Sleep(5 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
+	err = dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
-	err := doMain()
-	c.Check(err, IsNil)
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
+	var containers dispatch.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -90,13 +105,13 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
@@ -106,31 +121,35 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
 
 	testWithServerStub(c, apiStubResponses, "echo",
-		"After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+		`After echo process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is "Running".  Updating it to "Cancelled"`)
 }
 
 func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] =
-		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
 		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
-	testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
+	testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+		arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
 	apiStub := arvadostest.ServerStub{apiStubResponses}
 
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -139,15 +158,38 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(buf)
+	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
+	*crunchRunCommand = crunchCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
+
 	go func() {
-		time.Sleep(2 * time.Second)
-		sigChan <- syscall.SIGTERM
+		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+			time.Sleep(100 * time.Millisecond)
+		}
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(time.Second, time.Second, crunchCmd)
+	err := dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 53e4705..641b4bc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,19 +1,19 @@
 package main
 
+// Dispatcher service for Crunch that submits containers to the slurm queue.
+
 import (
 	"bufio"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io/ioutil"
 	"log"
 	"math"
 	"os"
 	"os/exec"
-	"os/signal"
-	"strconv"
-	"sync"
-	"syscall"
+	"strings"
 	"time"
 )
 
@@ -25,12 +25,8 @@ func main() {
 }
 
 var (
-	arv              arvadosclient.ArvadosClient
-	runningCmds      map[string]*exec.Cmd
-	runningCmdsMutex sync.Mutex
-	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
+	crunchRunCommand *string
+	finishCommand    *string
 )
 
 func doMain() error {
@@ -41,17 +37,12 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
-	finishCommand := flags.String(
+	finishCommand = flags.String(
 		"finish-command",
 		"/usr/bin/crunch-finish-slurm.sh",
 		"Command to run from strigger when job is finished")
@@ -59,142 +50,56 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
+	arv.Retries = 25
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
-
-	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
-	// Wait for all running crunch jobs to complete / terminate
-	waitGroup.Wait()
-
-	return nil
-}
-
-type apiClientAuthorization struct {
-	UUID     string `json:"uuid"`
-	APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-	Items []apiClientAuthorization `json:"items"`
-}
+	dispatcher := dispatch.Dispatcher{
+		Arv:            arv,
+		RunContainer:   run,
+		PollInterval:   time.Duration(*pollInterval) * time.Second,
+		DoneProcessing: make(chan struct{})}
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-	var auth apiClientAuthorization
-	err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+	err = dispatcher.RunDispatcher()
 	if err != nil {
-		log.Printf("Error getting my token UUID: %v", err)
-		return
-	}
-
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
-	for {
-		select {
-		case <-ticker.C:
-			dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
-}
-
-// Container data
-type Container struct {
-	UUID               string           `json:"uuid"`
-	State              string           `json:"state"`
-	Priority           int              `json:"priority"`
-	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-	LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
-	}
-
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
+		return err
 	}
 
-	for _, container := range containers.Items {
-		if container.State == "Locked" {
-			if container.LockedByUUID != auth.UUID {
-				// Locked by a different dispatcher
-				continue
-			} else if checkMine(container.UUID) {
-				// I already have a goroutine running
-				// for this container: it just hasn't
-				// gotten past Locked state yet.
-				continue
-			}
-			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-				container.UUID, auth.UUID)
-			setMine(container.UUID, true)
-			go func() {
-				waitContainer(container, pollInterval)
-				setMine(container.UUID, false)
-			}()
-		}
-		go run(container, crunchRunCommand, finishCommand, pollInterval)
-	}
+	return nil
 }
 
 // sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
 	memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
 	return exec.Command("sbatch", "--share", "--parsable",
-		"--job-name="+container.UUID,
-		"--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
-		"--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
+		fmt.Sprintf("--job-name=%s", container.UUID),
+		fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
+		fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-var sbatchCmd = sbatchFunc
-
 // striggerCmd
 func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
 	return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
 		fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+	return exec.Command("squeue", "--format=%j")
+}
+
+// Wrap these so that they can be overridden by tests
 var striggerCmd = striggerFunc
+var sbatchCmd = sbatchFunc
+var squeueCmd = squeueFunc
 
 // Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
 	submitErr = nil
 
 	defer func() {
@@ -204,7 +109,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 			// OK, no cleanup needed
 			return
 		}
-		err := arv.Update("containers", container.UUID,
+		err := dispatcher.Arv.Update("containers", container.UUID,
 			arvadosclient.Dict{
 				"container": arvadosclient.Dict{"state": "Queued"}},
 			nil)
@@ -244,7 +149,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 		b, _ := ioutil.ReadAll(stdoutReader)
 		stdoutReader.Close()
 		stdoutChan <- b
-		close(stdoutChan)
 	}()
 
 	stderrChan := make(chan []byte)
@@ -252,7 +156,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 		b, _ := ioutil.ReadAll(stderrReader)
 		stderrReader.Close()
 		stderrChan <- b
-		close(stderrChan)
 	}()
 
 	// Send a tiny script on stdin to execute the crunch-run command
@@ -265,21 +168,28 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 	stdoutMsg := <-stdoutChan
 	stderrmsg := <-stderrChan
 
+	close(stdoutChan)
+	close(stderrChan)
+
 	if err != nil {
 		submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
 		return
 	}
 
 	// If everything worked out, got the jobid on stdout
-	jobid = string(stdoutMsg)
+	jobid = strings.TrimSpace(string(stdoutMsg))
 
 	return
 }
 
 // finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
 // the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
-	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+	insecure := "0"
+	if arv.ApiInsecure {
+		insecure = "1"
+	}
+	cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	err := cmd.Run()
@@ -291,104 +201,8 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
 	}
 }
 
-// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
-	setMine(container.UUID, true)
-	defer setMine(container.UUID, false)
-
-	// Update container status to Locked. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err := arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Locked"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
-		return
-	}
-
-	log.Printf("About to submit queued container %v", container.UUID)
-
-	jobid, err := submit(container, crunchRunCommand)
-	if err != nil {
-		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
-		return
-	}
-
-	insecure := "0"
-	if arv.ApiInsecure {
-		insecure = "1"
-	}
-	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-
-	// Update container status to Running. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err = arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
-	}
-	log.Printf("Submitted container %v to slurm", container.UUID)
-	waitContainer(container, pollInterval)
-}
-
-// Wait for a container to finish. Cancel the slurm job if the
-// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
-	log.Printf("Monitoring container %v started", container.UUID)
-	defer log.Printf("Monitoring container %v finished", container.UUID)
-
-	pollTicker := time.NewTicker(pollInterval)
-	defer pollTicker.Stop()
-	for _ = range pollTicker.C {
-		var updated Container
-		err := arv.Get("containers", container.UUID, nil, &updated)
-		if err != nil {
-			log.Printf("Error getting container %s: %q", container.UUID, err)
-			continue
-		}
-		if updated.State == "Complete" || updated.State == "Cancelled" {
-			return
-		}
-		if updated.Priority != 0 {
-			continue
-		}
-
-		// Priority is zero, but state is Running or Locked
-		log.Printf("Canceling container %s", container.UUID)
-
-		err = exec.Command("scancel", "--name="+container.UUID).Run()
-		if err != nil {
-			log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-			if inQ, err := checkSqueue(container.UUID); err != nil {
-				log.Printf("Error running squeue: %v", err)
-				continue
-			} else if inQ {
-				log.Printf("Container %s is still in squeue; will retry", container.UUID)
-				continue
-			}
-		}
-
-		err = arv.Update("containers", container.UUID,
-			arvadosclient.Dict{
-				"container": arvadosclient.Dict{"state": "Cancelled"}},
-			nil)
-		if err != nil {
-			log.Printf("Error updating state for container %s: %s", container.UUID, err)
-			continue
-		}
-
-		return
-	}
-}
-
 func checkSqueue(uuid string) (bool, error) {
-	cmd := exec.Command("squeue", "--format=%j")
+	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		return false, err
@@ -408,25 +222,67 @@ func checkSqueue(uuid string) (bool, error) {
 	return found, nil
 }
 
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
-	mineMutex.Lock()
-	if t {
-		mineMap[uuid] = true
-	} else {
-		delete(mineMap, uuid)
+// Run or monitor a container.
+//
+// If the container is marked as Locked, check if it is already in the slurm
+// queue.  If not, submit it.
+//
+// If the container is marked as Running, check if it is in the slurm queue.
+// If not, mark it as Cancelled.
+//
+// Monitor status updates.  If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
+
+	uuid := container.UUID
+
+	if container.State == dispatch.Locked {
+		if inQ, err := checkSqueue(container.UUID); err != nil {
+			log.Printf("Error running squeue: %v", err)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		} else if !inQ {
+			log.Printf("About to submit queued container %v", container.UUID)
+
+			jobid, err := submit(dispatcher, container, *crunchRunCommand)
+			if err != nil {
+				log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
+			} else {
+				finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+			}
+		}
+	} else if container.State == dispatch.Running {
+		if inQ, err := checkSqueue(container.UUID); err != nil {
+			log.Printf("Error running squeue: %v", err)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		} else if !inQ {
+			log.Printf("Container %s is in Running state but not in the slurm queue; marking it Cancelled.", container.UUID)
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		}
+	}
+
+	log.Printf("Monitoring container %v started", uuid)
+
+	for container = range status {
+		if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
+			log.Printf("Canceling container %s", container.UUID)
+
+			err := exec.Command("scancel", "--name="+container.UUID).Run()
+			if err != nil {
+				log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+				if inQ, err := checkSqueue(container.UUID); err != nil {
+					log.Printf("Error running squeue: %v", err)
+					continue
+				} else if inQ {
+					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+					continue
+				}
+			}
+
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		}
 	}
-	mineMutex.Unlock()
-}
 
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
-	mineMutex.RLocker().Lock()
-	defer mineMutex.RLocker().Unlock()
-	return mineMap[uuid]
+	log.Printf("Monitoring container %v finished", uuid)
 }
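
For reference, sbatchFunc above converts the container's total RAM request
(bytes) into slurm's per-CPU MiB units.  A standalone sketch of the same
arithmetic, with hypothetical constraint values:

    package main

    import (
    	"fmt"
    	"math"
    )

    func main() {
    	// Hypothetical container constraints: 8 GiB of RAM, 2 VCPUs.
    	ram := int64(8589934592)
    	vcpus := int64(2)
    	// Same computation as sbatchFunc: bytes / (vcpus * 1 MiB), rounded up.
    	memPerCPU := math.Ceil(float64(ram) / float64(vcpus*1048576))
    	fmt.Printf("--mem-per-cpu=%d --cpus-per-task=%d\n", int(memPerCPU), vcpus)
    	// Prints: --mem-per-cpu=4096 --cpus-per-task=2
    }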
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..348d5e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 
 	"bytes"
 	"fmt"
@@ -12,9 +13,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"os/exec"
-	"strconv"
 	"strings"
-	"syscall"
 	"testing"
 	"time"
 
@@ -47,11 +46,6 @@ func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-slurm"}
 	os.Args = args
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
@@ -64,18 +58,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
 
 	var sbatchCmdLine []string
 	var striggerCmdLine []string
 
 	// Override sbatchCmd
-	defer func(orig func(Container) *exec.Cmd) {
+	defer func(orig func(dispatch.Container) *exec.Cmd) {
 		sbatchCmd = orig
 	}(sbatchCmd)
-	sbatchCmd = func(container Container) *exec.Cmd {
+	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
 		sbatchCmdLine = sbatchFunc(container).Args
 		return exec.Command("sh")
 	}
@@ -90,41 +84,65 @@ func (s *TestSuite) Test_doMain(c *C) {
 			apiHost, apiToken, apiInsecure).Args
 		go func() {
 			time.Sleep(5 * time.Second)
-			for _, state := range []string{"Running", "Complete"} {
-				arv.Update("containers", containerUUID,
-					arvadosclient.Dict{
-						"container": arvadosclient.Dict{"state": state}},
-					nil)
-			}
+			arv.Update("containers", containerUUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": dispatch.Complete}},
+				nil)
 		}()
-		return exec.Command("echo", "strigger")
+		return exec.Command("echo", striggerCmdLine...)
 	}
 
-	go func() {
-		time.Sleep(8 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
+	// Override squeueCmd
+	defer func(orig func() *exec.Cmd) {
+		squeueCmd = orig
+	}(squeueCmd)
+	squeueCmd = func() *exec.Cmd {
+		return exec.Command("echo")
+	}
 
 	// There should be one queued container before the dispatcher runs
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
+	var containers dispatch.ContainerList
+	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Check(len(containers.Items), Equals, 1)
 
-	err = doMain()
-	c.Check(err, IsNil)
+	echo := "echo"
+	crunchRunCommand = &echo
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			go func() {
+				time.Sleep(time.Second)
+				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				dispatcher.UpdateState(container.UUID, dispatch.Complete)
+			}()
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
+	err = dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", item.UUID),
-		fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
-		fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
+		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
+		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--priority=%d", item.Priority)}
 	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
-	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
 		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
 	// There should be no queued containers now
@@ -133,7 +151,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 	c.Check(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -144,7 +162,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -153,7 +171,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -165,14 +183,36 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(buf)
 	defer log.SetOutput(os.Stderr)
 
+	crunchRunCommand = &crunchCmd
+	finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+	finishCommand = &finishCmd
+
+	doneProcessing := make(chan struct{})
+	dispatcher := dispatch.Dispatcher{
+		Arv:          arv,
+		PollInterval: time.Duration(1) * time.Second,
+		RunContainer: func(dispatcher *dispatch.Dispatcher,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			go func() {
+				time.Sleep(time.Second)
+				dispatcher.UpdateState(container.UUID, dispatch.Running)
+				dispatcher.UpdateState(container.UUID, dispatch.Complete)
+			}()
+			run(dispatcher, container, status)
+			doneProcessing <- struct{}{}
+		},
+		DoneProcessing: doneProcessing}
+
 	go func() {
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		sigChan <- syscall.SIGTERM
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+	err := dispatcher.RunDispatcher()
+	c.Assert(err, IsNil)
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
