[ARVADOS] created: ddee3839f8a82b889f84171e2354108cb20f93e0
Git user
git at public.curoverse.com
Fri Feb 10 01:25:54 EST 2017
at ddee3839f8a82b889f84171e2354108cb20f93e0 (commit)
commit ddee3839f8a82b889f84171e2354108cb20f93e0
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Feb 10 01:25:14 2017 -0500
10701: Refactor dispatch library.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index ce960c0..7342c3b 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -4,11 +4,14 @@
package dispatch
import (
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "context"
+ "fmt"
"log"
"sync"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
const (
@@ -19,231 +22,173 @@ const (
Cancelled = arvados.ContainerStateCancelled
)
-// Dispatcher holds the state of the dispatcher
-type Dispatcher struct {
- // The Arvados client
- Arv *arvadosclient.ArvadosClient
-
- // When a new queued container appears and is either already owned by
- // this dispatcher or is successfully locked, the dispatcher will call
- // go RunContainer(). The RunContainer() goroutine gets a channel over
- // which it will receive updates to the container state. The
- // RunContainer() goroutine should only assume status updates come when
- // the container record changes on the API server; if it needs to
- // monitor the job submission to the underlying slurm/grid engine/etc
- // queue it should spin up its own polling goroutines. When the
- // channel is closed, that means the container is no longer being
- // handled by this dispatcher and the goroutine should terminate. The
- // goroutine is responsible for draining the 'status' channel, failure
- // to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
- // Amount of time to wait between polling for updates.
- PollPeriod time.Duration
-
- // Minimum time between two attempts to run the same container
- MinRetryPeriod time.Duration
-
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
-
- throttle throttle
-
- stop chan struct{}
+type runner struct {
+ closing bool
+ updates chan arvados.Container
}
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring. Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- return ch
+func (ex *runner) close() {
+ if !ex.closing {
+ close(ex.updates)
}
-
- ch := make(chan arvados.Container)
- dispatcher.mineMap[uuid] = ch
- return ch
+ ex.closing = true
}
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- if ch, ok := dispatcher.mineMap[uuid]; ok {
- close(ch)
- delete(dispatcher.mineMap, uuid)
+// update queues c for delivery on ex.updates. It does nothing once
+// close has been called. If an earlier update is still waiting in the
+// channel, that stale update is dropped (and logged) before c is sent.
+func (ex *runner) update(c arvados.Container) {
+ if ex.closing {
+ return
 }
-}
-
-// checkMine returns true if there is a channel for updates associated
-// with container c. If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
- dispatcher.mineMutex.Lock()
- defer dispatcher.mineMutex.Unlock()
- ch, ok := dispatcher.mineMap[c.UUID]
- if ok {
- if update {
- ch <- c
- }
- return true
+ select {
+ case <-ex.updates:
+ log.Printf("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
+ default:
 }
- return false
+ ex.updates <- c
 }
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers arvados.ContainerList
- err := dispatcher.Arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
- }
+type Dispatcher struct {
+ Arv *arvadosclient.ArvadosClient
+ PollPeriod time.Duration
+ MinRetryPeriod time.Duration
+ RunContainer Runner
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
- }
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.handleUpdate(container)
- }
+ auth arvados.APIClientAuthorization
+ mtx sync.Mutex
+ running map[string]*runner
+ throttle throttle
}
-func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
- ticker := time.NewTicker(dispatcher.PollPeriod)
- defer ticker.Stop()
+// A Runner executes a container. If it starts any goroutines, it must
+// not return until it can guarantee that none of those goroutines
+// will do anything with this container.
+type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
- paramsQ := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
- "order": []string{"priority desc"},
- "limit": "1000"}
- paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
- "limit": "1000"}
+// Run loads the current API token's authorization record into d.auth,
+// then polls for container updates (immediately, and again on every
+// PollPeriod tick) until ctx is done. It returns ctx.Err() when ctx is
+// done, or an error if the initial authorization lookup fails.
+func (d *Dispatcher) Run(ctx context.Context) error {
+ err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+ if err != nil {
+ return fmt.Errorf("error getting my token UUID: %v", err)
+ }
+
+ poll := time.NewTicker(d.PollPeriod)
+ defer poll.Stop()
 for {
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
+ // Hold mtx for every read of d.running, including len(): runner goroutines delete entries concurrently.
+ d.mtx.Lock()
+ running := make([]string, 0, len(d.running))
+ for uuid := range d.running {
+ running = append(running, uuid)
 }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ d.mtx.Unlock()
+ if len(running) == 0 {
+ // API bug: ["uuid", "not in", []] does not match everything
+ running = []string{"X"}
 }
+ d.checkForUpdates([][]interface{}{
+ {"uuid", "in", running}})
+ d.checkForUpdates([][]interface{}{
+ {"state", "=", Queued},
+ {"priority", ">", "0"},
+ {"uuid", "not in", running}})
+ d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID},
+ {"uuid", "not in", running}})
 select {
- case <-ticker.C:
- case <-stop:
- return
+ case <-poll.C:
+ continue
+ case <-ctx.Done():
+ return ctx.Err()
 }
 }
 }
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
- if container.State == Queued && dispatcher.checkMine(container, false) {
- // If we previously started the job, something failed, and it
- // was re-queued, this dispatcher might still be monitoring it.
- // Stop the existing monitor, then try to lock and run it
- // again.
- dispatcher.notMine(container.UUID)
- }
+func (d *Dispatcher) start(c arvados.Container) *runner {
+ ex := &runner{
+ updates: make(chan arvados.Container, 1),
+ }
+ if d.running == nil {
+ d.running = make(map[string]*runner)
+ }
+ d.running[c.UUID] = ex
+ go func() {
+ d.RunContainer(d, c, ex.updates)
+ d.mtx.Lock()
+ delete(d.running, c.UUID)
+ d.mtx.Unlock()
+ }()
+ return ex
+}
- if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
- // If container is Complete, Cancelled, or Queued, LockedByUUID
- // will be nil. If the container was formerly Locked, moved
- // back to Queued and then locked by another dispatcher,
- // LockedByUUID will be different. In either case, we want
- // to stop monitoring it.
- log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
- dispatcher.notMine(container.UUID)
- return
- }
+// checkForUpdates lists up to 1000 containers matching filters (highest
+// priority first) and, holding d.mtx, acts on each one: containers
+// locked by another token are ignored; containers that already have a
+// runner get an update or have their runner closed, depending on
+// state; other Queued containers are locked and started, and other
+// Locked/Running containers are started.
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+ params := arvadosclient.Dict{
+ "filters": filters,
+ "order": []string{"priority desc"},
+ "limit": "1000"}
- if dispatcher.checkMine(container, true) {
- // Already monitored, sent status update
+ var list arvados.ContainerList
+ err := d.Arv.List("containers", params, &list)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
 return
 }
- if container.State == Queued && container.Priority > 0 {
- if !dispatcher.throttle.Check(container.UUID) {
- return
- }
- // Try to take the lock
- if err := dispatcher.Lock(container.UUID); err != nil {
- return
+ if list.ItemsAvailable > len(list.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ list.ItemsAvailable,
+ len(list.Items))
+ }
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ for _, c := range list.Items {
+ ex, running := d.running[c.UUID]
+ if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+ log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ } else if running {
+ switch c.State {
+ case Queued:
+ ex.close()
+ case Locked, Running:
+ ex.update(c)
+ case Cancelled, Complete:
+ ex.close()
+ }
+ } else {
+ switch c.State {
+ case Queued:
+ if err := d.lock(c.UUID); err != nil {
+ log.Printf("Error locking container %s: %s", c.UUID, err)
+ } else {
+ c.State = Locked
+ d.start(c).update(c)
+ }
+ case Locked, Running:
+ d.start(c).update(c)
+ case Cancelled, Complete:
+ // Nothing to do: no runner exists for this container, so ex is nil and ex.close() would panic.
+ }
 }
- container.State = Locked
- }
-
- if container.State == Locked || container.State == Running {
- // Not currently monitored but in Locked or Running state and
- // owned by this dispatcher, so start monitoring.
- go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
 }
 }
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
- err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+ err := d.Arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
+ "container": arvadosclient.Dict{"state": state},
+ }, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
// Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
- if err != nil {
- log.Printf("Error locking container %s: %q", uuid, err)
- }
- return err
+func (d *Dispatcher) lock(uuid string) error {
+ return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
}
// Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
- err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
- if err != nil {
- log.Printf("Error unlocking container %s: %q", uuid, err)
- }
- return err
-}
-
-// Stop causes Run to return after the current polling cycle.
-func (dispatcher *Dispatcher) Stop() {
- if dispatcher.stop == nil {
- // already stopped
- return
- }
- close(dispatcher.stop)
- dispatcher.stop = nil
-}
-
-// Run runs the main loop of the dispatcher.
-func (dispatcher *Dispatcher) Run() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
- if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
- }
-
- dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.stop = make(chan struct{})
- dispatcher.throttle.hold = dispatcher.MinRetryPeriod
- dispatcher.pollContainers(dispatcher.stop)
- return nil
+func (d *Dispatcher) Unlock(uuid string) error {
+ return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index bb3c05c..22f7d8b 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -3,10 +3,8 @@ package main
// Dispatcher service for Crunch that runs containers locally.
import (
+ "context"
"flag"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
"log"
"os"
"os/exec"
@@ -14,6 +12,10 @@ import (
"sync"
"syscall"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
)
func main() {
@@ -61,7 +63,8 @@ func doMain() error {
PollPeriod: time.Duration(*pollInterval) * time.Second,
}
- err = dispatcher.Run()
+ ctx, cancel := context.WithCancel(context.Background())
+ err = dispatcher.Run(ctx)
if err != nil {
return err
}
@@ -72,7 +75,7 @@ func doMain() error {
log.Printf("Received %s, shutting down", sig)
signal.Stop(c)
- dispatcher.Stop()
+ cancel()
runningCmdsMutex.Lock()
// Finished dispatching; interrupt any crunch jobs that are still running
@@ -103,7 +106,7 @@ var startCmd = startFunc
// crunch-run terminates, mark the container as Cancelled.
func run(dispatcher *dispatch.Dispatcher,
container arvados.Container,
- status chan arvados.Container) {
+ status <-chan arvados.Container) {
uuid := container.UUID
@@ -170,8 +173,7 @@ func run(dispatcher *dispatch.Dispatcher,
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- if container.LockedByUUID == dispatcher.Auth.UUID &&
- (container.State == dispatch.Locked || container.State == dispatch.Running) {
+ if container.State == dispatch.Locked || container.State == dispatch.Running {
log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
*crunchRunCommand, container.State, uuid, dispatch.Cancelled)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index ed13a41..a72d65b 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -2,11 +2,7 @@ package main
import (
"bytes"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
- . "gopkg.in/check.v1"
+ "context"
"io"
"log"
"net/http"
@@ -16,6 +12,12 @@ import (
"strings"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -62,14 +64,13 @@ func (s *TestSuite) TestIntegration(c *C) {
echo := "echo"
crunchRunCommand = &echo
+ ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Second,
- RunContainer: func(dispatcher *dispatch.Dispatcher,
- container arvados.Container,
- status chan arvados.Container) {
- run(dispatcher, container, status)
- dispatcher.Stop()
+ RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+ run(d, c, s)
+ cancel()
},
}
@@ -79,8 +80,8 @@ func (s *TestSuite) TestIntegration(c *C) {
return cmd.Start()
}
- err = dispatcher.Run()
- c.Assert(err, IsNil)
+ err = dispatcher.Run(ctx)
+ c.Assert(err, Equals, context.Canceled)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
@@ -165,14 +166,13 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
*crunchRunCommand = crunchCmd
+ ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
- RunContainer: func(dispatcher *dispatch.Dispatcher,
- container arvados.Container,
- status chan arvados.Container) {
- run(dispatcher, container, status)
- dispatcher.Stop()
+ RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+ run(d, c, s)
+ cancel()
},
}
@@ -186,11 +186,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.Stop()
+ cancel()
}()
- err := dispatcher.Run()
- c.Assert(err, IsNil)
+ err := dispatcher.Run(ctx)
+ c.Assert(err, Equals, context.Canceled)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 476ca1f..617b076 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -4,6 +4,7 @@ package main
import (
"bytes"
+ "context"
"flag"
"fmt"
"log"
@@ -46,7 +47,7 @@ func main() {
var (
theConfig Config
- sqCheck SqueueChecker
+ sqCheck = &SqueueChecker{}
)
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -107,10 +108,10 @@ func doMain() error {
}
arv.Retries = 25
- sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+ sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
defer sqCheck.Stop()
- dispatcher := dispatch.Dispatcher{
+ dispatcher := &dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
PollPeriod: time.Duration(theConfig.PollPeriod),
@@ -121,7 +122,7 @@ func doMain() error {
log.Printf("Error notifying init daemon: %v", err)
}
- return dispatcher.Run()
+ return dispatcher.Run(context.Background())
}
// sbatchCmd
@@ -184,97 +185,74 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
}
}
-// If the container is marked as Locked, check if it is already in the slurm
-// queue. If not, submit it.
-//
-// If the container is marked as Running, check if it is in the slurm queue.
-// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
- submitted := false
- for !*monitorDone {
- if sqCheck.HasUUID(container.UUID) {
- // Found in the queue, so continue monitoring
- submitted = true
- } else if container.State == dispatch.Locked && !submitted {
- // Not in queue but in Locked state and we haven't
- // submitted it yet, so submit it.
-
- log.Printf("About to submit queued container %v", container.UUID)
-
- if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
- log.Printf("Error submitting container %s to slurm: %v",
- container.UUID, err)
- // maybe sbatch is broken, put it back to queued
- dispatcher.Unlock(container.UUID)
- }
- submitted = true
- } else {
- // Not in queue and we are not going to submit it.
- // Refresh the container state. If it is
- // Complete/Cancelled, do nothing, if it is Locked then
- // release it back to the Queue, if it is Running then
- // clean up the record.
-
- var con arvados.Container
- err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
- if err != nil {
- log.Printf("Error getting final container state: %v", err)
- }
+// Submit a container to the slurm queue (or resume monitoring if it's
+// already in the queue). Cancel the slurm job if the container's
+// priority changes to zero or its state indicates it's no longer
+// running.
+func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("Submitting container %s to slurm", ctr.UUID)
+ if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
+ log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ disp.Unlock(ctr.UUID)
+ return
+ }
+ }
- switch con.State {
- case dispatch.Locked:
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- container.UUID, con.State, dispatch.Queued)
- dispatcher.Unlock(container.UUID)
+ log.Printf("Start monitoring container %s", ctr.UUID)
+ defer log.Printf("Done monitoring container %s", ctr.UUID)
+
+ // If the container disappears from the slurm queue, there is
+ // no point in waiting for further dispatch updates: just
+ // clean up and return.
+ go func(uuid string) {
+ for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+ }
+ cancel()
+ }(ctr.UUID)
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Disappeared from squeue
+ if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+ log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+ }
+ switch ctr.State {
case dispatch.Running:
- st := dispatch.Cancelled
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- container.UUID, con.State, st)
- dispatcher.UpdateState(container.UUID, st)
- default:
- // Container state is Queued, Complete or Cancelled so stop monitoring it.
- return
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ case dispatch.Locked:
+ disp.Unlock(ctr.UUID)
+ }
+ return
+ case updated, ok := <-status:
+ if !ok {
+ log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
+ scancel(ctr)
+ } else if updated.Priority == 0 {
+ log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+ scancel(ctr)
}
}
}
}
-// Run or monitor a container.
-//
-// Monitor status updates. If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
- container arvados.Container,
- status chan arvados.Container) {
-
- log.Printf("Monitoring container %v started", container.UUID)
- defer log.Printf("Monitoring container %v finished", container.UUID)
-
- monitorDone := false
- go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
-
- for container = range status {
- if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
- log.Printf("Canceling container %s", container.UUID)
- // Mutex between squeue sync and running sbatch or scancel.
- sqCheck.L.Lock()
- cmd := scancelCmd(container)
- msg, err := cmd.CombinedOutput()
- sqCheck.L.Unlock()
-
- if err != nil {
- log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if sqCheck.HasUUID(container.UUID) {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
- }
- }
+func scancel(ctr arvados.Container) {
+ sqCheck.L.Lock()
+ cmd := scancelCmd(ctr)
+ msg, err := cmd.CombinedOutput()
+ sqCheck.L.Unlock()
- // Ignore errors; if necessary, we'll try again next time
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- }
+ if err != nil {
+ log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ time.Sleep(time.Second)
+ } else if sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("container %s is still in squeue after scancel", ctr.UUID)
+ time.Sleep(time.Second)
}
- monitorDone = true
}
func readConfig(dst interface{}, path string) error {
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 8809e7b..e6abb16 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -2,11 +2,8 @@ package main
import (
"bytes"
+ "context"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
"io"
"io/ioutil"
"log"
@@ -18,6 +15,10 @@ import (
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
. "gopkg.in/check.v1"
)
@@ -59,30 +60,50 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ done := false
+ container := s.integrationTest(c,
+ func() *exec.Cmd {
+ if done {
+ return exec.Command("true")
+ } else {
+ return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+ }
+ },
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ done = true
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
func (s *TestSuite) TestIntegrationCancel(c *C) {
-
- // Override sbatchCmd
+ var cmd *exec.Cmd
var scancelCmdLine []string
defer func(orig func(arvados.Container) *exec.Cmd) {
scancelCmd = orig
}(scancelCmd)
+ attempt := 0
scancelCmd = func(container arvados.Container) *exec.Cmd {
- scancelCmdLine = scancelFunc(container).Args
- return exec.Command("echo")
+ if attempt++; attempt == 1 {
+ return exec.Command("false")
+ } else {
+ scancelCmdLine = scancelFunc(container).Args
+ cmd = exec.Command("echo")
+ return cmd
+ }
}
container := s.integrationTest(c,
- func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ func() *exec.Cmd {
+ if cmd != nil && cmd.ProcessState != nil {
+ return exec.Command("true")
+ } else {
+ return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+ }
+ },
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -146,22 +167,21 @@ func (s *TestSuite) integrationTest(c *C,
theConfig.CrunchRunCommand = []string{"echo"}
+ ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
- RunContainer: func(dispatcher *dispatch.Dispatcher,
- container arvados.Container,
- status chan arvados.Container) {
- go runContainer(dispatcher, container)
- run(dispatcher, container, status)
- dispatcher.Stop()
+ RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ go runContainer(disp, ctr)
+ run(disp, ctr, status)
+ cancel()
},
}
- sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
+ sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
- err = dispatcher.Run()
- c.Assert(err, IsNil)
+ err = dispatcher.Run(ctx)
+ c.Assert(err, Equals, context.Canceled)
sqCheck.Stop()
@@ -207,19 +227,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
theConfig.CrunchRunCommand = []string{crunchCmd}
+ ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
- RunContainer: func(dispatcher *dispatch.Dispatcher,
- container arvados.Container,
- status chan arvados.Container) {
+ RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
time.Sleep(1 * time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ disp.UpdateState(ctr.UUID, dispatch.Running)
+ disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
- run(dispatcher, container, status)
- dispatcher.Stop()
+ run(disp, ctr, status)
+ cancel()
},
}
@@ -227,11 +246,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.Stop()
+ cancel()
}()
- err := dispatcher.Run()
- c.Assert(err, IsNil)
+ err := dispatcher.Run(ctx)
+ c.Assert(err, Equals, context.Canceled)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
commit 4fd89ed7b10f0860a6030c25e44d4df45a087b2e
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Feb 9 12:24:20 2017 -0500
10701: Tidy up error handling.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index fe559f1..476ca1f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -151,18 +151,7 @@ var sbatchCmd = sbatchFunc
var scancelCmd = scancelFunc
// Submit job to slurm using sbatch.
-func submit(dispatcher *dispatch.Dispatcher,
- container arvados.Container, crunchRunCommand []string) (submitErr error) {
- defer func() {
- // If we didn't get as far as submitting a slurm job,
- // unlock the container and return it to the queue.
- if submitErr == nil {
- // OK, no cleanup needed
- return
- }
- dispatcher.Unlock(container.UUID)
- }()
-
+func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
cmd := sbatchCmd(container)
// Send a tiny script on stdin to execute the crunch-run
@@ -179,13 +168,18 @@ func submit(dispatcher *dispatch.Dispatcher,
log.Printf("exec sbatch %+q", cmd.Args)
err := cmd.Run()
+
switch err.(type) {
case nil:
log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
return nil
+
case *exec.ExitError:
+ dispatcher.Unlock(container.UUID)
return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
+
default:
+ dispatcher.Unlock(container.UUID)
return fmt.Errorf("exec failed: %v", err)
}
}
commit 8ee2c7c4f231a55601fdc90b087e42985b52fb20
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Feb 9 12:15:30 2017 -0500
10701: Fix edge case: scheduling_parameters.partitions=[]
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 725d6a4..fe559f1 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -134,7 +134,7 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
- if container.SchedulingParameters.Partitions != nil {
+ if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list