[ARVADOS] updated: a19ffad966b25b3869e666f749f7c6da187bef68

Git user git at public.curoverse.com
Mon Jan 30 19:24:28 EST 2017


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |  51 +++----
 .../crunch-dispatch-slurm_test.go                  |   4 +-
 services/crunch-dispatch-slurm/squeue.go           | 164 ++++++++-------------
 3 files changed, 85 insertions(+), 134 deletions(-)

       via  a19ffad966b25b3869e666f749f7c6da187bef68 (commit)
       via  1bc602ad5480b9b1ed78b318e9d3d9749d2b83ab (commit)
       via  65123c5a66fe155d6dad2cee3a1e0b90f7b7f3f2 (commit)
      from  e34a5060cfc1cc4821b431e8aa6778a31898e0eb (commit)

The revisions listed above that are new to this repository have not
appeared in any previous notification email, so those revisions are
listed in full below.


commit a19ffad966b25b3869e666f749f7c6da187bef68
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jan 30 19:19:49 2017 -0500

    10700: Rename squeue identifiers (sqCheck = SqueueChecker{})

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3cc0f8f..60dc607 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -44,8 +44,8 @@ func main() {
 }
 
 var (
-	theConfig     Config
-	squeueUpdater Squeue
+	theConfig Config
+	sqCheck   SqueueChecker
 )
 
 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -98,8 +98,8 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
-	squeueUpdater = Squeue{Period: time.Duration(theConfig.PollPeriod)}
-	defer squeueUpdater.Stop()
+	sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+	defer sqCheck.Stop()
 
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
@@ -168,8 +168,8 @@ func submit(dispatcher *dispatch.Dispatcher,
 	cmd.Stderr = &stderr
 
 	// Mutex between squeue sync and running sbatch or scancel.
-	squeueUpdater.L.Lock()
-	defer squeueUpdater.L.Unlock()
+	sqCheck.L.Lock()
+	defer sqCheck.L.Unlock()
 
 	log.Printf("exec sbatch %+q", cmd.Args)
 	err := cmd.Run()
@@ -192,7 +192,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
 	submitted := false
 	for !*monitorDone {
-		if squeueUpdater.HasUUID(container.UUID) {
+		if sqCheck.HasUUID(container.UUID) {
 			// Found in the queue, so continue monitoring
 			submitted = true
 		} else if container.State == dispatch.Locked && !submitted {
@@ -257,14 +257,14 @@ func run(dispatcher *dispatch.Dispatcher,
 		if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
 			log.Printf("Canceling container %s", container.UUID)
 			// Mutex between squeue sync and running sbatch or scancel.
-			squeueUpdater.L.Lock()
+			sqCheck.L.Lock()
 			cmd := scancelCmd(container)
 			msg, err := cmd.CombinedOutput()
-			squeueUpdater.L.Unlock()
+			sqCheck.L.Unlock()
 
 			if err != nil {
 				log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-				if squeueUpdater.HasUUID(container.UUID) {
+				if sqCheck.HasUUID(container.UUID) {
 					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
 					continue
 				}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 152e2e0..8809e7b 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -158,12 +158,12 @@ func (s *TestSuite) integrationTest(c *C,
 		},
 	}
 
-	squeueUpdater = Squeue{Period: 500 * time.Millisecond}
+	sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
 
 	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
-	squeueUpdater.Stop()
+	sqCheck.Stop()
 
 	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index c1bbe92..3bebe56 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -11,7 +11,7 @@ import (
 
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
-type Squeue struct {
+type SqueueChecker struct {
 	Period    time.Duration
 	hasUUID   map[string]bool
 	startOnce sync.Once
@@ -28,35 +28,35 @@ var squeueCmd = squeueFunc
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
-func (squeue *Squeue) HasUUID(uuid string) bool {
-	squeue.startOnce.Do(squeue.start)
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+	sqc.startOnce.Do(sqc.start)
 
-	squeue.L.Lock()
-	defer squeue.L.Unlock()
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
 
 	// block until next squeue broadcast signaling an update.
-	squeue.Wait()
-	return squeue.hasUUID[uuid]
+	sqc.Wait()
+	return sqc.hasUUID[uuid]
 }
 
 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
 // after calling Stop.
-func (squeue *Squeue) Stop() {
-	if squeue.done != nil {
-		close(squeue.done)
+func (sqc *SqueueChecker) Stop() {
+	if sqc.done != nil {
+		close(sqc.done)
 	}
 }
 
 // check gets the names of jobs in the SLURM queue (running and
 // queued). If it succeeds, it updates squeue.hasUUID and wakes up any
 // goroutines that are waiting in HasUUID().
-func (squeue *Squeue) check() {
+func (sqc *SqueueChecker) check() {
 	// Mutex between squeue sync and running sbatch or scancel.  This
 	// establishes a sequence so that squeue doesn't run concurrently with
 	// sbatch or scancel; the next update of squeue will occur only after
 	// sbatch or scancel has completed.
-	squeue.L.Lock()
-	defer squeue.L.Unlock()
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
 
 	cmd := squeueCmd()
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
@@ -67,27 +67,27 @@ func (squeue *Squeue) check() {
 	}
 
 	uuids := strings.Split(stdout.String(), "\n")
-	squeue.hasUUID = make(map[string]bool, len(uuids))
+	sqc.hasUUID = make(map[string]bool, len(uuids))
 	for _, uuid := range uuids {
-		squeue.hasUUID[uuid] = true
+		sqc.hasUUID[uuid] = true
 	}
-	squeue.Broadcast()
+	sqc.Broadcast()
 }
 
 // Initialize, and start a goroutine to call check() once per
 // squeue.Period until terminated by calling Stop().
-func (squeue *Squeue) start() {
-	squeue.L = &sync.Mutex{}
-	squeue.done = make(chan struct{})
+func (sqc *SqueueChecker) start() {
+	sqc.L = &sync.Mutex{}
+	sqc.done = make(chan struct{})
 	go func() {
-		ticker := time.NewTicker(squeue.Period)
+		ticker := time.NewTicker(sqc.Period)
 		for {
 			select {
-			case <-squeue.done:
+			case <-sqc.done:
 				ticker.Stop()
 				return
 			case <-ticker.C:
-				squeue.check()
+				sqc.check()
 			}
 		}
 	}()

commit 1bc602ad5480b9b1ed78b318e9d3d9749d2b83ab
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jan 30 18:32:49 2017 -0500

    10700: Simplify squeue checker.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f430aed..3cc0f8f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -98,8 +98,8 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
-	squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
-	defer squeueUpdater.Done()
+	squeueUpdater = Squeue{Period: time.Duration(theConfig.PollPeriod)}
+	defer squeueUpdater.Stop()
 
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
@@ -168,8 +168,8 @@ func submit(dispatcher *dispatch.Dispatcher,
 	cmd.Stderr = &stderr
 
 	// Mutex between squeue sync and running sbatch or scancel.
-	squeueUpdater.SlurmLock.Lock()
-	defer squeueUpdater.SlurmLock.Unlock()
+	squeueUpdater.L.Lock()
+	defer squeueUpdater.L.Unlock()
 
 	log.Printf("exec sbatch %+q", cmd.Args)
 	err := cmd.Run()
@@ -192,7 +192,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
 	submitted := false
 	for !*monitorDone {
-		if squeueUpdater.CheckSqueue(container.UUID) {
+		if squeueUpdater.HasUUID(container.UUID) {
 			// Found in the queue, so continue monitoring
 			submitted = true
 		} else if container.State == dispatch.Locked && !submitted {
@@ -257,14 +257,14 @@ func run(dispatcher *dispatch.Dispatcher,
 		if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
 			log.Printf("Canceling container %s", container.UUID)
 			// Mutex between squeue sync and running sbatch or scancel.
-			squeueUpdater.SlurmLock.Lock()
+			squeueUpdater.L.Lock()
 			cmd := scancelCmd(container)
 			msg, err := cmd.CombinedOutput()
-			squeueUpdater.SlurmLock.Unlock()
+			squeueUpdater.L.Unlock()
 
 			if err != nil {
 				log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-				if squeueUpdater.CheckSqueue(container.UUID) {
+				if squeueUpdater.HasUUID(container.UUID) {
 					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
 					continue
 				}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index de25fc3..152e2e0 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -158,12 +158,12 @@ func (s *TestSuite) integrationTest(c *C,
 		},
 	}
 
-	squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
+	squeueUpdater = Squeue{Period: 500 * time.Millisecond}
 
 	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
-	squeueUpdater.Done()
+	squeueUpdater.Stop()
 
 	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 45d06c8..c1bbe92 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -1,11 +1,10 @@
 package main
 
 import (
-	"bufio"
-	"io"
-	"io/ioutil"
+	"bytes"
 	"log"
 	"os/exec"
+	"strings"
 	"sync"
 	"time"
 )
@@ -13,126 +12,83 @@ import (
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type Squeue struct {
-	squeueContents []string
-	squeueDone     chan struct{}
-	squeueCond     *sync.Cond
-	SlurmLock      sync.Mutex
+	Period    time.Duration
+	hasUUID   map[string]bool
+	startOnce sync.Once
+	done      chan struct{}
+	sync.Cond
 }
 
-// squeueFunc
 func squeueFunc() *exec.Cmd {
 	return exec.Command("squeue", "--all", "--format=%j")
 }
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If it succeeds, set
-// "squeueContents" and then wake up any goroutines waiting squeueCond in
-// CheckSqueue().  If there was an error, log it and leave the threads blocked.
-func (squeue *Squeue) RunSqueue() {
-	var newSqueueContents []string
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (squeue *Squeue) HasUUID(uuid string) bool {
+	squeue.startOnce.Do(squeue.start)
 
+	squeue.L.Lock()
+	defer squeue.L.Unlock()
+
+	// block until next squeue broadcast signaling an update.
+	squeue.Wait()
+	return squeue.hasUUID[uuid]
+}
+
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (squeue *Squeue) Stop() {
+	if squeue.done != nil {
+		close(squeue.done)
+	}
+}
+
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates squeue.hasUUID and wakes up any
+// goroutines that are waiting in HasUUID().
+func (squeue *Squeue) check() {
 	// Mutex between squeue sync and running sbatch or scancel.  This
 	// establishes a sequence so that squeue doesn't run concurrently with
 	// sbatch or scancel; the next update of squeue will occur only after
 	// sbatch or scancel has completed.
-	squeue.SlurmLock.Lock()
-	defer squeue.SlurmLock.Unlock()
-
-	// Also ensure unlock on all return paths
+	squeue.L.Lock()
+	defer squeue.L.Unlock()
 
 	cmd := squeueCmd()
-	sq, err := cmd.StdoutPipe()
-	if err != nil {
-		log.Printf("Error creating stdout pipe for squeue: %v", err)
+	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+	cmd.Stdout, cmd.Stderr = stdout, stderr
+	if err := cmd.Run(); err != nil {
+		log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
 		return
 	}
 
-	stderrReader, err := cmd.StderrPipe()
-	if err != nil {
-		log.Printf("Error creating stderr pipe for squeue: %v", err)
-		return
+	uuids := strings.Split(stdout.String(), "\n")
+	squeue.hasUUID = make(map[string]bool, len(uuids))
+	for _, uuid := range uuids {
+		squeue.hasUUID[uuid] = true
 	}
-
-	err = cmd.Start()
-	if err != nil {
-		log.Printf("Error running squeue: %v", err)
-		return
-	}
-
-	stderrChan := make(chan []byte)
-	go func() {
-		b, _ := ioutil.ReadAll(stderrReader)
-		stderrChan <- b
-		close(stderrChan)
-	}()
-
-	scanner := bufio.NewScanner(sq)
-	for scanner.Scan() {
-		newSqueueContents = append(newSqueueContents, scanner.Text())
-	}
-	io.Copy(ioutil.Discard, sq)
-
-	stderrmsg := <-stderrChan
-
-	err = cmd.Wait()
-
-	if scanner.Err() != nil {
-		log.Printf("Error reading from squeue pipe: %v", err)
-	}
-	if err != nil {
-		log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
-	}
-
-	if scanner.Err() == nil && err == nil {
-		squeue.squeueCond.L.Lock()
-		squeue.squeueContents = newSqueueContents
-		squeue.squeueCond.Broadcast()
-		squeue.squeueCond.L.Unlock()
-	}
-}
-
-// CheckSqueue checks if a given container UUID is in the slurm queue.  This
-// does not run squeue directly, but instead blocks until woken up by next
-// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) bool {
-	squeue.squeueCond.L.Lock()
-	// block until next squeue broadcast signaling an update.
-	squeue.squeueCond.Wait()
-	contents := squeue.squeueContents
-	squeue.squeueCond.L.Unlock()
-
-	for _, k := range contents {
-		if k == uuid {
-			return true
-		}
-	}
-	return false
+	squeue.Broadcast()
 }
 
-// StartMonitor starts the squeue monitoring goroutine.
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-	squeue.squeueCond = sync.NewCond(&sync.Mutex{})
-	squeue.squeueDone = make(chan struct{})
-	go squeue.SyncSqueue(pollInterval)
-}
-
-// Done stops the squeue monitoring goroutine.
-func (squeue *Squeue) Done() {
-	squeue.squeueDone <- struct{}{}
-	close(squeue.squeueDone)
-}
-
-// SyncSqueue periodically polls RunSqueue() at the given duration until
-// terminated by calling Done().
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-	ticker := time.NewTicker(pollInterval)
-	for {
-		select {
-		case <-squeue.squeueDone:
-			return
-		case <-ticker.C:
-			squeue.RunSqueue()
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (squeue *Squeue) start() {
+	squeue.L = &sync.Mutex{}
+	squeue.done = make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(squeue.Period)
+		for {
+			select {
+			case <-squeue.done:
+				ticker.Stop()
+				return
+			case <-ticker.C:
+				squeue.check()
+			}
 		}
-	}
+	}()
 }

commit 65123c5a66fe155d6dad2cee3a1e0b90f7b7f3f2
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jan 30 11:33:28 2017 -0500

    10700: Rephrase "should cancel" condition to be less unclear.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 86ae79a..f430aed 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -254,30 +254,25 @@ func run(dispatcher *dispatch.Dispatcher,
 	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
 	for container = range status {
-		if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
-			continue
-		}
-		if container.Priority != 0 {
-			continue
-		}
-		log.Printf("Canceling container %s", container.UUID)
+		if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
+			log.Printf("Canceling container %s", container.UUID)
+			// Mutex between squeue sync and running sbatch or scancel.
+			squeueUpdater.SlurmLock.Lock()
+			cmd := scancelCmd(container)
+			msg, err := cmd.CombinedOutput()
+			squeueUpdater.SlurmLock.Unlock()
 
-		// Mutex between squeue sync and running sbatch or scancel.
-		squeueUpdater.SlurmLock.Lock()
-		cmd := scancelCmd(container)
-		msg, err := cmd.CombinedOutput()
-		squeueUpdater.SlurmLock.Unlock()
-
-		if err != nil {
-			log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-			if squeueUpdater.CheckSqueue(container.UUID) {
-				log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-				continue
+			if err != nil {
+				log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+				if squeueUpdater.CheckSqueue(container.UUID) {
+					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+					continue
+				}
 			}
-		}
 
-		// Ignore errors; if necessary, we'll try again next time
-		dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+			// Ignore errors; if necessary, we'll try again next time
+			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		}
 	}
 	monitorDone = true
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list