[ARVADOS] created: c405873d87e0764acf3855c559c85fa6d7a63cfb
Git user
git at public.curoverse.com
Mon Jun 6 14:12:04 EDT 2016
at c405873d87e0764acf3855c559c85fa6d7a63cfb (commit)
commit c405873d87e0764acf3855c559c85fa6d7a63cfb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 10:44:11 2016 -0400
9187: Remove "squeueError" because CheckSqueue now waits for a successful squeue run. Refactor tests a bit and add a test for canceling containers.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 1dada2f..f718fbc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -78,8 +78,14 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
fmt.Sprintf("--priority=%d", container.Priority))
}
+// scancelCmd
+func scancelFunc(container dispatch.Container) *exec.Cmd {
+ return exec.Command("scancel", "--name="+container.UUID)
+}
+
// Wrap these so that they can be overridden by tests
var sbatchCmd = sbatchFunc
+var scancelCmd = scancelFunc
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
@@ -178,10 +184,7 @@ func submit(dispatcher *dispatch.Dispatcher,
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
- // Most recent run of squeue had an error, so do nothing.
- continue
- } else if inQ {
+ if squeueUpdater.CheckSqueue(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
@@ -249,15 +252,13 @@ func run(dispatcher *dispatch.Dispatcher,
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
- err := exec.Command("scancel", "--name="+container.UUID).Run()
+ err := scancelCmd(container).Run()
squeueUpdater.SlurmLock.Unlock()
if err != nil {
log.Printf("Error stopping container %s with scancel: %v",
container.UUID, err)
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
- continue
- } else if inQ {
+ if squeueUpdater.CheckSqueue(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.",
container.UUID)
continue
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index be347e4..cddbe8c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,7 +8,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io"
"log"
- "math"
"net/http"
"net/http/httptest"
"os"
@@ -58,14 +57,60 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- s.integrationTest(c, false)
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ []string(nil),
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, "Complete")
+}
+
+func (s *TestSuite) TestIntegrationCancel(c *C) {
+
+ // Override sbatchCmd
+ var scancelCmdLine []string
+ defer func(orig func(dispatch.Container) *exec.Cmd) {
+ scancelCmd = orig
+ }(scancelCmd)
+ scancelCmd = func(container dispatch.Container) *exec.Cmd {
+ scancelCmdLine = scancelFunc(container).Args
+ return exec.Command("echo")
+ }
+
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ []string(nil),
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(1 * time.Second)
+ dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"priority": 0}},
+ nil)
+ })
+ c.Check(container.State, Equals, "Cancelled")
+ c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- s.integrationTest(c, true)
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+ fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+ fmt.Sprintf("--mem-per-cpu=%d", 2862),
+ fmt.Sprintf("--cpus-per-task=%d", 4),
+ fmt.Sprintf("--priority=%d", 1)},
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, "Cancelled")
}
-func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+func (s *TestSuite) integrationTest(c *C,
+ newSqueueCmd func() *exec.Cmd,
+ sbatchCmdComps []string,
+ runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
@@ -86,13 +131,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
defer func(orig func() *exec.Cmd) {
squeueCmd = orig
}(squeueCmd)
- squeueCmd = func() *exec.Cmd {
- if missingFromSqueue {
- return exec.Command("echo")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
- }
- }
+ squeueCmd = newSqueueCmd
// There should be no queued containers now
params := arvadosclient.Dict{
@@ -113,11 +152,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
RunContainer: func(dispatcher *dispatch.Dispatcher,
container dispatch.Container,
status chan dispatch.Container) {
- go func() {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(3 * time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- }()
+ go runContainer(dispatcher, container)
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
@@ -130,20 +165,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
squeueUpdater.Done()
- item := containers.Items[0]
- sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
- fmt.Sprintf("--job-name=%s", item.UUID),
- fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
- fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
- fmt.Sprintf("--priority=%d", item.Priority)}
-
- if missingFromSqueue {
- // not in squeue when run() started, so it will have called sbatch
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- } else {
- // already in squeue when run() started, will have just monitored it instead
- c.Check(sbatchCmdLine, DeepEquals, []string(nil))
- }
+ c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -154,11 +176,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- if missingFromSqueue {
- c.Check(container.State, Equals, "Cancelled")
- } else {
- c.Check(container.State, Equals, "Complete")
- }
+ return container
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 3ee8b6f..34e6632 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -13,7 +13,6 @@ import (
type Squeue struct {
squeueContents []string
squeueDone chan struct{}
- squeueError error
squeueCond *sync.Cond
SlurmLock sync.Mutex
}
@@ -25,10 +24,10 @@ func squeueFunc() *exec.Cmd {
var squeueCmd = squeueFunc
-// RunSqueue runs squeue once and captures the output. If there is an error,
-// set "squeueError". If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output. If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting squeueCond in
+// CheckSqueue(). If there was an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
var newSqueueContents []string
// Mutex between squeue sync and running sbatch or scancel. This
@@ -39,15 +38,12 @@ func (squeue *Squeue) RunSqueue() error {
defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
- defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
cmd.Start()
scanner := bufio.NewScanner(sq)
@@ -57,55 +53,43 @@ func (squeue *Squeue) RunSqueue() error {
if err := scanner.Err(); err != nil {
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
squeue.squeueCond.L.Lock()
- squeue.squeueError = nil
squeue.squeueContents = newSqueueContents
squeue.squeueCond.Broadcast()
-
- return nil
+ squeue.squeueCond.L.Unlock()
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
squeue.squeueCond.Wait()
- if squeue.squeueError != nil {
- e := squeue.squeueError
- squeue.squeueCond.L.Unlock()
- return false, e
- }
contents := squeue.squeueContents
squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
- return true, nil
+ return true
}
}
- return false, nil
+ return false
}
// StartMonitor starts the squeue monitoring goroutine.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
squeue.squeueCond = sync.NewCond(&sync.Mutex{})
squeue.squeueDone = make(chan struct{})
- squeue.RunSqueue()
go squeue.SyncSqueue(pollInterval)
}
commit d45be86b354adec485504bfc09f41e0e22241f34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jun 3 17:57:48 2016 -0400
9187: Fix refactoring mistake: use the Squeue method receiver instead of the global squeueUpdater
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index e157469..3ee8b6f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -35,18 +35,18 @@ func (squeue *Squeue) RunSqueue() error {
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ squeue.SlurmLock.Lock()
+ defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
- defer squeueUpdater.squeueCond.L.Unlock()
+ defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
cmd.Start()
@@ -57,23 +57,23 @@ func (squeue *Squeue) RunSqueue() error {
if err := scanner.Err(); err != nil {
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = nil
- squeueUpdater.squeueContents = newSqueueContents
- squeueUpdater.squeueCond.Broadcast()
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = nil
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
return nil
}
@@ -82,16 +82,16 @@ func (squeue *Squeue) RunSqueue() error {
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
- squeueUpdater.squeueCond.L.Lock()
+ squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
- squeueUpdater.squeueCond.Wait()
- if squeueUpdater.squeueError != nil {
- e := squeueUpdater.squeueError
- squeueUpdater.squeueCond.L.Unlock()
+ squeue.squeueCond.Wait()
+ if squeue.squeueError != nil {
+ e := squeue.squeueError
+ squeue.squeueCond.L.Unlock()
return false, e
}
- contents := squeueUpdater.squeueContents
- squeueUpdater.squeueCond.L.Unlock()
+ contents := squeue.squeueContents
+ squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
@@ -103,16 +103,16 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
// StartMonitor starts the squeue monitoring goroutine.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
- squeueUpdater.squeueDone = make(chan struct{})
- squeueUpdater.RunSqueue()
- go squeueUpdater.SyncSqueue(pollInterval)
+ squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+ squeue.squeueDone = make(chan struct{})
+ squeue.RunSqueue()
+ go squeue.SyncSqueue(pollInterval)
}
// Done stops the squeue monitoring goroutine.
func (squeue *Squeue) Done() {
- squeueUpdater.squeueDone <- struct{}{}
- close(squeueUpdater.squeueDone)
+ squeue.squeueDone <- struct{}{}
+ close(squeue.squeueDone)
}
// SyncSqueue periodically polls RunSqueue() at the given duration until
@@ -121,10 +121,10 @@ func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
ticker := time.NewTicker(pollInterval)
for {
select {
- case <-squeueUpdater.squeueDone:
+ case <-squeue.squeueDone:
return
case <-ticker.C:
- squeueUpdater.RunSqueue()
+ squeue.RunSqueue()
}
}
}
commit bb10b7777ed6db229fbb35e6a829bec4e8efcd23
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 2 22:46:55 2016 -0400
9187: Fix comment typo
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 785b6ec..fb7b5fb 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -176,7 +176,7 @@ func (dispatcher *Dispatcher) pollContainers() {
func (dispatcher *Dispatcher) handleUpdate(container Container) {
if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
// If container is Complete, Cancelled, or Queued, LockedByUUID
- // will be nil. If the container was formally Locked, moved
+ // will be nil. If the container was formerly Locked, moved
// back to Queued and then locked by another dispatcher,
// LockedByUUID will be different. In either case, we want
// to stop monitoring it.
commit 5cf8c18e735bb15da3f131e7ae57bb4b222bb4ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 2 22:18:55 2016 -0400
9187: Add documentation comments to Squeue functions.
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index b86a4d9..e157469 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -8,6 +8,8 @@ import (
"time"
)
+// Squeue implements asynchronous polling monitor of the SLURM queue using the
+// command 'squeue'.
type Squeue struct {
squeueContents []string
squeueDone chan struct{}
@@ -23,6 +25,9 @@ func squeueFunc() *exec.Cmd {
var squeueCmd = squeueFunc
+// RunSqueue runs squeue once and captures the output. If there is an error,
+// set "squeueError". If it succeeds, set "squeueContents" and then wake up
+// any goroutines waiting squeueCond in CheckSqueue().
func (squeue *Squeue) RunSqueue() error {
var newSqueueContents []string
@@ -73,8 +78,9 @@ func (squeue *Squeue) RunSqueue() error {
return nil
}
-// Check if a container UUID is in the slurm queue. This will block until the
-// next successful update from SLURM.
+// CheckSqueue checks if a given container UUID is in the slurm queue. This
+// does not run squeue directly, but instead blocks until woken up by next
+// successful update of squeue.
func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
squeueUpdater.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
@@ -95,6 +101,7 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
return false, nil
}
+// StartMonitor starts the squeue monitoring goroutine.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
squeueUpdater.squeueDone = make(chan struct{})
@@ -102,11 +109,14 @@ func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
go squeueUpdater.SyncSqueue(pollInterval)
}
+// Done stops the squeue monitoring goroutine.
func (squeue *Squeue) Done() {
squeueUpdater.squeueDone <- struct{}{}
close(squeueUpdater.squeueDone)
}
+// SyncSqueue periodically polls RunSqueue() at the given duration until
+// terminated by calling Done().
func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
ticker := time.NewTicker(pollInterval)
for {
commit d77c4cc58d393c48ce46b987f6eada7c7cc381c6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 2 17:59:20 2016 -0400
9187: Improve squeue synchronization
* Put squeue functions into a separate file.
* CheckSqueue() now blocks on a condition variable until the next successful
update of squeue, which then wakes up all goroutines waiting on CheckSqueue().
* Never do anything when squeue returns an error.
* Merge submitting, monitoring, and cleanup behaviors into a single goroutine
which updates based on CheckSqueue() instead of a ticker.
* Introduce a lock around squeue, sbatch and scancel operations, so that on the
next wakeup the queue is guaranteed to reflect the most recent sbatch/scancel
operations.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3e14820..1dada2f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,7 +3,6 @@ package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -14,16 +13,9 @@ import (
"os"
"os/exec"
"strings"
- "sync"
"time"
)
-type Squeue struct {
- sync.Mutex
- squeueContents []string
- SqueueDone chan struct{}
-}
-
func main() {
err := doMain()
if err != nil {
@@ -59,23 +51,20 @@ func doMain() error {
}
arv.Retries = 25
+ squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+ defer squeueUpdater.Done()
+
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
PollInterval: time.Duration(*pollInterval) * time.Second,
DoneProcessing: make(chan struct{})}
- squeueUpdater.SqueueDone = make(chan struct{})
- go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
-
err = dispatcher.RunDispatcher()
if err != nil {
return err
}
- squeueUpdater.SqueueDone <- struct{}{}
- close(squeueUpdater.SqueueDone)
-
return nil
}
@@ -89,14 +78,8 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
fmt.Sprintf("--priority=%d", container.Priority))
}
-// squeueFunc
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--format=%j")
-}
-
// Wrap these so that they can be overridden by tests
var sbatchCmd = sbatchFunc
-var squeueCmd = squeueFunc
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
@@ -139,6 +122,10 @@ func submit(dispatcher *dispatch.Dispatcher,
return
}
+ // Mutex between squeue sync and running sbatch or scancel.
+ squeueUpdater.SlurmLock.Lock()
+ defer squeueUpdater.SlurmLock.Unlock()
+
err := cmd.Start()
if err != nil {
submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
@@ -183,90 +170,24 @@ func submit(dispatcher *dispatch.Dispatcher,
return
}
-func (squeue *Squeue) runSqueue() ([]string, error) {
- var newSqueueContents []string
-
- cmd := squeueCmd()
- sq, err := cmd.StdoutPipe()
- if err != nil {
- return nil, err
- }
- cmd.Start()
- scanner := bufio.NewScanner(sq)
- for scanner.Scan() {
- newSqueueContents = append(newSqueueContents, scanner.Text())
- }
- if err := scanner.Err(); err != nil {
- cmd.Wait()
- return nil, err
- }
-
- err = cmd.Wait()
- if err != nil {
- return nil, err
- }
-
- return newSqueueContents, nil
-}
-
-func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
- if check {
- n, err := squeue.runSqueue()
- if err != nil {
- return false, err
- }
- squeue.Lock()
- squeue.squeueContents = n
- squeue.Unlock()
- }
-
- if uuid != "" {
- squeue.Lock()
- defer squeue.Unlock()
- for _, k := range squeue.squeueContents {
- if k == uuid {
- return true, nil
- }
- }
- }
- return false, nil
-}
-
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
- // TODO: considering using "squeue -i" instead of polling squeue.
- ticker := time.NewTicker(pollInterval)
- for {
- select {
- case <-squeueUpdater.SqueueDone:
- return
- case <-ticker.C:
- squeue.CheckSqueue("", true)
- }
- }
-}
-
-// Run or monitor a container.
-//
// If the container is marked as Locked, check if it is already in the slurm
// queue. If not, submit it.
//
// If the container is marked as Running, check if it is in the slurm queue.
// If not, mark it as Cancelled.
-//
-// Monitor status updates. If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+ submitted := false
+ for !*monitorDone {
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
+ // Most recent run of squeue had an error, so do nothing.
+ continue
+ } else if inQ {
+ // Found in the queue, so continue monitoring
+ submitted = true
+ } else if container.State == dispatch.Locked && !submitted {
+ // Not in queue but in Locked state and we haven't
+ // submitted it yet, so submit it.
- uuid := container.UUID
-
- if container.State == dispatch.Locked {
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
- // maybe squeue is broken, put it back in the queue
- log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Queued)
- } else if !inQ {
log.Printf("About to submit queued container %v", container.UUID)
if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
@@ -275,64 +196,66 @@ func run(dispatcher *dispatch.Dispatcher,
// maybe sbatch is broken, put it back to queued
dispatcher.UpdateState(container.UUID, dispatch.Queued)
}
+ submitted = true
+ } else {
+ // Not in queue and we are not going to submit it.
+ // Refresh the container state. If it is
+ // Complete/Cancelled, do nothing, if it is Locked then
+ // release it back to the Queue, if it is Running then
+ // clean up the record.
+
+ var con dispatch.Container
+ err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
+ if err != nil {
+ log.Printf("Error getting final container state: %v", err)
+ }
+
+ var st string
+ switch con.State {
+ case dispatch.Locked:
+ st = dispatch.Queued
+ case dispatch.Running:
+ st = dispatch.Cancelled
+ default:
+ // Container state is Queued, Complete or Cancelled so stop monitoring it.
+ return
+ }
+
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, st)
+ dispatcher.UpdateState(container.UUID, st)
}
}
+}
- log.Printf("Monitoring container %v started", uuid)
-
- // periodically check squeue
- doneSqueue := make(chan struct{})
- go func() {
- squeueUpdater.CheckSqueue(container.UUID, true)
- ticker := time.NewTicker(dispatcher.PollInterval)
- for {
- select {
- case <-ticker.C:
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
- log.Printf("Error running squeue: %v", err)
- // don't cancel, just leave it the way it is
- } else if !inQ {
- var con dispatch.Container
- err := dispatcher.Arv.Get("containers", uuid, nil, &con)
- if err != nil {
- log.Printf("Error getting final container state: %v", err)
- }
+// Run or monitor a container.
+//
+// Monitor status updates. If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
- var st string
- switch con.State {
- case dispatch.Locked:
- st = dispatch.Queued
- case dispatch.Running:
- st = dispatch.Cancelled
- default:
- st = ""
- }
+ log.Printf("Monitoring container %v started", container.UUID)
+ defer log.Printf("Monitoring container %v finished", container.UUID)
- if st != "" {
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- uuid, con.State, st)
- dispatcher.UpdateState(uuid, st)
- }
- }
- case <-doneSqueue:
- close(doneSqueue)
- ticker.Stop()
- return
- }
- }
- }()
+ monitorDone := false
+ go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
for container = range status {
if container.State == dispatch.Locked || container.State == dispatch.Running {
if container.Priority == 0 {
log.Printf("Canceling container %s", container.UUID)
+ // Mutex between squeue sync and running sbatch or scancel.
+ squeueUpdater.SlurmLock.Lock()
err := exec.Command("scancel", "--name="+container.UUID).Run()
+ squeueUpdater.SlurmLock.Unlock()
+
if err != nil {
log.Printf("Error stopping container %s with scancel: %v",
container.UUID, err)
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
- log.Printf("Error running squeue: %v", err)
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
continue
} else if inQ {
log.Printf("Container %s is still in squeue after scancel.",
@@ -345,8 +268,5 @@ func run(dispatcher *dispatch.Dispatcher,
}
}
}
-
- doneSqueue <- struct{}{}
-
- log.Printf("Monitoring container %v finished", uuid)
+ monitorDone = true
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index d30c5df..be347e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -123,14 +123,12 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
},
DoneProcessing: doneProcessing}
- squeueUpdater.SqueueDone = make(chan struct{})
- go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+ squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
err = dispatcher.RunDispatcher()
c.Assert(err, IsNil)
- squeueUpdater.SqueueDone <- struct{}{}
- close(squeueUpdater.SqueueDone)
+ squeueUpdater.Done()
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
new file mode 100644
index 0000000..b86a4d9
--- /dev/null
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -0,0 +1,120 @@
+package main
+
+import (
+ "bufio"
+ "log"
+ "os/exec"
+ "sync"
+ "time"
+)
+
+type Squeue struct {
+ squeueContents []string
+ squeueDone chan struct{}
+ squeueError error
+ squeueCond *sync.Cond
+ SlurmLock sync.Mutex
+}
+
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+ return exec.Command("squeue", "--format=%j")
+}
+
+var squeueCmd = squeueFunc
+
+func (squeue *Squeue) RunSqueue() error {
+ var newSqueueContents []string
+
+ // Mutex between squeue sync and running sbatch or scancel. This
+ // establishes a sequence so that squeue doesn't run concurrently with
+ // sbatch or scancel; the next update of squeue will occur only after
+ // sbatch or scancel has completed.
+ squeueUpdater.SlurmLock.Lock()
+ defer squeueUpdater.SlurmLock.Unlock()
+
+ // Also ensure unlock on all return paths
+ defer squeueUpdater.squeueCond.L.Unlock()
+
+ cmd := squeueCmd()
+ sq, err := cmd.StdoutPipe()
+ if err != nil {
+ log.Printf("Error creating stdout pipe for squeue: %v", err)
+ squeueUpdater.squeueCond.L.Lock()
+ squeueUpdater.squeueError = err
+ return err
+ }
+ cmd.Start()
+ scanner := bufio.NewScanner(sq)
+ for scanner.Scan() {
+ newSqueueContents = append(newSqueueContents, scanner.Text())
+ }
+ if err := scanner.Err(); err != nil {
+ cmd.Wait()
+ log.Printf("Error reading from squeue pipe: %v", err)
+ squeueUpdater.squeueCond.L.Lock()
+ squeueUpdater.squeueError = err
+ return err
+ }
+
+ err = cmd.Wait()
+ if err != nil {
+ log.Printf("Error running squeue: %v", err)
+ squeueUpdater.squeueCond.L.Lock()
+ squeueUpdater.squeueError = err
+ return err
+ }
+
+ squeueUpdater.squeueCond.L.Lock()
+ squeueUpdater.squeueError = nil
+ squeueUpdater.squeueContents = newSqueueContents
+ squeueUpdater.squeueCond.Broadcast()
+
+ return nil
+}
+
+// Check if a container UUID is in the slurm queue. This will block until the
+// next successful update from SLURM.
+func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+ squeueUpdater.squeueCond.L.Lock()
+ // block until next squeue broadcast signaling an update.
+ squeueUpdater.squeueCond.Wait()
+ if squeueUpdater.squeueError != nil {
+ e := squeueUpdater.squeueError
+ squeueUpdater.squeueCond.L.Unlock()
+ return false, e
+ }
+ contents := squeueUpdater.squeueContents
+ squeueUpdater.squeueCond.L.Unlock()
+
+ for _, k := range contents {
+ if k == uuid {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
+ squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
+ squeueUpdater.squeueDone = make(chan struct{})
+ squeueUpdater.RunSqueue()
+ go squeueUpdater.SyncSqueue(pollInterval)
+}
+
+func (squeue *Squeue) Done() {
+ squeueUpdater.squeueDone <- struct{}{}
+ close(squeueUpdater.squeueDone)
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+ ticker := time.NewTicker(pollInterval)
+ for {
+ select {
+ case <-squeueUpdater.squeueDone:
+ return
+ case <-ticker.C:
+ squeueUpdater.RunSqueue()
+ }
+ }
+}
commit 3ae9a789410e93eeb31ca5670c17a6d03d77f608
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 1 16:06:26 2016 -0400
9187: Slurm dispatcher improvements around squeue
* Clarify that status updates are not guaranteed to be delivered on a
heartbeat.
* Refactor slurm dispatcher to monitor the container in squeue in a separate
goroutine.
* Refactor polling squeue to a single goroutine and cache the results so that
monitoring 100 containers doesn't result in 100 calls to squeue.
* No longer set up strigger to cancel the job on finish; instead, cancel running
jobs that are not in squeue.
* Test both cases where a job is/is not in squeue.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index a27971f..785b6ec 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,3 +1,6 @@
+// Framework for monitoring the Arvados container Queue, Locks container
+// records, and runs goroutine callbacks which implement execution and
+// monitoring of the containers.
package dispatch
import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
Items []apiClientAuthorization `json:"items"`
}
-// Container data
+// Represents an Arvados container record
type Container struct {
UUID string `json:"uuid"`
State string `json:"state"`
@@ -45,9 +48,27 @@ type ContainerList struct {
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
- Arv arvadosclient.ArvadosClient
- RunContainer func(*Dispatcher, Container, chan Container)
- PollInterval time.Duration
+ // The Arvados client
+ Arv arvadosclient.ArvadosClient
+
+ // When a new queued container appears and is either already owned by
+ // this dispatcher or is successfully locked, the dispatcher will call
+ // go RunContainer(). The RunContainer() goroutine gets a channel over
+ // which it will receive updates to the container state. The
+ // RunContainer() goroutine should only assume status updates come when
+ // the container record changes on the API server; if it needs to
+ // monitor the job submission to the underlying slurm/grid engine/etc
+ // queue it should spin up its own polling goroutines. When the
+ // channel is closed, that means the container is no longer being
+ // handled by this dispatcher and the goroutine should terminate. The
+ // goroutine is responsible for draining the 'status' channel, failure
+ // to do so may deadlock the dispatcher.
+ RunContainer func(*Dispatcher, Container, chan Container)
+
+ // Amount of time to wait between polling for updates.
+ PollInterval time.Duration
+
+ // Channel used to signal that RunDispatcher loop should exit.
DoneProcessing chan struct{}
mineMutex sync.Mutex
@@ -159,7 +180,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
// back to Queued and then locked by another dispatcher,
// LockedByUUID will be different. In either case, we want
// to stop monitoring it.
- log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+ log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
dispatcher.notMine(container.UUID)
return
}
@@ -191,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
"container": arvadosclient.Dict{"state": newState}},
nil)
if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
}
return err
}
@@ -199,14 +220,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
// RunDispatcher runs the main loop of the dispatcher until receiving a message
// on the dispatcher.DoneProcessing channel. It also installs a signal handler
// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer(). If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer(). The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index aca60e9..0248f18 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -115,7 +115,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to state \"Locked\"")
}
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -123,7 +123,7 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses["/arvados/v1/containers"] =
arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
- arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+ arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
testWithServerStub(c, apiStubResponses, "echo",
`After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2". Updating it to "Cancelled"`)
@@ -142,7 +142,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
- arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+ arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
apiStub := arvadostest.ServerStub{apiStubResponses}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 641b4bc..3e14820 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -14,9 +14,16 @@ import (
"os"
"os/exec"
"strings"
+ "sync"
"time"
)
+type Squeue struct {
+ sync.Mutex
+ squeueContents []string
+ SqueueDone chan struct{}
+}
+
func main() {
err := doMain()
if err != nil {
@@ -26,7 +33,7 @@ func main() {
var (
crunchRunCommand *string
- finishCommand *string
+ squeueUpdater Squeue
)
func doMain() error {
@@ -42,11 +49,6 @@ func doMain() error {
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand = flags.String(
- "finish-command",
- "/usr/bin/crunch-finish-slurm.sh",
- "Command to run from strigger when job is finished")
-
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
@@ -63,11 +65,17 @@ func doMain() error {
PollInterval: time.Duration(*pollInterval) * time.Second,
DoneProcessing: make(chan struct{})}
+ squeueUpdater.SqueueDone = make(chan struct{})
+ go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
+
err = dispatcher.RunDispatcher()
if err != nil {
return err
}
+ squeueUpdater.SqueueDone <- struct{}{}
+ close(squeueUpdater.SqueueDone)
+
return nil
}
@@ -81,19 +89,12 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
fmt.Sprintf("--priority=%d", container.Priority))
}
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
- return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
- fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
// squeueFunc
func squeueFunc() *exec.Cmd {
return exec.Command("squeue", "--format=%j")
}
// Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
var sbatchCmd = sbatchFunc
var squeueCmd = squeueFunc
@@ -182,44 +183,66 @@ func submit(dispatcher *dispatch.Dispatcher,
return
}
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- err := cmd.Run()
- if err != nil {
- log.Printf("While setting up strigger: %v", err)
- // BUG: we drop the error here and forget about it. A
- // human has to notice the container is stuck in
- // Running state, and fix it manually.
- }
-}
+func (squeue *Squeue) runSqueue() ([]string, error) {
+ var newSqueueContents []string
-func checkSqueue(uuid string) (bool, error) {
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
- return false, err
+ return nil, err
}
cmd.Start()
- defer cmd.Wait()
scanner := bufio.NewScanner(sq)
- found := false
for scanner.Scan() {
- if scanner.Text() == uuid {
- found = true
- }
+ newSqueueContents = append(newSqueueContents, scanner.Text())
}
if err := scanner.Err(); err != nil {
- return false, err
+ cmd.Wait()
+ return nil, err
+ }
+
+ err = cmd.Wait()
+ if err != nil {
+ return nil, err
+ }
+
+ return newSqueueContents, nil
+}
+
+func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
+ if check {
+ n, err := squeue.runSqueue()
+ if err != nil {
+ return false, err
+ }
+ squeue.Lock()
+ squeue.squeueContents = n
+ squeue.Unlock()
+ }
+
+ if uuid != "" {
+ squeue.Lock()
+ defer squeue.Unlock()
+ for _, k := range squeue.squeueContents {
+ if k == uuid {
+ return true, nil
+ }
+ }
+ }
+ return false, nil
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+ // TODO: considering using "squeue -i" instead of polling squeue.
+ ticker := time.NewTicker(pollInterval)
+ for {
+ select {
+ case <-squeueUpdater.SqueueDone:
+ return
+ case <-ticker.C:
+ squeue.CheckSqueue("", true)
+ }
}
- return found, nil
}
// Run or monitor a container.
@@ -239,50 +262,91 @@ func run(dispatcher *dispatch.Dispatcher,
uuid := container.UUID
if container.State == dispatch.Locked {
- if inQ, err := checkSqueue(container.UUID); err != nil {
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+ // maybe squeue is broken, put it back in the queue
log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ dispatcher.UpdateState(container.UUID, dispatch.Queued)
} else if !inQ {
log.Printf("About to submit queued container %v", container.UUID)
- jobid, err := submit(dispatcher, container, *crunchRunCommand)
- if err != nil {
- log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
- } else {
- finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+ if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+ log.Printf("Error submitting container %s to slurm: %v",
+ container.UUID, err)
+ // maybe sbatch is broken, put it back to queued
+ dispatcher.UpdateState(container.UUID, dispatch.Queued)
}
}
- } else if container.State == dispatch.Running {
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- } else if !inQ {
- log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- }
}
log.Printf("Monitoring container %v started", uuid)
- for container = range status {
- if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
- log.Printf("Canceling container %s", container.UUID)
-
- err := exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
+ // periodically check squeue
+ doneSqueue := make(chan struct{})
+ go func() {
+ squeueUpdater.CheckSqueue(container.UUID, true)
+ ticker := time.NewTicker(dispatcher.PollInterval)
+ for {
+ select {
+ case <-ticker.C:
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
+ // don't cancel, just leave it the way it is
+ } else if !inQ {
+ var con dispatch.Container
+ err := dispatcher.Arv.Get("containers", uuid, nil, &con)
+ if err != nil {
+ log.Printf("Error getting final container state: %v", err)
+ }
+
+ var st string
+ switch con.State {
+ case dispatch.Locked:
+ st = dispatch.Queued
+ case dispatch.Running:
+ st = dispatch.Cancelled
+ default:
+ st = ""
+ }
+
+ if st != "" {
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ uuid, con.State, st)
+ dispatcher.UpdateState(uuid, st)
+ }
}
+ case <-doneSqueue:
+ close(doneSqueue)
+ ticker.Stop()
+ return
}
+ }
+ }()
- err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ for container = range status {
+ if container.State == dispatch.Locked || container.State == dispatch.Running {
+ if container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+ err := exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v",
+ container.UUID, err)
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue after scancel.",
+ container.UUID)
+ continue
+ }
+ }
+
+ err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
}
+ doneSqueue <- struct{}{}
+
log.Printf("Monitoring container %v finished", uuid)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 348d5e4..d30c5df 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -1,12 +1,12 @@
package main
import (
+ "bytes"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
-
- "bytes"
- "fmt"
+ "io"
"log"
"math"
"net/http"
@@ -35,35 +35,43 @@ var initialArgs []string
func (s *TestSuite) SetUpSuite(c *C) {
initialArgs = os.Args
- arvadostest.StartAPI()
}
func (s *TestSuite) TearDownSuite(c *C) {
- arvadostest.StopAPI()
}
func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-slurm"}
os.Args = args
+ arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
func (s *TestSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
os.Args = initialArgs
+ arvadostest.StopAPI()
}
func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) TestIntegration(c *C) {
+func (s *TestSuite) TestIntegrationNormal(c *C) {
+ s.integrationTest(c, false)
+}
+
+func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+ s.integrationTest(c, true)
+}
+
+func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+ arvadostest.ResetEnv()
+
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
var sbatchCmdLine []string
- var striggerCmdLine []string
// Override sbatchCmd
defer func(orig func(dispatch.Container) *exec.Cmd) {
@@ -74,30 +82,16 @@ func (s *TestSuite) TestIntegration(c *C) {
return exec.Command("sh")
}
- // Override striggerCmd
- defer func(orig func(jobid, containerUUID, finishCommand,
- apiHost, apiToken, apiInsecure string) *exec.Cmd) {
- striggerCmd = orig
- }(striggerCmd)
- striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
- striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
- apiHost, apiToken, apiInsecure).Args
- go func() {
- time.Sleep(5 * time.Second)
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": dispatch.Complete}},
- nil)
- }()
- return exec.Command("echo", striggerCmdLine...)
- }
-
// Override squeueCmd
defer func(orig func() *exec.Cmd) {
squeueCmd = orig
}(squeueCmd)
squeueCmd = func() *exec.Cmd {
- return exec.Command("echo")
+ if missingFromSqueue {
+ return exec.Command("echo")
+ } else {
+ return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+ }
}
// There should be no queued containers now
@@ -111,8 +105,6 @@ func (s *TestSuite) TestIntegration(c *C) {
echo := "echo"
crunchRunCommand = &echo
- finishCmd := "/usr/bin/crunch-finish-slurm.sh"
- finishCommand = &finishCmd
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -122,8 +114,8 @@ func (s *TestSuite) TestIntegration(c *C) {
container dispatch.Container,
status chan dispatch.Container) {
go func() {
- time.Sleep(1)
dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
run(dispatcher, container, status)
@@ -131,19 +123,29 @@ func (s *TestSuite) TestIntegration(c *C) {
},
DoneProcessing: doneProcessing}
+ squeueUpdater.SqueueDone = make(chan struct{})
+ go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+
err = dispatcher.RunDispatcher()
c.Assert(err, IsNil)
+ squeueUpdater.SqueueDone <- struct{}{}
+ close(squeueUpdater.SqueueDone)
+
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", item.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
fmt.Sprintf("--priority=%d", item.Priority)}
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
- "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
+ if missingFromSqueue {
+ // not in squeue when run() started, so it will have called sbatch
+ c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+ } else {
+ // already in squeue when run() started, will have just monitored it instead
+ c.Check(sbatchCmdLine, DeepEquals, []string(nil))
+ }
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -154,7 +156,11 @@ func (s *TestSuite) TestIntegration(c *C) {
var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- c.Check(container.State, Equals, "Complete")
+ if missingFromSqueue {
+ c.Check(container.State, Equals, "Cancelled")
+ } else {
+ c.Check(container.State, Equals, "Complete")
+ }
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -180,12 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(buf)
+ log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
crunchRunCommand = &crunchCmd
- finishCmd := "/usr/bin/crunch-finish-slurm.sh"
- finishCommand = &finishCmd
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -195,7 +199,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
container dispatch.Container,
status chan dispatch.Container) {
go func() {
- time.Sleep(1)
+ time.Sleep(1 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
deleted file mode 100755
index 95a37ba..0000000
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/sh
-
-# Script to be called by strigger when a job finishes. This ensures the job
-# record has the correct state "Complete" even if the node running the job
-# failed.
-
-ARVADOS_API_HOST=$1
-ARVADOS_API_TOKEN=$2
-ARVADOS_API_HOST_INSECURE=$3
-uuid=$4
-jobid=$5
-
-# If it is possible to attach metadata to job records we could look up the
-# above information instead of getting it on the command line. For example,
-# this is the recipe for getting the job name (container uuid) from the job id.
-#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
-
-export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
-
-exec arv container update --uuid $uuid --container '{"state": "Complete"}'
commit 3a3910fdc8a5003c182f68e3423c96327a136175
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 27 17:30:07 2016 -0400
9187: Check LockedByUUID on container updates and stop sending status updates
if it does not match the dispatcher's token UUID.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 355ed7c..a27971f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -52,7 +52,7 @@ type Dispatcher struct {
mineMutex sync.Mutex
mineMap map[string]chan Container
- auth apiClientAuthorization
+ Auth apiClientAuthorization
containers chan Container
}
@@ -100,17 +100,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
- } else {
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
- }
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.containers <- container
- }
+ return
+ }
+
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
}
}
@@ -122,7 +123,7 @@ func (dispatcher *Dispatcher) pollContainers() {
"order": []string{"priority desc"},
"limit": "1000"}
paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
"limit": "1000"}
for {
@@ -152,11 +153,19 @@ func (dispatcher *Dispatcher) pollContainers() {
}
func (dispatcher *Dispatcher) handleUpdate(container Container) {
+ if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+ // If container is Complete, Cancelled, or Queued, LockedByUUID
+ // will be nil. If the container was formerly Locked, moved
+ // back to Queued and then locked by another dispatcher,
+ // LockedByUUID will be different. In either case, we want
+ // to stop monitoring it.
+ log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+ dispatcher.notMine(container.UUID)
+ return
+ }
+
if dispatcher.updateMine(container) {
- if container.State == Complete || container.State == Cancelled {
- log.Printf("Container %v now in state %v", container.UUID, container.State)
- dispatcher.notMine(container.UUID)
- }
+ // Already monitored, sent status update
return
}
@@ -169,6 +178,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
}
if container.State == Locked || container.State == Running {
+ // Not currently monitored but in Locked or Running state and
+ // owned by this dispatcher, so start monitoring.
go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
}
}
@@ -197,7 +208,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
// The callback is responsible for draining the channel, if it fails to do so
// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
return
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index cc472a4..73a3895 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -159,7 +159,8 @@ func run(dispatcher *dispatch.Dispatcher,
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+ if container.LockedByUUID == dispatcher.Auth.UUID &&
+ (container.State == dispatch.Locked || container.State == dispatch.Running) {
log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
*crunchRunCommand, container.State, uuid, dispatch.Cancelled)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
commit 4153cb6cfad920ed0b1a4b818d3bcc8de492d134
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 19 14:12:42 2016 -0400
9187: Refactor dispatcher support into common library and update to use Locking API.
New dispatcher package in Go SDK provides framework for monitoring list of
queued/locked/running containers. Try to lock containers in the queue; locked
or running containers are passed to RunContainer goroutine supplied by the
specific dispatcher. Refactor existing dispatchers (-local and -slurm) to use
this framework. Dispatchers have crash recovery behavior and can move
unaccounted-for containers to the Cancelled state.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644
index 0000000..355ed7c
--- /dev/null
+++ b/sdk/go/dispatch/dispatch.go
@@ -0,0 +1,229 @@
+package dispatch
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "log"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+)
+
+// Constants for container states
+const (
+ Queued = "Queued"
+ Locked = "Locked"
+ Running = "Running"
+ Complete = "Complete"
+ Cancelled = "Cancelled"
+)
+
+type apiClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+ Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of the containers from api
+type ContainerList struct {
+ Items []Container `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+}
+
+// Dispatcher holds the state of the dispatcher
+type Dispatcher struct {
+ Arv arvadosclient.ArvadosClient
+ RunContainer func(*Dispatcher, Container, chan Container)
+ PollInterval time.Duration
+ DoneProcessing chan struct{}
+
+ mineMutex sync.Mutex
+ mineMap map[string]chan Container
+ auth apiClientAuthorization
+ containers chan Container
+}
+
+// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
+// for which this process is actively starting/monitoring. Returns channel to
+// be used to send container status updates.
+func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ if ch, ok := dispatcher.mineMap[uuid]; ok {
+ return ch
+ }
+
+ ch := make(chan Container)
+ dispatcher.mineMap[uuid] = ch
+ return ch
+}
+
+// Release a container which is no longer being monitored.
+func (dispatcher *Dispatcher) notMine(uuid string) {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ if ch, ok := dispatcher.mineMap[uuid]; ok {
+ close(ch)
+ delete(dispatcher.mineMap, uuid)
+ }
+}
+
+// Check if there is a channel for updates associated with this container. If
+// so send the container record on the channel and return true, if not return
+// false.
+func (dispatcher *Dispatcher) updateMine(c Container) bool {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ ch, ok := dispatcher.mineMap[c.UUID]
+ if ok {
+ ch <- c
+ return true
+ }
+ return false
+}
+
+func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+ var containers ContainerList
+ err := dispatcher.Arv.List("containers", params, &containers)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ } else {
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
+ }
+ }
+}
+
+func (dispatcher *Dispatcher) pollContainers() {
+ ticker := time.NewTicker(dispatcher.PollInterval)
+
+ paramsQ := arvadosclient.Dict{
+ "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+ "order": []string{"priority desc"},
+ "limit": "1000"}
+ paramsP := arvadosclient.Dict{
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "limit": "1000"}
+
+ for {
+ select {
+ case <-ticker.C:
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
+ case <-dispatcher.DoneProcessing:
+ close(dispatcher.containers)
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (dispatcher *Dispatcher) handleUpdate(container Container) {
+ if dispatcher.updateMine(container) {
+ if container.State == Complete || container.State == Cancelled {
+ log.Printf("Container %v now in state %v", container.UUID, container.State)
+ dispatcher.notMine(container.UUID)
+ }
+ return
+ }
+
+ if container.State == Queued {
+ // Try to take the lock
+ if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ return
+ }
+ container.State = Locked
+ }
+
+ if container.State == Locked || container.State == Running {
+ go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+ }
+}
+
+// UpdateState makes an API call to change the state of a container.
+func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+ err := dispatcher.Arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": newState}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ }
+ return err
+}
+
+// RunDispatcher runs the main loop of the dispatcher until receiving a message
+// on the dispatcher.DoneProcessing channel. It also installs a signal handler
+// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
+//
+// When a new queued container appears and is successfully locked, the
+// dispatcher will call RunContainer() followed by MonitorContainer(). If a
+// container appears that is Locked or Running but not known to the dispatcher,
+// it will only call monitorContainer(). The monitorContainer() callback is
+// passed a channel over which it will receive updates to the container state.
+// The callback is responsible for draining the channel, if it fails to do so
+// it will deadlock the dispatcher.
+func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ if err != nil {
+ log.Printf("Error getting my token UUID: %v", err)
+ return
+ }
+
+ dispatcher.mineMap = make(map[string]chan Container)
+ dispatcher.containers = make(chan Container)
+
+ // Graceful shutdown on signal
+ sigChan := make(chan os.Signal)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ log.Printf("Caught signal: %v", sig)
+ dispatcher.DoneProcessing <- struct{}{}
+ }
+ }(sigChan)
+
+ defer close(sigChan)
+ defer signal.Stop(sigChan)
+
+ go dispatcher.pollContainers()
+ for container := range dispatcher.containers {
+ dispatcher.handleUpdate(container)
+ }
+
+ return nil
+}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 4023870..cc472a4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -1,14 +1,15 @@
package main
+// Dispatcher service for Crunch that runs containers locally.
+
import (
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"log"
"os"
"os/exec"
- "os/signal"
"sync"
- "syscall"
"time"
)
@@ -20,12 +21,10 @@ func main() {
}
var (
- arv arvadosclient.ArvadosClient
runningCmds map[string]*exec.Cmd
runningCmdsMutex sync.Mutex
waitGroup sync.WaitGroup
- doneProcessing chan bool
- sigChan chan os.Signal
+ crunchRunCommand *string
)
func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
10,
"Interval in seconds to poll for queued containers")
- priorityPollInterval := flags.Int(
- "container-priority-poll-interval",
- 60,
- "Interval in seconds to check priority of a dispatched container")
-
- crunchRunCommand := flags.String(
+ crunchRunCommand = flags.String(
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
@@ -49,35 +43,32 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
+ runningCmds = make(map[string]*exec.Cmd)
+
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
- // Channel to terminate
- doneProcessing = make(chan bool)
-
- // Map of running crunch jobs
- runningCmds = make(map[string]*exec.Cmd)
-
- // Graceful shutdown
- sigChan = make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- doneProcessing <- true
- }
- }(sigChan)
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(*pollInterval) * time.Second,
+ DoneProcessing: make(chan struct{})}
- // Run all queued containers
- runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
+ err = dispatcher.RunDispatcher()
+ if err != nil {
+ return err
+ }
+ runningCmdsMutex.Lock()
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
cmd.Process.Signal(os.Interrupt)
}
+ runningCmdsMutex.Unlock()
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
@@ -85,166 +76,98 @@ func doMain() error {
return nil
}
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
- ticker := time.NewTicker(pollInterval)
-
- for {
- select {
- case <-ticker.C:
- dispatchLocal(priorityPollInterval, crunchRunCommand)
- case <-doneProcessing:
- ticker.Stop()
- return
- }
- }
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+ return cmd.Start()
}
-// Container data
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
+var startCmd = startFunc
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
- params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
- }
+// Run a container.
+//
+// If the container is Locked, start a new crunch-run process and wait until
+// crunch-run completes. If the priority is set to zero, set an interrupt
+// signal to the crunch-run process.
+//
+// If the container is in any other state, or is not Complete/Cancelled after
+// crunch-run terminates, mark the container as Cancelled.
+func run(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
- var containers ContainerList
- err := arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
- return
- }
+ uuid := container.UUID
- for _, c := range containers.Items {
- log.Printf("About to run queued container %v", c.UUID)
- // Run the container
+ if container.State == dispatch.Locked {
waitGroup.Add(1)
- go func(c Container) {
- run(c.UUID, crunchRunCommand, pollInterval)
- waitGroup.Done()
- }(c)
- }
-}
-
-func updateState(uuid, newState string) error {
- err := arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
- if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
- }
- return err
-}
-
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
- if err := updateState(uuid, "Locked"); err != nil {
- return
- }
- cmd := exec.Command(crunchRunCommand, uuid)
- cmd.Stdin = nil
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stderr
+ cmd := exec.Command(*crunchRunCommand, uuid)
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
- // Add this crunch job to the list of runningCmds only if we
- // succeed in starting crunch-run.
- runningCmdsMutex.Lock()
- if err := cmd.Start(); err != nil {
- log.Printf("Error starting crunch-run for %v: %q", uuid, err)
- runningCmdsMutex.Unlock()
- updateState(uuid, "Queued")
- return
- }
- runningCmds[uuid] = cmd
- runningCmdsMutex.Unlock()
+ log.Printf("Starting container %v", uuid)
- defer func() {
- setFinalState(uuid)
+ // Add this crunch job to the list of runningCmds only if we
+ // succeed in starting crunch-run.
- // Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
- delete(runningCmds, uuid)
- runningCmdsMutex.Unlock()
- }()
-
- log.Printf("Starting container %v", uuid)
-
- updateState(uuid, "Running")
+ if err := startCmd(container, cmd); err != nil {
+ runningCmdsMutex.Unlock()
+ log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+ dispatcher.UpdateState(uuid, dispatch.Cancelled)
+ } else {
+ runningCmds[uuid] = cmd
+ runningCmdsMutex.Unlock()
+
+ // Need to wait for crunch-run to exit
+ done := make(chan struct{})
+
+ go func() {
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ }
+ log.Printf("sending done")
+ done <- struct{}{}
+ }()
+
+ Loop:
+ for {
+ select {
+ case <-done:
+ break Loop
+ case c := <-status:
+ // Interrupt the child process if priority changes to 0
+ if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
+ }
+ }
+ }
+ close(done)
- cmdExited := make(chan struct{})
+ log.Printf("Finished container run for %v", uuid)
- // Kill the child process if container priority changes to zero
- go func() {
- ticker := time.NewTicker(pollInterval)
- defer ticker.Stop()
- for {
- select {
- case <-cmdExited:
- return
- case <-ticker.C:
- }
- var container Container
- err := arv.Get("containers", uuid, nil, &container)
- if err != nil {
- log.Printf("Error getting container %v: %q", uuid, err)
- continue
- }
- if container.Priority == 0 {
- log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
- cmd.Process.Signal(os.Interrupt)
- }
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
}
- }()
-
- // Wait for crunch-run to exit
- if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ waitGroup.Done()
}
- close(cmdExited)
-
- log.Printf("Finished container run for %v", uuid)
-}
-func setFinalState(uuid string) {
- // The container state should now be 'Complete' if everything
- // went well. If it started but crunch-run didn't change its
- // final state to 'Running', fix that now. If it never even
- // started, cancel it as unrunnable. (TODO: Requeue instead,
- // and fix tests so they can tell something happened even if
- // the final state is Queued.)
- var container Container
- err := arv.Get("containers", uuid, nil, &container)
+ // If the container is not finalized, then change it to "Cancelled".
+ err := dispatcher.Arv.Get("containers", uuid, nil, &container)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- fixState := map[string]string{
- "Running": "Complete",
- "Locked": "Cancelled",
+ if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+ log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
+ *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
+ dispatcher.UpdateState(uuid, dispatch.Cancelled)
}
- if newState, ok := fixState[container.State]; ok {
- log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
- updateState(uuid, newState)
+
+ // drain any subsequent status changes
+ for _ = range status {
}
+
+ log.Printf("Finalized container %v", uuid)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..aca60e9 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,20 @@
package main
import (
+ "bytes"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
- "bytes"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ . "gopkg.in/check.v1"
+ "io"
"log"
"net/http"
"net/http/httptest"
"os"
- "syscall"
+ "os/exec"
+ "strings"
"testing"
"time"
-
- . "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -32,6 +33,7 @@ var initialArgs []string
func (s *TestSuite) SetUpSuite(c *C) {
initialArgs = os.Args
arvadostest.StartAPI()
+ runningCmds = make(map[string]*exec.Cmd)
}
func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +43,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-local"}
os.Args = args
-
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
}
func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +54,48 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
+
+ echo := "echo"
+ crunchRunCommand = &echo
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
- go func() {
- time.Sleep(5 * time.Second)
- sigChan <- syscall.SIGINT
- }()
+ err = dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
- err := doMain()
- c.Check(err, IsNil)
+ // Wait for all running crunch jobs to complete / terminate
+ waitGroup.Wait()
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
+ var containers dispatch.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -90,13 +105,13 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
@@ -106,31 +121,35 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
testWithServerStub(c, apiStubResponses, "echo",
- "After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+ `After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2". Updating it to "Cancelled"`)
}
func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
- testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+ apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+ arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -139,15 +158,38 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(buf)
+ log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
+ *crunchRunCommand = crunchCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
+
go func() {
- time.Sleep(2 * time.Second)
- sigChan <- syscall.SIGTERM
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(time.Second, time.Second, crunchCmd)
+ err := dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 53e4705..641b4bc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,19 +1,19 @@
package main
+// Dispatcher service for Crunch that submits containers to the slurm queue.
+
import (
"bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
"log"
"math"
"os"
"os/exec"
- "os/signal"
- "strconv"
- "sync"
- "syscall"
+ "strings"
"time"
)
@@ -25,12 +25,8 @@ func main() {
}
var (
- arv arvadosclient.ArvadosClient
- runningCmds map[string]*exec.Cmd
- runningCmdsMutex sync.Mutex
- waitGroup sync.WaitGroup
- doneProcessing chan bool
- sigChan chan os.Signal
+ crunchRunCommand *string
+ finishCommand *string
)
func doMain() error {
@@ -41,17 +37,12 @@ func doMain() error {
10,
"Interval in seconds to poll for queued containers")
- priorityPollInterval := flags.Int(
- "container-priority-poll-interval",
- 60,
- "Interval in seconds to check priority of a dispatched container")
-
- crunchRunCommand := flags.String(
+ crunchRunCommand = flags.String(
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand := flags.String(
+ finishCommand = flags.String(
"finish-command",
"/usr/bin/crunch-finish-slurm.sh",
"Command to run from strigger when job is finished")
@@ -59,142 +50,56 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
- // Channel to terminate
- doneProcessing = make(chan bool)
-
- // Graceful shutdown
- sigChan = make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- doneProcessing <- true
- }
- }(sigChan)
-
- // Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
- // Wait for all running crunch jobs to complete / terminate
- waitGroup.Wait()
-
- return nil
-}
-
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(*pollInterval) * time.Second,
+ DoneProcessing: make(chan struct{})}
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- var auth apiClientAuthorization
- err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+ err = dispatcher.RunDispatcher()
if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
- }
-
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
- for {
- select {
- case <-ticker.C:
- dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
- case <-doneProcessing:
- ticker.Stop()
- return
- }
- }
-}
-
-// Container data
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
- params := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
- }
-
- var containers ContainerList
- err := arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
- return
+ return err
}
- for _, container := range containers.Items {
- if container.State == "Locked" {
- if container.LockedByUUID != auth.UUID {
- // Locked by a different dispatcher
- continue
- } else if checkMine(container.UUID) {
- // I already have a goroutine running
- // for this container: it just hasn't
- // gotten past Locked state yet.
- continue
- }
- log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
- "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
- container.UUID, auth.UUID)
- setMine(container.UUID, true)
- go func() {
- waitContainer(container, pollInterval)
- setMine(container.UUID, false)
- }()
- }
- go run(container, crunchRunCommand, finishCommand, pollInterval)
- }
+ return nil
}
// sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
- "--job-name="+container.UUID,
- "--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
- "--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
+ fmt.Sprintf("--job-name=%s", container.UUID),
+ fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
+ fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--priority=%d", container.Priority))
}
-var sbatchCmd = sbatchFunc
-
// striggerCmd
func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
}
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+ return exec.Command("squeue", "--format=%j")
+}
+
+// Wrap these so that they can be overridden by tests
var striggerCmd = striggerFunc
+var sbatchCmd = sbatchFunc
+var squeueCmd = squeueFunc
// Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
@@ -204,7 +109,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
// OK, no cleanup needed
return
}
- err := arv.Update("containers", container.UUID,
+ err := dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Queued"}},
nil)
@@ -244,7 +149,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
b, _ := ioutil.ReadAll(stdoutReader)
stdoutReader.Close()
stdoutChan <- b
- close(stdoutChan)
}()
stderrChan := make(chan []byte)
@@ -252,7 +156,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
b, _ := ioutil.ReadAll(stderrReader)
stderrReader.Close()
stderrChan <- b
- close(stderrChan)
}()
// Send a tiny script on stdin to execute the crunch-run command
@@ -265,21 +168,28 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
stdoutMsg := <-stdoutChan
stderrmsg := <-stderrChan
+ close(stdoutChan)
+ close(stderrChan)
+
if err != nil {
submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
return
}
// If everything worked out, got the jobid on stdout
- jobid = string(stdoutMsg)
+ jobid = strings.TrimSpace(string(stdoutMsg))
return
}
// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
- cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+ insecure := "0"
+ if arv.ApiInsecure {
+ insecure = "1"
+ }
+ cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@@ -291,104 +201,8 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
}
}
-// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
- setMine(container.UUID, true)
- defer setMine(container.UUID, false)
-
- // Update container status to Locked. This will fail if
- // another dispatcher (token) has already locked it. It will
- // succeed if *this* dispatcher has already locked it.
- err := arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Locked"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
- return
- }
-
- log.Printf("About to submit queued container %v", container.UUID)
-
- jobid, err := submit(container, crunchRunCommand)
- if err != nil {
- log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
- return
- }
-
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-
- // Update container status to Running. This will fail if
- // another dispatcher (token) has already locked it. It will
- // succeed if *this* dispatcher has already locked it.
- err = arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
- }
- log.Printf("Submitted container %v to slurm", container.UUID)
- waitContainer(container, pollInterval)
-}
-
-// Wait for a container to finish. Cancel the slurm job if the
-// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
- log.Printf("Monitoring container %v started", container.UUID)
- defer log.Printf("Monitoring container %v finished", container.UUID)
-
- pollTicker := time.NewTicker(pollInterval)
- defer pollTicker.Stop()
- for _ = range pollTicker.C {
- var updated Container
- err := arv.Get("containers", container.UUID, nil, &updated)
- if err != nil {
- log.Printf("Error getting container %s: %q", container.UUID, err)
- continue
- }
- if updated.State == "Complete" || updated.State == "Cancelled" {
- return
- }
- if updated.Priority != 0 {
- continue
- }
-
- // Priority is zero, but state is Running or Locked
- log.Printf("Canceling container %s", container.UUID)
-
- err = exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue; will retry", container.UUID)
- continue
- }
- }
-
- err = arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Cancelled"}},
- nil)
- if err != nil {
- log.Printf("Error updating state for container %s: %s", container.UUID, err)
- continue
- }
-
- return
- }
-}
-
func checkSqueue(uuid string) (bool, error) {
- cmd := exec.Command("squeue", "--format=%j")
+ cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
return false, err
@@ -408,25 +222,67 @@ func checkSqueue(uuid string) (bool, error) {
return found, nil
}
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
- mineMutex.Lock()
- if t {
- mineMap[uuid] = true
- } else {
- delete(mineMap, uuid)
+// Run or monitor a container.
+//
+// If the container is marked as Locked, check if it is already in the slurm
+// queue. If not, submit it.
+//
+// If the container is marked as Running, check if it is in the slurm queue.
+// If not, mark it as Cancelled.
+//
+// Monitor status updates. If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+
+ uuid := container.UUID
+
+ if container.State == dispatch.Locked {
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ } else if !inQ {
+ log.Printf("About to submit queued container %v", container.UUID)
+
+ jobid, err := submit(dispatcher, container, *crunchRunCommand)
+ if err != nil {
+ log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
+ } else {
+ finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+ }
+ }
+ } else if container.State == dispatch.Running {
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ } else if !inQ {
+ log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
+ }
+
+ log.Printf("Monitoring container %v started", uuid)
+
+ for container = range status {
+ if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+ err := exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+ continue
+ }
+ }
+
+ err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
- mineMutex.Unlock()
-}
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
- mineMutex.RLocker().Lock()
- defer mineMutex.RLocker().Unlock()
- return mineMap[uuid]
+ log.Printf("Monitoring container %v finished", uuid)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..348d5e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"bytes"
"fmt"
@@ -12,9 +13,7 @@ import (
"net/http/httptest"
"os"
"os/exec"
- "strconv"
"strings"
- "syscall"
"testing"
"time"
@@ -47,11 +46,6 @@ func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-slurm"}
os.Args = args
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
@@ -64,18 +58,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
var sbatchCmdLine []string
var striggerCmdLine []string
// Override sbatchCmd
- defer func(orig func(Container) *exec.Cmd) {
+ defer func(orig func(dispatch.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container Container) *exec.Cmd {
+ sbatchCmd = func(container dispatch.Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
return exec.Command("sh")
}
@@ -90,41 +84,65 @@ func (s *TestSuite) Test_doMain(c *C) {
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- for _, state := range []string{"Running", "Complete"} {
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": state}},
- nil)
- }
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": dispatch.Complete}},
+ nil)
}()
- return exec.Command("echo", "strigger")
+ return exec.Command("echo", striggerCmdLine...)
}
- go func() {
- time.Sleep(8 * time.Second)
- sigChan <- syscall.SIGINT
- }()
+ // Override squeueCmd
+ defer func(orig func() *exec.Cmd) {
+ squeueCmd = orig
+ }(squeueCmd)
+ squeueCmd = func() *exec.Cmd {
+ return exec.Command("echo")
+ }
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
- err := arv.List("containers", params, &containers)
+ var containers dispatch.ContainerList
+ err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- err = doMain()
- c.Check(err, IsNil)
+ echo := "echo"
+ crunchRunCommand = &echo
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ go func() {
+ time.Sleep(1)
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ }()
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ err = dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", item.UUID),
- fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
- fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
+ fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
+ fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--priority=%d", item.Priority)}
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+ c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
@@ -133,7 +151,7 @@ func (s *TestSuite) Test_doMain(c *C) {
c.Check(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -144,7 +162,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -153,7 +171,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -165,14 +183,36 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
+ crunchRunCommand = &crunchCmd
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ go func() {
+ time.Sleep(1)
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ }()
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- sigChan <- syscall.SIGTERM
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+ err := dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list