[ARVADOS] created: ddee3839f8a82b889f84171e2354108cb20f93e0

Git user git at public.curoverse.com
Fri Feb 10 01:25:54 EST 2017


        at  ddee3839f8a82b889f84171e2354108cb20f93e0 (commit)


commit ddee3839f8a82b889f84171e2354108cb20f93e0
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Feb 10 01:25:14 2017 -0500

    10701: Refactor dispatch library.

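For orientation, a minimal sketch of a dispatcher built on the refactored API below (runEverything and its logging are illustrative, not part of this commit):

    package main

    import (
        "context"
        "log"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
    )

    // runEverything is an illustrative Runner: it consumes container
    // records until the dispatcher closes the channel, meaning the
    // container is no longer this dispatcher's responsibility.
    func runEverything(d *dispatch.Dispatcher, c arvados.Container, status <-chan arvados.Container) {
        for latest := range status {
            log.Printf("%s is now %q", latest.UUID, latest.State)
        }
    }

    func main() {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
            log.Fatal(err)
        }
        d := &dispatch.Dispatcher{
            Arv:          arv,
            PollPeriod:   time.Second,
            RunContainer: runEverything,
        }
        // Run polls until its context is cancelled, then returns ctx.Err().
        log.Fatal(d.Run(context.Background()))
    }
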
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index ce960c0..7342c3b 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -4,11 +4,14 @@
 package dispatch
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"context"
+	"fmt"
 	"log"
 	"sync"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 const (
@@ -19,231 +22,173 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
-// Dispatcher holds the state of the dispatcher
-type Dispatcher struct {
-	// The Arvados client
-	Arv *arvadosclient.ArvadosClient
-
-	// When a new queued container appears and is either already owned by
-	// this dispatcher or is successfully locked, the dispatcher will call
-	// go RunContainer().  The RunContainer() goroutine gets a channel over
-	// which it will receive updates to the container state.  The
-	// RunContainer() goroutine should only assume status updates come when
-	// the container record changes on the API server; if it needs to
-	// monitor the job submission to the underlying slurm/grid engine/etc
-	// queue it should spin up its own polling goroutines.  When the
-	// channel is closed, that means the container is no longer being
-	// handled by this dispatcher and the goroutine should terminate.  The
-	// goroutine is responsible for draining the 'status' channel, failure
-	// to do so may deadlock the dispatcher.
-	RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
-	// Amount of time to wait between polling for updates.
-	PollPeriod time.Duration
-
-	// Minimum time between two attempts to run the same container
-	MinRetryPeriod time.Duration
-
-	mineMutex sync.Mutex
-	mineMap   map[string]chan arvados.Container
-	Auth      arvados.APIClientAuthorization
-
-	throttle throttle
-
-	stop chan struct{}
+type runner struct {
+	closing bool
+	updates chan arvados.Container
 }
 
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring.  Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	if ch, ok := dispatcher.mineMap[uuid]; ok {
-		return ch
+func (ex *runner) close() {
+	if !ex.closing {
+		close(ex.updates)
 	}
-
-	ch := make(chan arvados.Container)
-	dispatcher.mineMap[uuid] = ch
-	return ch
+	ex.closing = true
 }
 
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	if ch, ok := dispatcher.mineMap[uuid]; ok {
-		close(ch)
-		delete(dispatcher.mineMap, uuid)
+func (ex *runner) update(c arvados.Container) {
+	if ex.closing {
+		return
 	}
-}
-
-// checkMine returns true if there is a channel for updates associated
-// with container c.  If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	ch, ok := dispatcher.mineMap[c.UUID]
-	if ok {
-		if update {
-			ch <- c
-		}
-		return true
+	select {
+	case <-ex.updates:
+	log.Printf("debug: executor is handling updates slowly, discarding previous update for %s", c.UUID)
+	default:
 	}
-	return false
+	ex.updates <- c
 }
 
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-	var containers arvados.ContainerList
-	err := dispatcher.Arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of containers: %q", err)
-		return
-	}
+type Dispatcher struct {
+	Arv            *arvadosclient.ArvadosClient
+	PollPeriod     time.Duration
+	MinRetryPeriod time.Duration
+	RunContainer   Runner
 
-	if containers.ItemsAvailable > len(containers.Items) {
-		// TODO: support paging
-		log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-			containers.ItemsAvailable,
-			len(containers.Items))
-	}
-	for _, container := range containers.Items {
-		touched[container.UUID] = true
-		dispatcher.handleUpdate(container)
-	}
+	auth     arvados.APIClientAuthorization
+	mtx      sync.Mutex
+	running  map[string]*runner
+	throttle throttle
 }
 
-func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
-	ticker := time.NewTicker(dispatcher.PollPeriod)
-	defer ticker.Stop()
+// A Runner executes a container. If it starts any goroutines, it must
+// not return until it can guarantee that none of those goroutines
+// will do anything with this container.
+type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 
-	paramsQ := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
-		"order":   []string{"priority desc"},
-		"limit":   "1000"}
-	paramsP := arvadosclient.Dict{
-		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
-		"limit":   "1000"}
+func (d *Dispatcher) Run(ctx context.Context) error {
+	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+	if err != nil {
+		return fmt.Errorf("error getting my token UUID: %v", err)
+	}
+
+	poll := time.NewTicker(d.PollPeriod)
+	defer poll.Stop()
 
 	for {
-		touched := make(map[string]bool)
-		dispatcher.getContainers(paramsQ, touched)
-		dispatcher.getContainers(paramsP, touched)
-		dispatcher.mineMutex.Lock()
-		var monitored []string
-		for k := range dispatcher.mineMap {
-			if _, ok := touched[k]; !ok {
-				monitored = append(monitored, k)
-			}
+		running := make([]string, 0, len(d.running))
+		d.mtx.Lock()
+		for uuid := range d.running {
+			running = append(running, uuid)
 		}
-		dispatcher.mineMutex.Unlock()
-		if monitored != nil {
-			dispatcher.getContainers(arvadosclient.Dict{
-				"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+		d.mtx.Unlock()
+		if len(running) == 0 {
+			// API bug: ["uuid", "not in", []] does not match everything
+			running = []string{"X"}
 		}
+		d.checkForUpdates([][]interface{}{
+			{"uuid", "in", running}})
+		d.checkForUpdates([][]interface{}{
+			{"state", "=", Queued},
+			{"priority", ">", "0"},
+			{"uuid", "not in", running}})
+		d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID},
+			{"uuid", "not in", running}})
 		select {
-		case <-ticker.C:
-		case <-stop:
-			return
+		case <-poll.C:
+			continue
+		case <-ctx.Done():
+			return ctx.Err()
 		}
 	}
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
-	if container.State == Queued && dispatcher.checkMine(container, false) {
-		// If we previously started the job, something failed, and it
-		// was re-queued, this dispatcher might still be monitoring it.
-		// Stop the existing monitor, then try to lock and run it
-		// again.
-		dispatcher.notMine(container.UUID)
-	}
+func (d *Dispatcher) start(c arvados.Container) *runner {
+	ex := &runner{
+		updates: make(chan arvados.Container, 1),
+	}
+	if d.running == nil {
+		d.running = make(map[string]*runner)
+	}
+	d.running[c.UUID] = ex
+	go func() {
+		d.RunContainer(d, c, ex.updates)
+		d.mtx.Lock()
+		delete(d.running, c.UUID)
+		d.mtx.Unlock()
+	}()
+	return ex
+}
 
-	if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
-		// If container is Complete, Cancelled, or Queued, LockedByUUID
-		// will be nil.  If the container was formerly Locked, moved
-		// back to Queued and then locked by another dispatcher,
-		// LockedByUUID will be different.  In either case, we want
-		// to stop monitoring it.
-		log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
-		dispatcher.notMine(container.UUID)
-		return
-	}
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+	params := arvadosclient.Dict{
+		"filters": filters,
+		"order":   []string{"priority desc"},
+		"limit":   "1000"}
 
-	if dispatcher.checkMine(container, true) {
-		// Already monitored, sent status update
+	var list arvados.ContainerList
+	err := d.Arv.List("containers", params, &list)
+	if err != nil {
+		log.Printf("Error getting list of containers: %q", err)
 		return
 	}
 
-	if container.State == Queued && container.Priority > 0 {
-		if !dispatcher.throttle.Check(container.UUID) {
-			return
-		}
-		// Try to take the lock
-		if err := dispatcher.Lock(container.UUID); err != nil {
-			return
+	if list.ItemsAvailable > len(list.Items) {
+		// TODO: support paging
+		log.Printf("Warning: %d containers are available but only %d were received; paged requests are not yet supported, so some containers may be ignored.",
+			list.ItemsAvailable,
+			len(list.Items))
+	}
+
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	for _, c := range list.Items {
+		ex, running := d.running[c.UUID]
+		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+		} else if running {
+			switch c.State {
+			case Queued:
+				ex.close()
+			case Locked, Running:
+				ex.update(c)
+			case Cancelled, Complete:
+				ex.close()
+			}
+		} else {
+			switch c.State {
+			case Queued:
+				if err := d.lock(c.UUID); err != nil {
+					log.Printf("Error locking container %s: %s", c.UUID, err)
+				} else {
+					c.State = Locked
+					d.start(c).update(c)
+				}
+			case Locked, Running:
+				d.start(c).update(c)
+			case Cancelled, Complete:
+				// Not ours and already finished: nothing to do.
+			}
 		}
-		container.State = Locked
-	}
-
-	if container.State == Locked || container.State == Running {
-		// Not currently monitored but in Locked or Running state and
-		// owned by this dispatcher, so start monitoring.
-		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
 	}
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
-	err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+	err := d.Arv.Update("containers", uuid,
 		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
+			"container": arvadosclient.Dict{"state": state},
+		}, nil)
 	if err != nil {
-		log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+		log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
 	}
 	return err
 }
 
 // Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
-	err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
-	if err != nil {
-		log.Printf("Error locking container %s: %q", uuid, err)
-	}
-	return err
+func (d *Dispatcher) lock(uuid string) error {
+	return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
 }
 
 // Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
-	err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
-	if err != nil {
-		log.Printf("Error unlocking container %s: %q", uuid, err)
-	}
-	return err
-}
-
-// Stop causes Run to return after the current polling cycle.
-func (dispatcher *Dispatcher) Stop() {
-	if dispatcher.stop == nil {
-		// already stopped
-		return
-	}
-	close(dispatcher.stop)
-	dispatcher.stop = nil
-}
-
-// Run runs the main loop of the dispatcher.
-func (dispatcher *Dispatcher) Run() (err error) {
-	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
-	if err != nil {
-		log.Printf("Error getting my token UUID: %v", err)
-		return
-	}
-
-	dispatcher.mineMap = make(map[string]chan arvados.Container)
-	dispatcher.stop = make(chan struct{})
-	dispatcher.throttle.hold = dispatcher.MinRetryPeriod
-	dispatcher.pollContainers(dispatcher.stop)
-	return nil
+func (d *Dispatcher) Unlock(uuid string) error {
+	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
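
The new runner.update method above implements a keep-only-the-latest channel: updates has capacity 1, and a stale, unconsumed record is discarded before the new one is sent, so a slow Runner never blocks the dispatcher and always wakes to the newest container state (the dispatcher serializes senders under its mutex). A standalone sketch of the idiom, with int standing in for arvados.Container:

    package main

    import "fmt"

    // pushLatest sends v on ch (capacity 1), first discarding any
    // unconsumed older value. With a single sender this never blocks,
    // and the receiver always sees the newest value.
    func pushLatest(ch chan int, v int) {
        select {
        case <-ch: // drop the stale value
        default:
        }
        ch <- v
    }

    func main() {
        ch := make(chan int, 1)
        pushLatest(ch, 1)
        pushLatest(ch, 2) // 1 is discarded
        fmt.Println(<-ch) // prints 2
    }
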
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index bb3c05c..22f7d8b 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -3,10 +3,8 @@ package main
 // Dispatcher service for Crunch that runs containers locally.
 
 import (
+	"context"
 	"flag"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
 	"os"
 	"os/exec"
@@ -14,6 +12,10 @@ import (
 	"sync"
 	"syscall"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 )
 
 func main() {
@@ -61,7 +63,8 @@ func doMain() error {
 		PollPeriod:   time.Duration(*pollInterval) * time.Second,
 	}
 
-	err = dispatcher.Run()
+	ctx, cancel := context.WithCancel(context.Background())
+	err = dispatcher.Run(ctx)
 	if err != nil {
 		return err
 	}
@@ -72,7 +75,7 @@ func doMain() error {
 	log.Printf("Received %s, shutting down", sig)
 	signal.Stop(c)
 
-	dispatcher.Stop()
+	cancel()
 
 	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
@@ -103,7 +106,7 @@ var startCmd = startFunc
 // crunch-run terminates, mark the container as Cancelled.
 func run(dispatcher *dispatch.Dispatcher,
 	container arvados.Container,
-	status chan arvados.Container) {
+	status <-chan arvados.Container) {
 
 	uuid := container.UUID
 
@@ -170,8 +173,7 @@ func run(dispatcher *dispatch.Dispatcher,
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	if container.LockedByUUID == dispatcher.Auth.UUID &&
-		(container.State == dispatch.Locked || container.State == dispatch.Running) {
+	if container.State == dispatch.Locked || container.State == dispatch.Running {
 		log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
 			*crunchRunCommand, uuid, container.State, dispatch.Cancelled)
 		dispatcher.UpdateState(uuid, dispatch.Cancelled)
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index ed13a41..a72d65b 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -2,11 +2,7 @@ package main
 
 import (
 	"bytes"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"git.curoverse.com/arvados.git/sdk/go/dispatch"
-	. "gopkg.in/check.v1"
+	"context"
 	"io"
 	"log"
 	"net/http"
@@ -16,6 +12,12 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -62,14 +64,13 @@ func (s *TestSuite) TestIntegration(c *C) {
 	echo := "echo"
 	crunchRunCommand = &echo
 
+	ctx, cancel := context.WithCancel(context.Background())
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Second,
-		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container arvados.Container,
-			status chan arvados.Container) {
-			run(dispatcher, container, status)
-			dispatcher.Stop()
+		RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+			run(d, c, s)
+			cancel()
 		},
 	}
 
@@ -79,8 +80,8 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return cmd.Start()
 	}
 
-	err = dispatcher.Run()
-	c.Assert(err, IsNil)
+	err = dispatcher.Run(ctx)
+	c.Assert(err, Equals, context.Canceled)
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -165,14 +166,13 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	*crunchRunCommand = crunchCmd
 
+	ctx, cancel := context.WithCancel(context.Background())
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Duration(1) * time.Second,
-		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container arvados.Container,
-			status chan arvados.Container) {
-			run(dispatcher, container, status)
-			dispatcher.Stop()
+		RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+			run(d, c, s)
+			cancel()
 		},
 	}
 
@@ -186,11 +186,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.Stop()
+		cancel()
 	}()
 
-	err := dispatcher.Run()
-	c.Assert(err, IsNil)
+	err := dispatcher.Run(ctx)
+	c.Assert(err, Equals, context.Canceled)
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 476ca1f..617b076 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -4,6 +4,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"flag"
 	"fmt"
 	"log"
@@ -46,7 +47,7 @@ func main() {
 
 var (
 	theConfig Config
-	sqCheck   SqueueChecker
+	sqCheck   = &SqueueChecker{}
 )
 
 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -107,10 +108,10 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
-	sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+	sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
 	defer sqCheck.Stop()
 
-	dispatcher := dispatch.Dispatcher{
+	dispatcher := &dispatch.Dispatcher{
 		Arv:            arv,
 		RunContainer:   run,
 		PollPeriod:     time.Duration(theConfig.PollPeriod),
@@ -121,7 +122,7 @@ func doMain() error {
 		log.Printf("Error notifying init daemon: %v", err)
 	}
 
-	return dispatcher.Run()
+	return dispatcher.Run(context.Background())
 }
 
 // sbatchCmd
@@ -184,97 +185,74 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
 	}
 }
 
-// If the container is marked as Locked, check if it is already in the slurm
-// queue.  If not, submit it.
-//
-// If the container is marked as Running, check if it is in the slurm queue.
-// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
-	submitted := false
-	for !*monitorDone {
-		if sqCheck.HasUUID(container.UUID) {
-			// Found in the queue, so continue monitoring
-			submitted = true
-		} else if container.State == dispatch.Locked && !submitted {
-			// Not in queue but in Locked state and we haven't
-			// submitted it yet, so submit it.
-
-			log.Printf("About to submit queued container %v", container.UUID)
-
-			if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
-				log.Printf("Error submitting container %s to slurm: %v",
-					container.UUID, err)
-				// maybe sbatch is broken, put it back to queued
-				dispatcher.Unlock(container.UUID)
-			}
-			submitted = true
-		} else {
-			// Not in queue and we are not going to submit it.
-			// Refresh the container state. If it is
-			// Complete/Cancelled, do nothing, if it is Locked then
-			// release it back to the Queue, if it is Running then
-			// clean up the record.
-
-			var con arvados.Container
-			err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
-			if err != nil {
-				log.Printf("Error getting final container state: %v", err)
-			}
+// Submit a container to the slurm queue (or resume monitoring if it's
+// already in the queue).  Cancel the slurm job if the container's
+// priority changes to zero or its state indicates it's no longer
+// running.
+func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("Submitting container %s to slurm", ctr.UUID)
+		if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
+			log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			disp.Unlock(ctr.UUID)
+			return
+		}
+	}
 
-			switch con.State {
-			case dispatch.Locked:
-				log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-					container.UUID, con.State, dispatch.Queued)
-				dispatcher.Unlock(container.UUID)
+	log.Printf("Start monitoring container %s", ctr.UUID)
+	defer log.Printf("Done monitoring container %s", ctr.UUID)
+
+	// If the container disappears from the slurm queue, there is
+	// no point in waiting for further dispatch updates: just
+	// clean up and return.
+	go func(uuid string) {
+		for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+		}
+		cancel()
+	}(ctr.UUID)
+
+	for {
+		select {
+		case <-ctx.Done():
+			// Disappeared from squeue
+			if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+				log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+			}
+			switch ctr.State {
 			case dispatch.Running:
-				st := dispatch.Cancelled
-				log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-					container.UUID, con.State, st)
-				dispatcher.UpdateState(container.UUID, st)
-			default:
-				// Container state is Queued, Complete or Cancelled so stop monitoring it.
-				return
+				disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+			case dispatch.Locked:
+				disp.Unlock(ctr.UUID)
+			}
+			return
+		case updated, ok := <-status:
+			if !ok {
+				log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
+				scancel(ctr)
+			} else if updated.Priority == 0 {
+				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+				scancel(ctr)
 			}
 		}
 	}
 }
 
-// Run or monitor a container.
-//
-// Monitor status updates.  If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
-	container arvados.Container,
-	status chan arvados.Container) {
-
-	log.Printf("Monitoring container %v started", container.UUID)
-	defer log.Printf("Monitoring container %v finished", container.UUID)
-
-	monitorDone := false
-	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
-
-	for container = range status {
-		if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
-			log.Printf("Canceling container %s", container.UUID)
-			// Mutex between squeue sync and running sbatch or scancel.
-			sqCheck.L.Lock()
-			cmd := scancelCmd(container)
-			msg, err := cmd.CombinedOutput()
-			sqCheck.L.Unlock()
-
-			if err != nil {
-				log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-				if sqCheck.HasUUID(container.UUID) {
-					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-					continue
-				}
-			}
+func scancel(ctr arvados.Container) {
+	sqCheck.L.Lock()
+	cmd := scancelCmd(ctr)
+	msg, err := cmd.CombinedOutput()
+	sqCheck.L.Unlock()
 
-			// Ignore errors; if necessary, we'll try again next time
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		}
+	if err != nil {
+		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		time.Sleep(time.Second)
+	} else if sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
+		time.Sleep(time.Second)
 	}
-	monitorDone = true
 }
 
 func readConfig(dst interface{}, path string) error {
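
The rewritten run() above ties monitoring to a context: a helper goroutine cancels the context as soon as the job leaves the slurm queue, and the select loop treats ctx.Done() as "decide the container's fate and return". A standalone sketch of that watch-and-cancel shape; stillQueued stands in for sqCheck.HasUUID, and the sleep stands in for blocking on the next squeue poll:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // watch cancels ctx once the job is no longer in the queue.
    func watch(ctx context.Context, cancel context.CancelFunc, stillQueued func() bool) {
        for ctx.Err() == nil && stillQueued() {
            time.Sleep(100 * time.Millisecond)
        }
        cancel()
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        gone := time.Now().Add(300 * time.Millisecond)
        go watch(ctx, cancel, func() bool { return time.Now().Before(gone) })
        <-ctx.Done() // job left the queue
        fmt.Println("cleaning up:", ctx.Err())
    }
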
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 8809e7b..e6abb16 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -2,11 +2,8 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io"
 	"io/ioutil"
 	"log"
@@ -18,6 +15,10 @@ import (
 	"testing"
 	"time"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	. "gopkg.in/check.v1"
 )
 
@@ -59,30 +60,50 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+	done := false
+	container := s.integrationTest(c,
+		func() *exec.Cmd {
+			if done {
+				return exec.Command("true")
+			} else {
+				return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+			}
+		},
 		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+			done = true
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-
-	// Override sbatchCmd
+	var cmd *exec.Cmd
 	var scancelCmdLine []string
 	defer func(orig func(arvados.Container) *exec.Cmd) {
 		scancelCmd = orig
 	}(scancelCmd)
+	attempt := 0
 	scancelCmd = func(container arvados.Container) *exec.Cmd {
-		scancelCmdLine = scancelFunc(container).Args
-		return exec.Command("echo")
+		if attempt++; attempt == 1 {
+			return exec.Command("false")
+		} else {
+			scancelCmdLine = scancelFunc(container).Args
+			cmd = exec.Command("echo")
+			return cmd
+		}
 	}
 
 	container := s.integrationTest(c,
-		func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+		func() *exec.Cmd {
+			if cmd != nil && cmd.ProcessState != nil {
+				return exec.Command("true")
+			} else {
+				return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+			}
+		},
 		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -146,22 +167,21 @@ func (s *TestSuite) integrationTest(c *C,
 
 	theConfig.CrunchRunCommand = []string{"echo"}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Duration(1) * time.Second,
-		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container arvados.Container,
-			status chan arvados.Container) {
-			go runContainer(dispatcher, container)
-			run(dispatcher, container, status)
-			dispatcher.Stop()
+		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+			go runContainer(disp, ctr)
+			run(disp, ctr, status)
+			cancel()
 		},
 	}
 
-	sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
+	sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
 
-	err = dispatcher.Run()
-	c.Assert(err, IsNil)
+	err = dispatcher.Run(ctx)
+	c.Assert(err, Equals, context.Canceled)
 
 	sqCheck.Stop()
 
@@ -207,19 +227,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	theConfig.CrunchRunCommand = []string{crunchCmd}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Duration(1) * time.Second,
-		RunContainer: func(dispatcher *dispatch.Dispatcher,
-			container arvados.Container,
-			status chan arvados.Container) {
+		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
 			go func() {
 				time.Sleep(1 * time.Second)
-				dispatcher.UpdateState(container.UUID, dispatch.Running)
-				dispatcher.UpdateState(container.UUID, dispatch.Complete)
+				disp.UpdateState(ctr.UUID, dispatch.Running)
+				disp.UpdateState(ctr.UUID, dispatch.Complete)
 			}()
-			run(dispatcher, container, status)
-			dispatcher.Stop()
+			run(disp, ctr, status)
+			cancel()
 		},
 	}
 
@@ -227,11 +246,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.Stop()
+		cancel()
 	}()
 
-	err := dispatcher.Run()
-	c.Assert(err, IsNil)
+	err := dispatcher.Run(ctx)
+	c.Assert(err, Equals, context.Canceled)
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }

commit 4fd89ed7b10f0860a6030c25e44d4df45a087b2e
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Feb 9 12:24:20 2017 -0500

    10701: Tidy up error handling.

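The tidied submit() below branches on the concrete type of the error returned by cmd.Run() instead of unlocking the container in a deferred cleanup, making each failure path explicit. A minimal standalone sketch of the pattern (the command and messages are illustrative):

    package main

    import (
        "fmt"
        "log"
        "os/exec"
    )

    func runJob() error {
        err := exec.Command("false").Run()
        switch err.(type) {
        case nil:
            return nil // started and exited 0
        case *exec.ExitError:
            // started but exited non-zero
            return fmt.Errorf("command failed: %v", err)
        default:
            // could not be started at all (e.g. binary not found)
            return fmt.Errorf("exec failed: %v", err)
        }
    }

    func main() {
        log.Print(runJob()) // typically "command failed: exit status 1"
    }
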
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index fe559f1..476ca1f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -151,18 +151,7 @@ var sbatchCmd = sbatchFunc
 var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
-func submit(dispatcher *dispatch.Dispatcher,
-	container arvados.Container, crunchRunCommand []string) (submitErr error) {
-	defer func() {
-		// If we didn't get as far as submitting a slurm job,
-		// unlock the container and return it to the queue.
-		if submitErr == nil {
-			// OK, no cleanup needed
-			return
-		}
-		dispatcher.Unlock(container.UUID)
-	}()
-
+func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
 	cmd := sbatchCmd(container)
 
 	// Send a tiny script on stdin to execute the crunch-run
@@ -179,13 +168,18 @@ func submit(dispatcher *dispatch.Dispatcher,
 
 	log.Printf("exec sbatch %+q", cmd.Args)
 	err := cmd.Run()
+
 	switch err.(type) {
 	case nil:
 		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
 		return nil
+
 	case *exec.ExitError:
+		dispatcher.Unlock(container.UUID)
 		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
+
 	default:
+		dispatcher.Unlock(container.UUID)
 		return fmt.Errorf("exec failed: %v", err)
 	}
 }

commit 8ee2c7c4f231a55601fdc90b087e42985b52fb20
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Feb 9 12:15:30 2017 -0500

    10701: Fix edge case: scheduling_parameters.partitions=[]

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 725d6a4..fe559f1 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -134,7 +134,7 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
 	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
-	if container.SchedulingParameters.Partitions != nil {
+	if len(container.SchedulingParameters.Partitions) > 0 {
 		sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
 	}
 

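The one-line fix above matters because "scheduling_parameters": {"partitions": []} decodes to an empty but non-nil slice, which passes a != nil check; the old code would then pass a bogus --partition= flag to sbatch. A standalone illustration:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        partitions := []string{} // what a JSON [] decodes to
        fmt.Println(partitions != nil)   // true: the old check passed
        fmt.Println(len(partitions) > 0) // false: the fixed check skips it
        fmt.Printf("--partition=%s\n", strings.Join(partitions, ",")) // "--partition="
    }
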
-----------------------------------------------------------------------


hooks/post-receive
-- 