[ARVADOS] created: e34a5060cfc1cc4821b431e8aa6778a31898e0eb

Git user git at public.curoverse.com
Thu Jan 26 14:49:46 EST 2017


        at  e34a5060cfc1cc4821b431e8aa6778a31898e0eb (commit)


commit e34a5060cfc1cc4821b431e8aa6778a31898e0eb
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jan 26 14:40:30 2017 -0500

    10700: Rename PollInterval to PollPeriod in library to match commands and config files.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 171ce60..ce960c0 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -39,7 +39,7 @@ type Dispatcher struct {
 	RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
 
 	// Amount of time to wait between polling for updates.
-	PollInterval time.Duration
+	PollPeriod time.Duration
 
 	// Minimum time between two attempts to run the same container
 	MinRetryPeriod time.Duration
@@ -115,7 +115,7 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 }
 
 func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
-	ticker := time.NewTicker(dispatcher.PollInterval)
+	ticker := time.NewTicker(dispatcher.PollPeriod)
 	defer ticker.Stop()
 
 	paramsQ := arvadosclient.Dict{
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index cfb0c7d..bb3c05c 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -58,7 +58,7 @@ func doMain() error {
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		RunContainer: run,
-		PollInterval: time.Duration(*pollInterval) * time.Second,
+		PollPeriod:   time.Duration(*pollInterval) * time.Second,
 	}
 
 	err = dispatcher.Run()
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 0454730..ed13a41 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -63,8 +63,8 @@ func (s *TestSuite) TestIntegration(c *C) {
 	crunchRunCommand = &echo
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:          arv,
-		PollInterval: time.Second,
+		Arv:        arv,
+		PollPeriod: time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container arvados.Container,
 			status chan arvados.Container) {
@@ -166,8 +166,8 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	*crunchRunCommand = crunchCmd
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:          arv,
-		PollInterval: time.Duration(1) * time.Second,
+		Arv:        arv,
+		PollPeriod: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container arvados.Container,
 			status chan arvados.Container) {
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 8cf2c00..86ae79a 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -104,7 +104,7 @@ func doMain() error {
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
 		RunContainer:   run,
-		PollInterval:   time.Duration(theConfig.PollPeriod),
+		PollPeriod:     time.Duration(theConfig.PollPeriod),
 		MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
 	}
 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 86844dc..de25fc3 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -147,8 +147,8 @@ func (s *TestSuite) integrationTest(c *C,
 	theConfig.CrunchRunCommand = []string{"echo"}
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:          arv,
-		PollInterval: time.Duration(1) * time.Second,
+		Arv:        arv,
+		PollPeriod: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container arvados.Container,
 			status chan arvados.Container) {
@@ -208,8 +208,8 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	theConfig.CrunchRunCommand = []string{crunchCmd}
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:          arv,
-		PollInterval: time.Duration(1) * time.Second,
+		Arv:        arv,
+		PollPeriod: time.Duration(1) * time.Second,
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container arvados.Container,
 			status chan arvados.Container) {

commit ae5eb12d3d9ab298a4c36412b4a4d83272574d25
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jan 26 14:37:54 2017 -0500

    10704: Rate-limit startup attempts per container.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 4b6c813..f412dea 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -88,6 +88,7 @@ sdk/python
 sdk/ruby
 sdk/go/arvados
 sdk/go/arvadosclient
+sdk/go/dispatch
 sdk/go/keepclient
 sdk/go/httpserver
 sdk/go/manifest
@@ -769,6 +770,7 @@ gostuff=(
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/blockdigest
+    sdk/go/dispatch
     sdk/go/httpserver
     sdk/go/manifest
     sdk/go/streamer
diff --git a/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid b/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
index a0d7f13..d01326a 100644
--- a/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
+++ b/doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
@@ -120,6 +120,18 @@ You can work around this issue by disabling the Docker daemon's systemd integrat
 
 {% include 'notebox_end' %}
 
+h3. MinRetryPeriod: Rate-limit repeated attempts to start containers
+
+If SLURM is unable to run a container, the dispatcher will submit it again after the next PollPeriod. If PollPeriod is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
+
+<notextile>
+<pre><code class="userinput">Client:
+  APIHost: <b>zzzzz.arvadosapi.com</b>
+  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
+MinRetryPeriod: <b>30s</b>
+</code></pre>
+</notextile>
+
 h2. Restart the dispatcher
 
 {% include 'notebox_begin' %}
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 4129b24..171ce60 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -41,10 +41,15 @@ type Dispatcher struct {
 	// Amount of time to wait between polling for updates.
 	PollInterval time.Duration
 
+	// Minimum time between two attempts to run the same container
+	MinRetryPeriod time.Duration
+
 	mineMutex sync.Mutex
 	mineMap   map[string]chan arvados.Container
 	Auth      arvados.APIClientAuthorization
 
+	throttle throttle
+
 	stop chan struct{}
 }
 
@@ -171,6 +176,9 @@ func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
 	}
 
 	if container.State == Queued && container.Priority > 0 {
+		if !dispatcher.throttle.Check(container.UUID) {
+			return
+		}
 		// Try to take the lock
 		if err := dispatcher.Lock(container.UUID); err != nil {
 			return
@@ -235,6 +243,7 @@ func (dispatcher *Dispatcher) Run() (err error) {
 
 	dispatcher.mineMap = make(map[string]chan arvados.Container)
 	dispatcher.stop = make(chan struct{})
+	dispatcher.throttle.hold = dispatcher.MinRetryPeriod
 	dispatcher.pollContainers(dispatcher.stop)
 	return nil
 }
diff --git a/sdk/go/dispatch/throttle.go b/sdk/go/dispatch/throttle.go
new file mode 100644
index 0000000..5448078
--- /dev/null
+++ b/sdk/go/dispatch/throttle.go
@@ -0,0 +1,58 @@
+package dispatch
+
+import (
+	"sync"
+	"time"
+)
+
+type throttleEnt struct {
+	last time.Time // last attempt that was allowed
+}
+
+type throttle struct {
+	hold      time.Duration
+	seen      map[string]*throttleEnt
+	updated   sync.Cond
+	setupOnce sync.Once
+	mtx       sync.Mutex
+}
+
+// Check checks whether there have been too many recent attempts with
+// the given uuid, and returns true if it's OK to attempt [again] now.
+func (t *throttle) Check(uuid string) bool {
+	if t.hold == 0 {
+		return true
+	}
+	t.setupOnce.Do(t.setup)
+	t.mtx.Lock()
+	defer t.updated.Broadcast()
+	defer t.mtx.Unlock()
+	ent, ok := t.seen[uuid]
+	if !ok {
+		t.seen[uuid] = &throttleEnt{last: time.Now()}
+		return true
+	}
+	if time.Since(ent.last) < t.hold {
+		return false
+	}
+	ent.last = time.Now()
+	return true
+}
+
+func (t *throttle) setup() {
+	t.seen = make(map[string]*throttleEnt)
+	t.updated.L = &t.mtx
+	go func() {
+		for range time.NewTicker(t.hold).C {
+			t.mtx.Lock()
+			for uuid, ent := range t.seen {
+				if time.Since(ent.last) >= t.hold {
+					delete(t.seen, uuid)
+				}
+			}
+			// don't bother cleaning again until the next update
+			t.updated.Wait()
+			t.mtx.Unlock()
+		}
+	}()
+}
diff --git a/sdk/go/dispatch/throttle_test.go b/sdk/go/dispatch/throttle_test.go
new file mode 100644
index 0000000..ba65783
--- /dev/null
+++ b/sdk/go/dispatch/throttle_test.go
@@ -0,0 +1,38 @@
+package dispatch
+
+import (
+	"testing"
+	"time"
+
+	check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (*Suite) TestThrottle(c *check.C) {
+	uuid := "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+	t := throttle{}
+	c.Check(t.Check(uuid), check.Equals, true)
+	c.Check(t.Check(uuid), check.Equals, true)
+
+	t = throttle{hold: time.Nanosecond}
+	c.Check(t.Check(uuid), check.Equals, true)
+	time.Sleep(time.Microsecond)
+	c.Check(t.Check(uuid), check.Equals, true)
+
+	t = throttle{hold: time.Minute}
+	c.Check(t.Check(uuid), check.Equals, true)
+	c.Check(t.Check(uuid), check.Equals, false)
+	c.Check(t.Check(uuid), check.Equals, false)
+	t.seen[uuid].last = time.Now().Add(-time.Hour)
+	c.Check(t.Check(uuid), check.Equals, true)
+	c.Check(t.Check(uuid), check.Equals, false)
+}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 8e61462..8cf2c00 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -31,6 +31,9 @@ type Config struct {
 	//
 	// Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
 	CrunchRunCommand []string
+
+	// Minimum time between two attempts to run the same container
+	MinRetryPeriod arvados.Duration
 }
 
 func main() {
@@ -99,9 +102,10 @@ func doMain() error {
 	defer squeueUpdater.Done()
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:          arv,
-		RunContainer: run,
-		PollInterval: time.Duration(theConfig.PollPeriod),
+		Arv:            arv,
+		RunContainer:   run,
+		PollInterval:   time.Duration(theConfig.PollPeriod),
+		MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
 	}
 
 	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {

commit 683f5374b0fc516579c1d6dc3379fc900d642322
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 16:37:26 2017 -0500

    10703: Un-pyramid run().

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 19ab5aa..8e61462 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -250,29 +250,30 @@ func run(dispatcher *dispatch.Dispatcher,
 	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
 	for container = range status {
-		if container.State == dispatch.Locked || container.State == dispatch.Running {
-			if container.Priority == 0 {
-				log.Printf("Canceling container %s", container.UUID)
-
-				// Mutex between squeue sync and running sbatch or scancel.
-				squeueUpdater.SlurmLock.Lock()
-				cmd := scancelCmd(container)
-				msg, err := cmd.CombinedOutput()
-				squeueUpdater.SlurmLock.Unlock()
-
-				if err != nil {
-					log.Printf("Error stopping container %s with %v %v: %v %v",
-						container.UUID, cmd.Path, cmd.Args, err, string(msg))
-					if squeueUpdater.CheckSqueue(container.UUID) {
-						log.Printf("Container %s is still in squeue after scancel.",
-							container.UUID)
-						continue
-					}
-				}
-
-				err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
+			continue
+		}
+		if container.Priority != 0 {
+			continue
+		}
+		log.Printf("Canceling container %s", container.UUID)
+
+		// Mutex between squeue sync and running sbatch or scancel.
+		squeueUpdater.SlurmLock.Lock()
+		cmd := scancelCmd(container)
+		msg, err := cmd.CombinedOutput()
+		squeueUpdater.SlurmLock.Unlock()
+
+		if err != nil {
+			log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+			if squeueUpdater.CheckSqueue(container.UUID) {
+				log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+				continue
 			}
 		}
+
+		// Ignore errors; if necessary, we'll try again next time
+		dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
 	}
 	monitorDone = true
 }

commit 318dd887d108e0664ef22a8d38a34fa18e1b2657
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 16:04:30 2017 -0500

    10701: Remove unneeded complexity in squeue invocation.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ae70800..19ab5aa 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,6 +3,7 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
+	"bytes"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -10,8 +11,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"github.com/coreos/go-systemd/daemon"
-	"io"
-	"io/ioutil"
 	"log"
 	"math"
 	"os"
@@ -154,70 +153,31 @@ func submit(dispatcher *dispatch.Dispatcher,
 		}
 	}()
 
-	// Create the command and attach to stdin/stdout
 	cmd := sbatchCmd(container)
-	stdinWriter, stdinerr := cmd.StdinPipe()
-	if stdinerr != nil {
-		submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
-		return
-	}
 
-	stdoutReader, stdoutErr := cmd.StdoutPipe()
-	if stdoutErr != nil {
-		submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
-		return
-	}
+	// Send a tiny script on stdin to execute the crunch-run
+	// command (slurm requires this to be a #! script)
+	cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
 
-	stderrReader, stderrErr := cmd.StderrPipe()
-	if stderrErr != nil {
-		submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
-		return
-	}
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
 
 	// Mutex between squeue sync and running sbatch or scancel.
 	squeueUpdater.SlurmLock.Lock()
 	defer squeueUpdater.SlurmLock.Unlock()
 
-	log.Printf("sbatch starting: %+q", cmd.Args)
-	err := cmd.Start()
-	if err != nil {
-		submitErr = fmt.Errorf("Error starting sbatch: %v", err)
-		return
+	log.Printf("exec sbatch %+q", cmd.Args)
+	err := cmd.Run()
+	switch err.(type) {
+	case nil:
+		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
+		return nil
+	case *exec.ExitError:
+		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr)
+	default:
+		return fmt.Errorf("exec failed: %v", err)
 	}
-
-	stdoutChan := make(chan []byte)
-	go func() {
-		b, _ := ioutil.ReadAll(stdoutReader)
-		stdoutReader.Close()
-		stdoutChan <- b
-		close(stdoutChan)
-	}()
-
-	stderrChan := make(chan []byte)
-	go func() {
-		b, _ := ioutil.ReadAll(stderrReader)
-		stderrReader.Close()
-		stderrChan <- b
-		close(stderrChan)
-	}()
-
-	// Send a tiny script on stdin to execute the crunch-run command
-	// slurm actually enforces that this must be a #! script
-	io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
-	stdinWriter.Close()
-
-	stdoutMsg := <-stdoutChan
-	stderrmsg := <-stderrChan
-
-	err = cmd.Wait()
-
-	if err != nil {
-		submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
-		return
-	}
-
-	log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
-	return
 }
 
 // If the container is marked as Locked, check if it is already in the slurm

commit e356309e05714cd65d88456c563cea606f820394
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 10:59:48 2017 -0500

    10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop" API.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 4987c01..4129b24 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -7,10 +7,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"log"
-	"os"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 )
 
@@ -44,13 +41,11 @@ type Dispatcher struct {
 	// Amount of time to wait between polling for updates.
 	PollInterval time.Duration
 
-	// Channel used to signal that RunDispatcher loop should exit.
-	DoneProcessing chan struct{}
+	mineMutex sync.Mutex
+	mineMap   map[string]chan arvados.Container
+	Auth      arvados.APIClientAuthorization
 
-	mineMutex  sync.Mutex
-	mineMap    map[string]chan arvados.Container
-	Auth       arvados.APIClientAuthorization
-	containers chan arvados.Container
+	stop chan struct{}
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -110,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 	}
 	for _, container := range containers.Items {
 		touched[container.UUID] = true
-		dispatcher.containers <- container
+		dispatcher.handleUpdate(container)
 	}
 }
 
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
 	ticker := time.NewTicker(dispatcher.PollInterval)
+	defer ticker.Stop()
 
 	paramsQ := arvadosclient.Dict{
 		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -126,26 +122,24 @@ func (dispatcher *Dispatcher) pollContainers() {
 		"limit":   "1000"}
 
 	for {
+		touched := make(map[string]bool)
+		dispatcher.getContainers(paramsQ, touched)
+		dispatcher.getContainers(paramsP, touched)
+		dispatcher.mineMutex.Lock()
+		var monitored []string
+		for k := range dispatcher.mineMap {
+			if _, ok := touched[k]; !ok {
+				monitored = append(monitored, k)
+			}
+		}
+		dispatcher.mineMutex.Unlock()
+		if monitored != nil {
+			dispatcher.getContainers(arvadosclient.Dict{
+				"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+		}
 		select {
 		case <-ticker.C:
-			touched := make(map[string]bool)
-			dispatcher.getContainers(paramsQ, touched)
-			dispatcher.getContainers(paramsP, touched)
-			dispatcher.mineMutex.Lock()
-			var monitored []string
-			for k := range dispatcher.mineMap {
-				if _, ok := touched[k]; !ok {
-					monitored = append(monitored, k)
-				}
-			}
-			dispatcher.mineMutex.Unlock()
-			if monitored != nil {
-				dispatcher.getContainers(arvadosclient.Dict{
-					"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-			}
-		case <-dispatcher.DoneProcessing:
-			close(dispatcher.containers)
-			ticker.Stop()
+		case <-stop:
 			return
 		}
 	}
@@ -221,10 +215,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error {
 	return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+	if dispatcher.stop == nil {
+		// already stopped
+		return
+	}
+	close(dispatcher.stop)
+	dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
 	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
 		log.Printf("Error getting my token UUID: %v", err)
@@ -232,26 +234,7 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
 	}
 
 	dispatcher.mineMap = make(map[string]chan arvados.Container)
-	dispatcher.containers = make(chan arvados.Container)
-
-	// Graceful shutdown on signal
-	sigChan := make(chan os.Signal)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			dispatcher.DoneProcessing <- struct{}{}
-		}
-	}(sigChan)
-
-	defer close(sigChan)
-	defer signal.Stop(sigChan)
-
-	go dispatcher.pollContainers()
-	for container := range dispatcher.containers {
-		dispatcher.handleUpdate(container)
-	}
-
+	dispatcher.stop = make(chan struct{})
+	dispatcher.pollContainers(dispatcher.stop)
 	return nil
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 0ca7651..cfb0c7d 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -10,7 +10,9 @@ import (
 	"log"
 	"os"
 	"os/exec"
+	"os/signal"
 	"sync"
+	"syscall"
 	"time"
 )
 
@@ -54,16 +56,24 @@ func doMain() error {
 	arv.Retries = 25
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:            arv,
-		RunContainer:   run,
-		PollInterval:   time.Duration(*pollInterval) * time.Second,
-		DoneProcessing: make(chan struct{})}
+		Arv:          arv,
+		RunContainer: run,
+		PollInterval: time.Duration(*pollInterval) * time.Second,
+	}
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	if err != nil {
 		return err
 	}
 
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+	sig := <-c
+	log.Printf("Received %s, shutting down", sig)
+	signal.Stop(c)
+
+	dispatcher.Stop()
+
 	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index bcb406e..0454730 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -62,7 +62,6 @@ func (s *TestSuite) TestIntegration(c *C) {
 	echo := "echo"
 	crunchRunCommand = &echo
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Second,
@@ -70,9 +69,9 @@ func (s *TestSuite) TestIntegration(c *C) {
 			container arvados.Container,
 			status chan arvados.Container) {
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
@@ -80,7 +79,7 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return cmd.Start()
 	}
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
@@ -166,7 +165,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	*crunchRunCommand = crunchCmd
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -174,9 +172,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 			container arvados.Container,
 			status chan arvados.Container) {
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
@@ -188,10 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.DoneProcessing <- struct{}{}
+		dispatcher.Stop()
 	}()
 
-	err := dispatcher.RunDispatcher()
+	err := dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index e768b50..ae70800 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -100,21 +100,16 @@ func doMain() error {
 	defer squeueUpdater.Done()
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:            arv,
-		RunContainer:   run,
-		PollInterval:   time.Duration(theConfig.PollPeriod),
-		DoneProcessing: make(chan struct{})}
+		Arv:          arv,
+		RunContainer: run,
+		PollInterval: time.Duration(theConfig.PollPeriod),
+	}
 
 	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
 		log.Printf("Error notifying init daemon: %v", err)
 	}
 
-	err = dispatcher.RunDispatcher()
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return dispatcher.Run()
 }
 
 // sbatchCmd
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4046103..86844dc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -146,7 +146,6 @@ func (s *TestSuite) integrationTest(c *C,
 
 	theConfig.CrunchRunCommand = []string{"echo"}
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -155,13 +154,13 @@ func (s *TestSuite) integrationTest(c *C,
 			status chan arvados.Container) {
 			go runContainer(dispatcher, container)
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	squeueUpdater.Done()
@@ -208,7 +207,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	theConfig.CrunchRunCommand = []string{crunchCmd}
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -221,18 +219,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	go func() {
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.DoneProcessing <- struct{}{}
+		dispatcher.Stop()
 	}()
 
-	err := dispatcher.RunDispatcher()
+	err := dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list