[ARVADOS] updated: d77c4cc58d393c48ce46b987f6eada7c7cc381c6

Git user git at public.curoverse.com
Thu Jun 2 22:07:10 EDT 2016


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 210 +++++++--------------
 .../crunch-dispatch-slurm_test.go                  |   6 +-
 services/crunch-dispatch-slurm/squeue.go           | 120 ++++++++++++
 3 files changed, 187 insertions(+), 149 deletions(-)
 create mode 100644 services/crunch-dispatch-slurm/squeue.go

       via  d77c4cc58d393c48ce46b987f6eada7c7cc381c6 (commit)
      from  3ae9a789410e93eeb31ca5670c17a6d03d77f608 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d77c4cc58d393c48ce46b987f6eada7c7cc381c6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 2 17:59:20 2016 -0400

    9187: Improve squeue synchronization
    
    * Put squeue functions into separate file.
    
    * CheckSqueue() now blocks on a condition variable until the next successful
    update of squeue, which then wakes up all goroutines waiting on CheckSqueue().
    
    * When squeue returns an error, take no action until the next successful update.
    
    * Merge submitting, monitoring, and cleanup behaviors into a single goroutine
    which updates based on CheckSqueue() instead of a ticker.
    
    * Introduce a lock on squeue, sbatch and scancel operations, so that on the
    next wakeup the queue is guaranteed to reflect the most recent sbatch/scancel
    operations.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 3e14820..1dada2f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,7 +3,6 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bufio"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -14,16 +13,9 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"sync"
 	"time"
 )
 
-type Squeue struct {
-	sync.Mutex
-	squeueContents []string
-	SqueueDone     chan struct{}
-}
-
 func main() {
 	err := doMain()
 	if err != nil {
@@ -59,23 +51,20 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
+	squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+	defer squeueUpdater.Done()
+
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
 		RunContainer:   run,
 		PollInterval:   time.Duration(*pollInterval) * time.Second,
 		DoneProcessing: make(chan struct{})}
 
-	squeueUpdater.SqueueDone = make(chan struct{})
-	go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
-
 	err = dispatcher.RunDispatcher()
 	if err != nil {
 		return err
 	}
 
-	squeueUpdater.SqueueDone <- struct{}{}
-	close(squeueUpdater.SqueueDone)
-
 	return nil
 }
 
@@ -89,14 +78,8 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
-// squeueFunc
-func squeueFunc() *exec.Cmd {
-	return exec.Command("squeue", "--format=%j")
-}
-
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
-var squeueCmd = squeueFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
@@ -139,6 +122,10 @@ func submit(dispatcher *dispatch.Dispatcher,
 		return
 	}
 
+	// Mutex between squeue sync and running sbatch or scancel.
+	squeueUpdater.SlurmLock.Lock()
+	defer squeueUpdater.SlurmLock.Unlock()
+
 	err := cmd.Start()
 	if err != nil {
 		submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
@@ -183,90 +170,24 @@ func submit(dispatcher *dispatch.Dispatcher,
 	return
 }
 
-func (squeue *Squeue) runSqueue() ([]string, error) {
-	var newSqueueContents []string
-
-	cmd := squeueCmd()
-	sq, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
-	cmd.Start()
-	scanner := bufio.NewScanner(sq)
-	for scanner.Scan() {
-		newSqueueContents = append(newSqueueContents, scanner.Text())
-	}
-	if err := scanner.Err(); err != nil {
-		cmd.Wait()
-		return nil, err
-	}
-
-	err = cmd.Wait()
-	if err != nil {
-		return nil, err
-	}
-
-	return newSqueueContents, nil
-}
-
-func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
-	if check {
-		n, err := squeue.runSqueue()
-		if err != nil {
-			return false, err
-		}
-		squeue.Lock()
-		squeue.squeueContents = n
-		squeue.Unlock()
-	}
-
-	if uuid != "" {
-		squeue.Lock()
-		defer squeue.Unlock()
-		for _, k := range squeue.squeueContents {
-			if k == uuid {
-				return true, nil
-			}
-		}
-	}
-	return false, nil
-}
-
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-	// TODO: considering using "squeue -i" instead of polling squeue.
-	ticker := time.NewTicker(pollInterval)
-	for {
-		select {
-		case <-squeueUpdater.SqueueDone:
-			return
-		case <-ticker.C:
-			squeue.CheckSqueue("", true)
-		}
-	}
-}
-
-// Run or monitor a container.
-//
 // If the container is marked as Locked, check if it is already in the slurm
 // queue.  If not, submit it.
 //
 // If the container is marked as Running, check if it is in the slurm queue.
 // If not, mark it as Cancelled.
-//
-// Monitor status updates.  If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
-	container dispatch.Container,
-	status chan dispatch.Container) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+	submitted := false
+	for !*monitorDone {
+		if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
+			// Most recent run of squeue had an error, so do nothing.
+			continue
+		} else if inQ {
+			// Found in the queue, so continue monitoring
+			submitted = true
+		} else if container.State == dispatch.Locked && !submitted {
+			// Not in queue but in Locked state and we haven't
+			// submitted it yet, so submit it.
 
-	uuid := container.UUID
-
-	if container.State == dispatch.Locked {
-		if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
-			// maybe squeue is broken, put it back in the queue
-			log.Printf("Error running squeue: %v", err)
-			dispatcher.UpdateState(container.UUID, dispatch.Queued)
-		} else if !inQ {
 			log.Printf("About to submit queued container %v", container.UUID)
 
 			if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
@@ -275,64 +196,66 @@ func run(dispatcher *dispatch.Dispatcher,
 				// maybe sbatch is broken, put it back to queued
 				dispatcher.UpdateState(container.UUID, dispatch.Queued)
 			}
+			submitted = true
+		} else {
+			// Not in queue and we are not going to submit it.
+			// Refresh the container state. If it is
+			// Complete/Cancelled, do nothing, if it is Locked then
+			// release it back to the Queue, if it is Running then
+			// clean up the record.
+
+			var con dispatch.Container
+			err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
+			if err != nil {
+				log.Printf("Error getting final container state: %v", err)
+			}
+
+			var st string
+			switch con.State {
+			case dispatch.Locked:
+				st = dispatch.Queued
+			case dispatch.Running:
+				st = dispatch.Cancelled
+			default:
+				// Container state is Queued, Complete or Cancelled so stop monitoring it.
+				return
+			}
+
+			log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+				container.UUID, con.State, st)
+			dispatcher.UpdateState(container.UUID, st)
 		}
 	}
+}
 
-	log.Printf("Monitoring container %v started", uuid)
-
-	// periodically check squeue
-	doneSqueue := make(chan struct{})
-	go func() {
-		squeueUpdater.CheckSqueue(container.UUID, true)
-		ticker := time.NewTicker(dispatcher.PollInterval)
-		for {
-			select {
-			case <-ticker.C:
-				if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
-					log.Printf("Error running squeue: %v", err)
-					// don't cancel, just leave it the way it is
-				} else if !inQ {
-					var con dispatch.Container
-					err := dispatcher.Arv.Get("containers", uuid, nil, &con)
-					if err != nil {
-						log.Printf("Error getting final container state: %v", err)
-					}
+// Run or monitor a container.
+//
+// Monitor status updates.  If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
-					var st string
-					switch con.State {
-					case dispatch.Locked:
-						st = dispatch.Queued
-					case dispatch.Running:
-						st = dispatch.Cancelled
-					default:
-						st = ""
-					}
+	log.Printf("Monitoring container %v started", container.UUID)
+	defer log.Printf("Monitoring container %v finished", container.UUID)
 
-					if st != "" {
-						log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-							uuid, con.State, st)
-						dispatcher.UpdateState(uuid, st)
-					}
-				}
-			case <-doneSqueue:
-				close(doneSqueue)
-				ticker.Stop()
-				return
-			}
-		}
-	}()
+	monitorDone := false
+	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
 	for container = range status {
 		if container.State == dispatch.Locked || container.State == dispatch.Running {
 			if container.Priority == 0 {
 				log.Printf("Canceling container %s", container.UUID)
 
+				// Mutex between squeue sync and running sbatch or scancel.
+				squeueUpdater.SlurmLock.Lock()
 				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				squeueUpdater.SlurmLock.Unlock()
+
 				if err != nil {
 					log.Printf("Error stopping container %s with scancel: %v",
 						container.UUID, err)
-					if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
-						log.Printf("Error running squeue: %v", err)
+					if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
 						continue
 					} else if inQ {
 						log.Printf("Container %s is still in squeue after scancel.",
@@ -345,8 +268,5 @@ func run(dispatcher *dispatch.Dispatcher,
 			}
 		}
 	}
-
-	doneSqueue <- struct{}{}
-
-	log.Printf("Monitoring container %v finished", uuid)
+	monitorDone = true
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index d30c5df..be347e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -123,14 +123,12 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 		},
 		DoneProcessing: doneProcessing}
 
-	squeueUpdater.SqueueDone = make(chan struct{})
-	go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+	squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
 
 	err = dispatcher.RunDispatcher()
 	c.Assert(err, IsNil)
 
-	squeueUpdater.SqueueDone <- struct{}{}
-	close(squeueUpdater.SqueueDone)
+	squeueUpdater.Done()
 
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
new file mode 100644
index 0000000..b86a4d9
--- /dev/null
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -0,0 +1,120 @@
+package main
+
+import (
+	"bufio"
+	"log"
+	"os/exec"
+	"sync"
+	"time"
+)
+
+type Squeue struct {
+	squeueContents []string      // output lines of the most recent successful squeue run
+	squeueDone     chan struct{} // Done sends on this to make SyncSqueue return
+	squeueError    error         // error from the most recent squeue run; nil after a success
+	squeueCond     *sync.Cond    // its lock guards squeueContents/squeueError; broadcast on each successful run
+	SlurmLock      sync.Mutex    // held while squeue, sbatch, or scancel runs so they never overlap
+}
+
+// squeueFunc returns an unstarted command that runs "squeue --format=%j".
+func squeueFunc() *exec.Cmd {
+	return exec.Command("squeue", "--format=%j")
+}
+
+var squeueCmd = squeueFunc // indirection so tests can substitute the squeue command
+
+// RunSqueue runs squeue once and records the outcome for CheckSqueue.
+//
+// On success the output lines replace squeueContents, squeueError is cleared,
+// and every goroutine blocked in CheckSqueue is woken.  On failure the error
+// is logged, stored in squeueError and returned; waiters are not woken.
+func (squeue *Squeue) RunSqueue() error {
+	// Mutex between squeue sync and running sbatch or scancel.  This
+	// establishes a sequence so that squeue doesn't run concurrently with
+	// sbatch or scancel; the next update of squeue will occur only after
+	// sbatch or scancel has completed.
+	squeueUpdater.SlurmLock.Lock()
+	defer squeueUpdater.SlurmLock.Unlock()
+
+	var newSqueueContents []string
+	cmd := squeueCmd()
+	sq, err := cmd.StdoutPipe()
+	if err != nil {
+		log.Printf("Error creating stdout pipe for squeue: %v", err)
+	} else if err = cmd.Start(); err != nil {
+		log.Printf("Error starting squeue: %v", err)
+	} else {
+		scanner := bufio.NewScanner(sq)
+		for scanner.Scan() {
+			newSqueueContents = append(newSqueueContents, scanner.Text())
+		}
+		if err = scanner.Err(); err != nil {
+			// Reap the child; the read error is the one reported.
+			cmd.Wait()
+			log.Printf("Error reading from squeue pipe: %v", err)
+		} else if err = cmd.Wait(); err != nil {
+			log.Printf("Error running squeue: %v", err)
+		}
+	}
+
+	// Publish the outcome under the condition variable's lock.  The lock
+	// is taken and released in one place, so no return path can unlock a
+	// mutex it never locked.
+	squeueUpdater.squeueCond.L.Lock()
+	defer squeueUpdater.squeueCond.L.Unlock()
+	squeueUpdater.squeueError = err
+	if err != nil {
+		// Failed runs never wake waiters.
+		return err
+	}
+	squeueUpdater.squeueContents = newSqueueContents
+	// Wake every goroutine blocked in CheckSqueue.
+	squeueUpdater.squeueCond.Broadcast()
+	return nil
+}
+
+// CheckSqueue reports whether uuid appears in the slurm queue.  It blocks
+// until the next successful update from SLURM is broadcast.
+func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+	cond := squeueUpdater.squeueCond
+	cond.L.Lock()
+	// Sleep until RunSqueue broadcasts a fresh result.
+	cond.Wait()
+	// Snapshot the shared state so the lock is not held while searching.
+	lastErr, queued := squeueUpdater.squeueError, squeueUpdater.squeueContents
+	cond.L.Unlock()
+	if lastErr != nil {
+		return false, lastErr
+	}
+
+	for i := range queued {
+		if queued[i] == uuid {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
+	squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{}) // must exist before the first RunSqueue call below
+	squeueUpdater.squeueDone = make(chan struct{})         // unbuffered stop channel read by SyncSqueue
+	squeueUpdater.RunSqueue()                              // synchronous first poll, before the background loop starts
+	go squeueUpdater.SyncSqueue(pollInterval)              // polls again on every tick until Done is called
+}
+
+func (squeue *Squeue) Done() {
+	squeueUpdater.squeueDone <- struct{}{} // unbuffered send: blocks until SyncSqueue receives the stop signal
+	close(squeueUpdater.squeueDone)
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+	for ticker := time.NewTicker(pollInterval); ; {
+		select {
+		case <-squeueUpdater.squeueDone:
+			ticker.Stop() // stop the ticker so it does not outlive the polling loop
+			return
+		case <-ticker.C:
+			squeueUpdater.RunSqueue() // errors are logged and recorded by RunSqueue itself
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list