[ARVADOS] updated: a19ffad966b25b3869e666f749f7c6da187bef68
Git user git at public.curoverse.com
Mon Jan 30 19:24:28 EST 2017
Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |  51 +++----
 .../crunch-dispatch-slurm_test.go                  |   4 +-
 services/crunch-dispatch-slurm/squeue.go           | 164 ++++++++-------------
 3 files changed, 85 insertions(+), 134 deletions(-)
via a19ffad966b25b3869e666f749f7c6da187bef68 (commit)
via 1bc602ad5480b9b1ed78b318e9d3d9749d2b83ab (commit)
via 65123c5a66fe155d6dad2cee3a1e0b90f7b7f3f2 (commit)
from e34a5060cfc1cc4821b431e8aa6778a31898e0eb (commit)
The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.
commit a19ffad966b25b3869e666f749f7c6da187bef68
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jan 30 19:19:49 2017 -0500
10700: Rename squeue identifiers (sqCheck = SqueueChecker{})
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3cc0f8f..60dc607 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -44,8 +44,8 @@ func main() {
}
var (
- theConfig Config
- squeueUpdater Squeue
+ theConfig Config
+ sqCheck SqueueChecker
)
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -98,8 +98,8 @@ func doMain() error {
}
arv.Retries = 25
- squeueUpdater = Squeue{Period: time.Duration(theConfig.PollPeriod)}
- defer squeueUpdater.Stop()
+ sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+ defer sqCheck.Stop()
dispatcher := dispatch.Dispatcher{
Arv: arv,
@@ -168,8 +168,8 @@ func submit(dispatcher *dispatch.Dispatcher,
cmd.Stderr = &stderr
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.L.Lock()
- defer squeueUpdater.L.Unlock()
+ sqCheck.L.Lock()
+ defer sqCheck.L.Unlock()
log.Printf("exec sbatch %+q", cmd.Args)
err := cmd.Run()
@@ -192,7 +192,7 @@ func submit(dispatcher *dispatch.Dispatcher,
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if squeueUpdater.HasUUID(container.UUID) {
+ if sqCheck.HasUUID(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
@@ -257,14 +257,14 @@ func run(dispatcher *dispatch.Dispatcher,
if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
log.Printf("Canceling container %s", container.UUID)
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.L.Lock()
+ sqCheck.L.Lock()
cmd := scancelCmd(container)
msg, err := cmd.CombinedOutput()
- squeueUpdater.L.Unlock()
+ sqCheck.L.Unlock()
if err != nil {
log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.HasUUID(container.UUID) {
+ if sqCheck.HasUUID(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.", container.UUID)
continue
}
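
The hunks above serialize sbatch and scancel against the periodic
squeue poll by holding one shared lock around each subprocess call. A
minimal self-contained sketch of that shape, with hypothetical names
and a stand-in command (not code from this commit):

    package main

    import (
            "log"
            "os/exec"
            "sync"
            "time"
    )

    var slurmLock sync.Mutex

    // poll stands in for the periodic squeue check: taking the lock
    // means it never runs while a submit or cancel is in flight.
    func poll() {
            slurmLock.Lock()
            defer slurmLock.Unlock()
            // ... run squeue and refresh cached state here ...
    }

    // submit stands in for sbatch: holding the lock guarantees the
    // next poll() observes the queue only after submission completes.
    func submit() error {
            slurmLock.Lock()
            defer slurmLock.Unlock()
            return exec.Command("true").Run() // stand-in for sbatch
    }

    func main() {
            go func() {
                    for range time.Tick(200 * time.Millisecond) {
                            poll()
                    }
            }()
            if err := submit(); err != nil {
                    log.Fatal(err)
            }
            time.Sleep(time.Second)
    }
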
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 152e2e0..8809e7b 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -158,12 +158,12 @@ func (s *TestSuite) integrationTest(c *C,
},
}
- squeueUpdater = Squeue{Period: 500 * time.Millisecond}
+ sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
err = dispatcher.Run()
c.Assert(err, IsNil)
- squeueUpdater.Stop()
+ sqCheck.Stop()
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index c1bbe92..3bebe56 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -11,7 +11,7 @@ import (
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
-type Squeue struct {
+type SqueueChecker struct {
Period time.Duration
hasUUID map[string]bool
startOnce sync.Once
@@ -28,35 +28,35 @@ var squeueCmd = squeueFunc
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by next successful update of squeue.
-func (squeue *Squeue) HasUUID(uuid string) bool {
- squeue.startOnce.Do(squeue.start)
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+ sqc.startOnce.Do(sqc.start)
- squeue.L.Lock()
- defer squeue.L.Unlock()
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
// block until next squeue broadcast signaling an update.
- squeue.Wait()
- return squeue.hasUUID[uuid]
+ sqc.Wait()
+ return sqc.hasUUID[uuid]
}
// Stop stops the squeue monitoring goroutine. Do not call HasUUID
// after calling Stop.
-func (squeue *Squeue) Stop() {
- if squeue.done != nil {
- close(squeue.done)
+func (sqc *SqueueChecker) Stop() {
+ if sqc.done != nil {
+ close(sqc.done)
}
}
// check gets the names of jobs in the SLURM queue (running and
// queued). If it succeeds, it updates squeue.hasUUID and wakes up any
// goroutines that are waiting in HasUUID().
-func (squeue *Squeue) check() {
+func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeue.L.Lock()
- defer squeue.L.Unlock()
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
cmd := squeueCmd()
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
@@ -67,27 +67,27 @@ func (squeue *Squeue) check() {
}
uuids := strings.Split(stdout.String(), "\n")
- squeue.hasUUID = make(map[string]bool, len(uuids))
+ sqc.hasUUID = make(map[string]bool, len(uuids))
for _, uuid := range uuids {
- squeue.hasUUID[uuid] = true
+ sqc.hasUUID[uuid] = true
}
- squeue.Broadcast()
+ sqc.Broadcast()
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
-func (squeue *Squeue) start() {
- squeue.L = &sync.Mutex{}
- squeue.done = make(chan struct{})
+func (sqc *SqueueChecker) start() {
+ sqc.L = &sync.Mutex{}
+ sqc.done = make(chan struct{})
go func() {
- ticker := time.NewTicker(squeue.Period)
+ ticker := time.NewTicker(sqc.Period)
for {
select {
- case <-squeue.done:
+ case <-sqc.done:
ticker.Stop()
return
case <-ticker.C:
- squeue.check()
+ sqc.check()
}
}
}()
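
Taken together, the renamed SqueueChecker combines three standard Go
pieces: sync.Once for lazy start on first use, an embedded sync.Cond
so HasUUID can block until the next refresh, and a done channel to
stop the ticker goroutine. A self-contained sketch of the same shape,
with a fake refresh in place of the real squeue command (hypothetical
names and UUID, not code from the commit):

    package main

    import (
            "fmt"
            "sync"
            "time"
    )

    type checker struct {
            period    time.Duration
            has       map[string]bool
            startOnce sync.Once
            done      chan struct{}
            sync.Cond
    }

    func (c *checker) start() {
            c.L = &sync.Mutex{}
            c.done = make(chan struct{})
            go func() {
                    ticker := time.NewTicker(c.period)
                    defer ticker.Stop()
                    for {
                            select {
                            case <-c.done:
                                    return
                            case <-ticker.C:
                                    c.L.Lock()
                                    // Fake refresh standing in for squeue.
                                    c.has = map[string]bool{"zzzzz-dz642-fake": true}
                                    c.Broadcast() // wake every goroutine blocked in Has()
                                    c.L.Unlock()
                            }
                    }
            }()
    }

    // Has blocks until the next refresh completes, so callers always
    // see state at least as new as the moment they asked.
    func (c *checker) Has(uuid string) bool {
            c.startOnce.Do(c.start)
            c.L.Lock()
            defer c.L.Unlock()
            c.Wait()
            return c.has[uuid]
    }

    func main() {
            c := &checker{period: 100 * time.Millisecond}
            fmt.Println(c.Has("zzzzz-dz642-fake")) // true after first tick
            close(c.done)
    }
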
commit 1bc602ad5480b9b1ed78b318e9d3d9749d2b83ab
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jan 30 18:32:49 2017 -0500
10700: Simplify squeue checker.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f430aed..3cc0f8f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -98,8 +98,8 @@ func doMain() error {
}
arv.Retries = 25
- squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
- defer squeueUpdater.Done()
+ squeueUpdater = Squeue{Period: time.Duration(theConfig.PollPeriod)}
+ defer squeueUpdater.Stop()
dispatcher := dispatch.Dispatcher{
Arv: arv,
@@ -168,8 +168,8 @@ func submit(dispatcher *dispatch.Dispatcher,
cmd.Stderr = &stderr
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ squeueUpdater.L.Lock()
+ defer squeueUpdater.L.Unlock()
log.Printf("exec sbatch %+q", cmd.Args)
err := cmd.Run()
@@ -192,7 +192,7 @@ func submit(dispatcher *dispatch.Dispatcher,
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if squeueUpdater.CheckSqueue(container.UUID) {
+ if squeueUpdater.HasUUID(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
@@ -257,14 +257,14 @@ func run(dispatcher *dispatch.Dispatcher,
if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
log.Printf("Canceling container %s", container.UUID)
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
+ squeueUpdater.L.Lock()
cmd := scancelCmd(container)
msg, err := cmd.CombinedOutput()
- squeueUpdater.SlurmLock.Unlock()
+ squeueUpdater.L.Unlock()
if err != nil {
log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.CheckSqueue(container.UUID) {
+ if squeueUpdater.HasUUID(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.", container.UUID)
continue
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index de25fc3..152e2e0 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -158,12 +158,12 @@ func (s *TestSuite) integrationTest(c *C,
},
}
- squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
+ squeueUpdater = Squeue{Period: 500 * time.Millisecond}
err = dispatcher.Run()
c.Assert(err, IsNil)
- squeueUpdater.Done()
+ squeueUpdater.Stop()
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 45d06c8..c1bbe92 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -1,11 +1,10 @@
package main
import (
- "bufio"
- "io"
- "io/ioutil"
+ "bytes"
"log"
"os/exec"
+ "strings"
"sync"
"time"
)
@@ -13,126 +12,83 @@ import (
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type Squeue struct {
- squeueContents []string
- squeueDone chan struct{}
- squeueCond *sync.Cond
- SlurmLock sync.Mutex
+ Period time.Duration
+ hasUUID map[string]bool
+ startOnce sync.Once
+ done chan struct{}
+ sync.Cond
}
-// squeueFunc
func squeueFunc() *exec.Cmd {
return exec.Command("squeue", "--all", "--format=%j")
}
var squeueCmd = squeueFunc
-// RunSqueue runs squeue once and captures the output. If it succeeds, set
-// "squeueContents" and then wake up any goroutines waiting squeueCond in
-// CheckSqueue(). If there was an error, log it and leave the threads blocked.
-func (squeue *Squeue) RunSqueue() {
- var newSqueueContents []string
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (squeue *Squeue) HasUUID(uuid string) bool {
+ squeue.startOnce.Do(squeue.start)
+ squeue.L.Lock()
+ defer squeue.L.Unlock()
+
+ // block until next squeue broadcast signaling an update.
+ squeue.Wait()
+ return squeue.hasUUID[uuid]
+}
+
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (squeue *Squeue) Stop() {
+ if squeue.done != nil {
+ close(squeue.done)
+ }
+}
+
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates squeue.hasUUID and wakes up any
+// goroutines that are waiting in HasUUID().
+func (squeue *Squeue) check() {
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeue.SlurmLock.Lock()
- defer squeue.SlurmLock.Unlock()
-
- // Also ensure unlock on all return paths
+ squeue.L.Lock()
+ defer squeue.L.Unlock()
cmd := squeueCmd()
- sq, err := cmd.StdoutPipe()
- if err != nil {
- log.Printf("Error creating stdout pipe for squeue: %v", err)
+ stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+ cmd.Stdout, cmd.Stderr = stdout, stderr
+ if err := cmd.Run(); err != nil {
+ log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
- stderrReader, err := cmd.StderrPipe()
- if err != nil {
- log.Printf("Error creating stderr pipe for squeue: %v", err)
- return
+ uuids := strings.Split(stdout.String(), "\n")
+ squeue.hasUUID = make(map[string]bool, len(uuids))
+ for _, uuid := range uuids {
+ squeue.hasUUID[uuid] = true
}
-
- err = cmd.Start()
- if err != nil {
- log.Printf("Error running squeue: %v", err)
- return
- }
-
- stderrChan := make(chan []byte)
- go func() {
- b, _ := ioutil.ReadAll(stderrReader)
- stderrChan <- b
- close(stderrChan)
- }()
-
- scanner := bufio.NewScanner(sq)
- for scanner.Scan() {
- newSqueueContents = append(newSqueueContents, scanner.Text())
- }
- io.Copy(ioutil.Discard, sq)
-
- stderrmsg := <-stderrChan
-
- err = cmd.Wait()
-
- if scanner.Err() != nil {
- log.Printf("Error reading from squeue pipe: %v", err)
- }
- if err != nil {
- log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
- }
-
- if scanner.Err() == nil && err == nil {
- squeue.squeueCond.L.Lock()
- squeue.squeueContents = newSqueueContents
- squeue.squeueCond.Broadcast()
- squeue.squeueCond.L.Unlock()
- }
-}
-
-// CheckSqueue checks if a given container UUID is in the slurm queue. This
-// does not run squeue directly, but instead blocks until woken up by next
-// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) bool {
- squeue.squeueCond.L.Lock()
- // block until next squeue broadcast signaling an update.
- squeue.squeueCond.Wait()
- contents := squeue.squeueContents
- squeue.squeueCond.L.Unlock()
-
- for _, k := range contents {
- if k == uuid {
- return true
- }
- }
- return false
+ squeue.Broadcast()
}
-// StartMonitor starts the squeue monitoring goroutine.
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeue.squeueCond = sync.NewCond(&sync.Mutex{})
- squeue.squeueDone = make(chan struct{})
- go squeue.SyncSqueue(pollInterval)
-}
-
-// Done stops the squeue monitoring goroutine.
-func (squeue *Squeue) Done() {
- squeue.squeueDone <- struct{}{}
- close(squeue.squeueDone)
-}
-
-// SyncSqueue periodically polls RunSqueue() at the given duration until
-// terminated by calling Done().
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
- ticker := time.NewTicker(pollInterval)
- for {
- select {
- case <-squeue.squeueDone:
- return
- case <-ticker.C:
- squeue.RunSqueue()
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (squeue *Squeue) start() {
+ squeue.L = &sync.Mutex{}
+ squeue.done = make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(squeue.Period)
+ for {
+ select {
+ case <-squeue.done:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ squeue.check()
+ }
}
- }
+ }()
}
commit 65123c5a66fe155d6dad2cee3a1e0b90f7b7f3f2
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jan 30 11:33:28 2017 -0500
10700: Rephrase "should cancel" condition to be less unclear.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 86ae79a..f430aed 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -254,30 +254,25 @@ func run(dispatcher *dispatch.Dispatcher,
go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
for container = range status {
- if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
- continue
- }
- if container.Priority != 0 {
- continue
- }
- log.Printf("Canceling container %s", container.UUID)
+ if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
+ log.Printf("Canceling container %s", container.UUID)
+ // Mutex between squeue sync and running sbatch or scancel.
+ squeueUpdater.SlurmLock.Lock()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
+ squeueUpdater.SlurmLock.Unlock()
- // Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
- cmd := scancelCmd(container)
- msg, err := cmd.CombinedOutput()
- squeueUpdater.SlurmLock.Unlock()
-
- if err != nil {
- log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.CheckSqueue(container.UUID) {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
+ if err != nil {
+ log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+ if squeueUpdater.CheckSqueue(container.UUID) {
+ log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+ continue
+ }
}
- }
- // Ignore errors; if necessary, we'll try again next time
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ // Ignore errors; if necessary, we'll try again next time
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
monitorDone = true
}
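
The rewrite replaces two negative guard clauses with a single positive
condition; both read "cancel when the container has been deprioritized
and is still locked or running". A small sketch checking the
equivalence (hypothetical helper names, not code from the commit):

    package main

    import "fmt"

    type state string

    const (
            locked  state = "Locked"
            running state = "Running"
    )

    // shouldCancelOld mirrors the removed guard-clause form.
    func shouldCancelOld(priority int, s state) bool {
            if !(s == locked || s == running) {
                    return false
            }
            if priority != 0 {
                    return false
            }
            return true
    }

    // shouldCancelNew mirrors the added single positive condition.
    func shouldCancelNew(priority int, s state) bool {
            return priority == 0 && (s == locked || s == running)
    }

    func main() {
            for _, p := range []int{0, 1} {
                    for _, s := range []state{locked, running, "Queued"} {
                            // Prints true for every combination.
                            fmt.Println(p, s, shouldCancelOld(p, s) == shouldCancelNew(p, s))
                    }
            }
    }
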
-----------------------------------------------------------------------
hooks/post-receive