[ARVADOS] updated: 0158347f6951d3c25b5c740caaf91d2d1644c8d2
Git user
git at public.curoverse.com
Mon Jun 6 10:44:36 EDT 2016
Summary of changes:
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 17 +++--
.../crunch-dispatch-slurm_test.go | 88 +++++++++++++---------
services/crunch-dispatch-slurm/squeue.go | 71 +++++++----------
3 files changed, 90 insertions(+), 86 deletions(-)
via 0158347f6951d3c25b5c740caaf91d2d1644c8d2 (commit)
via d45be86b354adec485504bfc09f41e0e22241f34 (commit)
from bb10b7777ed6db229fbb35e6a829bec4e8efcd23 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0158347f6951d3c25b5c740caaf91d2d1644c8d2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 10:44:11 2016 -0400
9187: Remove "squeueError" because CheckSqueue blocks until the next successful squeue run. Refactor tests a bit and add a test for canceling containers.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 1dada2f..f718fbc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -78,8 +78,14 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
fmt.Sprintf("--priority=%d", container.Priority))
}
+// scancelFunc builds the scancel command for the job named after the container UUID.
+func scancelFunc(container dispatch.Container) *exec.Cmd {
+ return exec.Command("scancel", "--name="+container.UUID)
+}
+
// Wrap these so that they can be overridden by tests
var sbatchCmd = sbatchFunc
+var scancelCmd = scancelFunc
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
@@ -178,10 +184,7 @@ func submit(dispatcher *dispatch.Dispatcher,
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
- // Most recent run of squeue had an error, so do nothing.
- continue
- } else if inQ {
+ if squeueUpdater.CheckSqueue(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
@@ -249,15 +252,13 @@ func run(dispatcher *dispatch.Dispatcher,
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
- err := exec.Command("scancel", "--name="+container.UUID).Run()
+ err := scancelCmd(container).Run()
squeueUpdater.SlurmLock.Unlock()
if err != nil {
log.Printf("Error stopping container %s with scancel: %v",
container.UUID, err)
- if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
- continue
- } else if inQ {
+ if squeueUpdater.CheckSqueue(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.",
container.UUID)
continue
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index be347e4..cddbe8c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,7 +8,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io"
"log"
- "math"
"net/http"
"net/http/httptest"
"os"
@@ -58,14 +57,60 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- s.integrationTest(c, false)
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ []string(nil),
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, "Complete")
+}
+
+func (s *TestSuite) TestIntegrationCancel(c *C) {
+
+ // Override scancelCmd
+ var scancelCmdLine []string
+ defer func(orig func(dispatch.Container) *exec.Cmd) {
+ scancelCmd = orig
+ }(scancelCmd)
+ scancelCmd = func(container dispatch.Container) *exec.Cmd {
+ scancelCmdLine = scancelFunc(container).Args
+ return exec.Command("echo")
+ }
+
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ []string(nil),
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(1 * time.Second)
+ dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"priority": 0}},
+ nil)
+ })
+ c.Check(container.State, Equals, "Cancelled")
+ c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- s.integrationTest(c, true)
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+ fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+ fmt.Sprintf("--mem-per-cpu=%d", 2862),
+ fmt.Sprintf("--cpus-per-task=%d", 4),
+ fmt.Sprintf("--priority=%d", 1)},
+ func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, "Cancelled")
}
-func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+func (s *TestSuite) integrationTest(c *C,
+ newSqueueCmd func() *exec.Cmd,
+ sbatchCmdComps []string,
+ runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
@@ -86,13 +131,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
defer func(orig func() *exec.Cmd) {
squeueCmd = orig
}(squeueCmd)
- squeueCmd = func() *exec.Cmd {
- if missingFromSqueue {
- return exec.Command("echo")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
- }
- }
+ squeueCmd = newSqueueCmd
// There should be no queued containers now
params := arvadosclient.Dict{
@@ -113,11 +152,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
RunContainer: func(dispatcher *dispatch.Dispatcher,
container dispatch.Container,
status chan dispatch.Container) {
- go func() {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(3 * time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- }()
+ go runContainer(dispatcher, container)
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
@@ -130,20 +165,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
squeueUpdater.Done()
- item := containers.Items[0]
- sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
- fmt.Sprintf("--job-name=%s", item.UUID),
- fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
- fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
- fmt.Sprintf("--priority=%d", item.Priority)}
-
- if missingFromSqueue {
- // not in squeue when run() started, so it will have called sbatch
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- } else {
- // already in squeue when run() started, will have just monitored it instead
- c.Check(sbatchCmdLine, DeepEquals, []string(nil))
- }
+ c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -154,11 +176,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- if missingFromSqueue {
- c.Check(container.State, Equals, "Cancelled")
- } else {
- c.Check(container.State, Equals, "Complete")
- }
+ return container
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 3ee8b6f..68983b8 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -13,7 +13,6 @@ import (
type Squeue struct {
squeueContents []string
squeueDone chan struct{}
- squeueError error
squeueCond *sync.Cond
SlurmLock sync.Mutex
}
@@ -25,10 +24,10 @@ func squeueFunc() *exec.Cmd {
var squeueCmd = squeueFunc
-// RunSqueue runs squeue once and captures the output. If there is an error,
-// set "squeueError". If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output. If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting on squeueCond in
+// CheckSqueue(). If there was an error, log it and leave the goroutines blocked.
+func (squeue *Squeue) RunSqueue() {
var newSqueueContents []string
// Mutex between squeue sync and running sbatch or scancel. This
@@ -39,15 +38,12 @@ func (squeue *Squeue) RunSqueue() error {
defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
- defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
cmd.Start()
scanner := bufio.NewScanner(sq)
@@ -57,48 +53,37 @@ func (squeue *Squeue) RunSqueue() error {
if err := scanner.Err(); err != nil {
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
squeue.squeueCond.L.Lock()
- squeue.squeueError = nil
squeue.squeueContents = newSqueueContents
squeue.squeueCond.Broadcast()
-
- return nil
+ squeue.squeueCond.L.Unlock()
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
squeue.squeueCond.Wait()
- if squeue.squeueError != nil {
- e := squeue.squeueError
- squeue.squeueCond.L.Unlock()
- return false, e
- }
contents := squeue.squeueContents
squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
- return true, nil
+ return true
}
}
- return false, nil
+ return false
}
// StartMonitor starts the squeue monitoring goroutine.
commit d45be86b354adec485504bfc09f41e0e22241f34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jun 3 17:57:48 2016 -0400
9187: Fix refactoring messup
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index e157469..3ee8b6f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -35,18 +35,18 @@ func (squeue *Squeue) RunSqueue() error {
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ squeue.SlurmLock.Lock()
+ defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
- defer squeueUpdater.squeueCond.L.Unlock()
+ defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
cmd.Start()
@@ -57,23 +57,23 @@ func (squeue *Squeue) RunSqueue() error {
if err := scanner.Err(); err != nil {
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = nil
- squeueUpdater.squeueContents = newSqueueContents
- squeueUpdater.squeueCond.Broadcast()
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = nil
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
return nil
}
@@ -82,16 +82,16 @@ func (squeue *Squeue) RunSqueue() error {
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
- squeueUpdater.squeueCond.L.Lock()
+ squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
- squeueUpdater.squeueCond.Wait()
- if squeueUpdater.squeueError != nil {
- e := squeueUpdater.squeueError
- squeueUpdater.squeueCond.L.Unlock()
+ squeue.squeueCond.Wait()
+ if squeue.squeueError != nil {
+ e := squeue.squeueError
+ squeue.squeueCond.L.Unlock()
return false, e
}
- contents := squeueUpdater.squeueContents
- squeueUpdater.squeueCond.L.Unlock()
+ contents := squeue.squeueContents
+ squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
@@ -103,16 +103,16 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
// StartMonitor starts the squeue monitoring goroutine.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
- squeueUpdater.squeueDone = make(chan struct{})
- squeueUpdater.RunSqueue()
- go squeueUpdater.SyncSqueue(pollInterval)
+ squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+ squeue.squeueDone = make(chan struct{})
+ squeue.RunSqueue()
+ go squeue.SyncSqueue(pollInterval)
}
// Done stops the squeue monitoring goroutine.
func (squeue *Squeue) Done() {
- squeueUpdater.squeueDone <- struct{}{}
- close(squeueUpdater.squeueDone)
+ squeue.squeueDone <- struct{}{}
+ close(squeue.squeueDone)
}
// SyncSqueue periodically polls RunSqueue() at the given duration until
@@ -121,10 +121,10 @@ func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
ticker := time.NewTicker(pollInterval)
for {
select {
- case <-squeueUpdater.squeueDone:
+ case <-squeue.squeueDone:
return
case <-ticker.C:
- squeueUpdater.RunSqueue()
+ squeue.RunSqueue()
}
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list