[ARVADOS] updated: 0158347f6951d3c25b5c740caaf91d2d1644c8d2

Git user git at public.curoverse.com
Mon Jun 6 10:44:36 EDT 2016


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 17 +++--
 .../crunch-dispatch-slurm_test.go                  | 88 +++++++++++++---------
 services/crunch-dispatch-slurm/squeue.go           | 71 +++++++----------
 3 files changed, 90 insertions(+), 86 deletions(-)

       via  0158347f6951d3c25b5c740caaf91d2d1644c8d2 (commit)
       via  d45be86b354adec485504bfc09f41e0e22241f34 (commit)
      from  bb10b7777ed6db229fbb35e6a829bec4e8efcd23 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 0158347f6951d3c25b5c740caaf91d2d1644c8d2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 10:44:11 2016 -0400

    9187: Remove "squeueError" because CheckSqueue now waits for a successful squeue run.  Refactor tests a bit and add a test for canceling containers.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 1dada2f..f718fbc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -78,8 +78,14 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 
+// scancelCmd
+func scancelFunc(container dispatch.Container) *exec.Cmd {
+	return exec.Command("scancel", "--name="+container.UUID)
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
+var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
@@ -178,10 +184,7 @@ func submit(dispatcher *dispatch.Dispatcher,
 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
 	submitted := false
 	for !*monitorDone {
-		if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-			// Most recent run of squeue had an error, so do nothing.
-			continue
-		} else if inQ {
+		if squeueUpdater.CheckSqueue(container.UUID) {
 			// Found in the queue, so continue monitoring
 			submitted = true
 		} else if container.State == dispatch.Locked && !submitted {
@@ -249,15 +252,13 @@ func run(dispatcher *dispatch.Dispatcher,
 
 				// Mutex between squeue sync and running sbatch or scancel.
 				squeueUpdater.SlurmLock.Lock()
-				err := exec.Command("scancel", "--name="+container.UUID).Run()
+				err := scancelCmd(container).Run()
 				squeueUpdater.SlurmLock.Unlock()
 
 				if err != nil {
 					log.Printf("Error stopping container %s with scancel: %v",
 						container.UUID, err)
-					if inQ, err := squeueUpdater.CheckSqueue(container.UUID); err != nil {
-						continue
-					} else if inQ {
+					if squeueUpdater.CheckSqueue(container.UUID) {
 						log.Printf("Container %s is still in squeue after scancel.",
 							container.UUID)
 						continue
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index be347e4..cddbe8c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,7 +8,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io"
 	"log"
-	"math"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -58,14 +57,60 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	s.integrationTest(c, false)
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+		[]string(nil),
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(3 * time.Second)
+			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+		})
+	c.Check(container.State, Equals, "Complete")
+}
+
+func (s *TestSuite) TestIntegrationCancel(c *C) {
+
+	// Override sbatchCmd
+	var scancelCmdLine []string
+	defer func(orig func(dispatch.Container) *exec.Cmd) {
+		scancelCmd = orig
+	}(scancelCmd)
+	scancelCmd = func(container dispatch.Container) *exec.Cmd {
+		scancelCmdLine = scancelFunc(container).Args
+		return exec.Command("echo")
+	}
+
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+		[]string(nil),
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(1 * time.Second)
+			dispatcher.Arv.Update("containers", container.UUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"priority": 0}},
+				nil)
+		})
+	c.Check(container.State, Equals, "Cancelled")
+	c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-	s.integrationTest(c, true)
+	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+		fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+		fmt.Sprintf("--mem-per-cpu=%d", 2862),
+		fmt.Sprintf("--cpus-per-task=%d", 4),
+		fmt.Sprintf("--priority=%d", 1)},
+		func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			time.Sleep(3 * time.Second)
+			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+		})
+	c.Check(container.State, Equals, "Cancelled")
 }
 
-func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+func (s *TestSuite) integrationTest(c *C,
+	newSqueueCmd func() *exec.Cmd,
+	sbatchCmdComps []string,
+	runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
 	arvadostest.ResetEnv()
 
 	arv, err := arvadosclient.MakeArvadosClient()
@@ -86,13 +131,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 	defer func(orig func() *exec.Cmd) {
 		squeueCmd = orig
 	}(squeueCmd)
-	squeueCmd = func() *exec.Cmd {
-		if missingFromSqueue {
-			return exec.Command("echo")
-		} else {
-			return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
-		}
-	}
+	squeueCmd = newSqueueCmd
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
@@ -113,11 +152,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 		RunContainer: func(dispatcher *dispatch.Dispatcher,
 			container dispatch.Container,
 			status chan dispatch.Container) {
-			go func() {
-				dispatcher.UpdateState(container.UUID, dispatch.Running)
-				time.Sleep(3 * time.Second)
-				dispatcher.UpdateState(container.UUID, dispatch.Complete)
-			}()
+			go runContainer(dispatcher, container)
 			run(dispatcher, container, status)
 			doneProcessing <- struct{}{}
 		},
@@ -130,20 +165,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 
 	squeueUpdater.Done()
 
-	item := containers.Items[0]
-	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
-		fmt.Sprintf("--job-name=%s", item.UUID),
-		fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
-		fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
-		fmt.Sprintf("--priority=%d", item.Priority)}
-
-	if missingFromSqueue {
-		// not in squeue when run() started, so it will have called sbatch
-		c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
-	} else {
-		// already in squeue when run() started, will have just monitored it instead
-		c.Check(sbatchCmdLine, DeepEquals, []string(nil))
-	}
+	c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -154,11 +176,7 @@ func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
 	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	if missingFromSqueue {
-		c.Check(container.State, Equals, "Cancelled")
-	} else {
-		c.Check(container.State, Equals, "Complete")
-	}
+	return container
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 3ee8b6f..68983b8 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -13,7 +13,6 @@ import (
 type Squeue struct {
 	squeueContents []string
 	squeueDone     chan struct{}
-	squeueError    error
 	squeueCond     *sync.Cond
 	SlurmLock      sync.Mutex
 }
@@ -25,10 +24,10 @@ func squeueFunc() *exec.Cmd {
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If there is an error,
-// set "squeueError".  If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output.  If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting squeueCond in
+// CheckSqueue().  If there was an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
 	var newSqueueContents []string
 
 	// Mutex between squeue sync and running sbatch or scancel.  This
@@ -39,15 +38,12 @@ func (squeue *Squeue) RunSqueue() error {
 	defer squeue.SlurmLock.Unlock()
 
 	// Also ensure unlock on all return paths
-	defer squeue.squeueCond.L.Unlock()
 
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		log.Printf("Error creating stdout pipe for squeue: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 	cmd.Start()
 	scanner := bufio.NewScanner(sq)
@@ -57,48 +53,37 @@ func (squeue *Squeue) RunSqueue() error {
 	if err := scanner.Err(); err != nil {
 		cmd.Wait()
 		log.Printf("Error reading from squeue pipe: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 
 	err = cmd.Wait()
 	if err != nil {
 		log.Printf("Error running squeue: %v", err)
-		squeue.squeueCond.L.Lock()
-		squeue.squeueError = err
-		return err
+		return
 	}
 
 	squeue.squeueCond.L.Lock()
-	squeue.squeueError = nil
 	squeue.squeueContents = newSqueueContents
 	squeue.squeueCond.Broadcast()
-
-	return nil
+	squeue.squeueCond.L.Unlock()
 }
 
 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
 // does not run squeue directly, but instead blocks until woken up by next
 // successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
 	squeue.squeueCond.L.Lock()
 	// block until next squeue broadcast signaling an update.
 	squeue.squeueCond.Wait()
-	if squeue.squeueError != nil {
-		e := squeue.squeueError
-		squeue.squeueCond.L.Unlock()
-		return false, e
-	}
 	contents := squeue.squeueContents
 	squeue.squeueCond.L.Unlock()
 
 	for _, k := range contents {
 		if k == uuid {
-			return true, nil
+			return true
 		}
 	}
-	return false, nil
+	return false
 }
 
 // StartMonitor starts the squeue monitoring goroutine.

commit d45be86b354adec485504bfc09f41e0e22241f34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 3 17:57:48 2016 -0400

    9187: Fix refactoring messup

diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index e157469..3ee8b6f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -35,18 +35,18 @@ func (squeue *Squeue) RunSqueue() error {
 	// establishes a sequence so that squeue doesn't run concurrently with
 	// sbatch or scancel; the next update of squeue will occur only after
 	// sbatch or scancel has completed.
-	squeueUpdater.SlurmLock.Lock()
-	defer squeueUpdater.SlurmLock.Unlock()
+	squeue.SlurmLock.Lock()
+	defer squeue.SlurmLock.Unlock()
 
 	// Also ensure unlock on all return paths
-	defer squeueUpdater.squeueCond.L.Unlock()
+	defer squeue.squeueCond.L.Unlock()
 
 	cmd := squeueCmd()
 	sq, err := cmd.StdoutPipe()
 	if err != nil {
 		log.Printf("Error creating stdout pipe for squeue: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 	cmd.Start()
@@ -57,23 +57,23 @@ func (squeue *Squeue) RunSqueue() error {
 	if err := scanner.Err(); err != nil {
 		cmd.Wait()
 		log.Printf("Error reading from squeue pipe: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 
 	err = cmd.Wait()
 	if err != nil {
 		log.Printf("Error running squeue: %v", err)
-		squeueUpdater.squeueCond.L.Lock()
-		squeueUpdater.squeueError = err
+		squeue.squeueCond.L.Lock()
+		squeue.squeueError = err
 		return err
 	}
 
-	squeueUpdater.squeueCond.L.Lock()
-	squeueUpdater.squeueError = nil
-	squeueUpdater.squeueContents = newSqueueContents
-	squeueUpdater.squeueCond.Broadcast()
+	squeue.squeueCond.L.Lock()
+	squeue.squeueError = nil
+	squeue.squeueContents = newSqueueContents
+	squeue.squeueCond.Broadcast()
 
 	return nil
 }
@@ -82,16 +82,16 @@ func (squeue *Squeue) RunSqueue() error {
 // does not run squeue directly, but instead blocks until woken up by next
 // successful update of squeue.
 func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
-	squeueUpdater.squeueCond.L.Lock()
+	squeue.squeueCond.L.Lock()
 	// block until next squeue broadcast signaling an update.
-	squeueUpdater.squeueCond.Wait()
-	if squeueUpdater.squeueError != nil {
-		e := squeueUpdater.squeueError
-		squeueUpdater.squeueCond.L.Unlock()
+	squeue.squeueCond.Wait()
+	if squeue.squeueError != nil {
+		e := squeue.squeueError
+		squeue.squeueCond.L.Unlock()
 		return false, e
 	}
-	contents := squeueUpdater.squeueContents
-	squeueUpdater.squeueCond.L.Unlock()
+	contents := squeue.squeueContents
+	squeue.squeueCond.L.Unlock()
 
 	for _, k := range contents {
 		if k == uuid {
@@ -103,16 +103,16 @@ func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
 
 // StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-	squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
-	squeueUpdater.squeueDone = make(chan struct{})
-	squeueUpdater.RunSqueue()
-	go squeueUpdater.SyncSqueue(pollInterval)
+	squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+	squeue.squeueDone = make(chan struct{})
+	squeue.RunSqueue()
+	go squeue.SyncSqueue(pollInterval)
 }
 
 // Done stops the squeue monitoring goroutine.
 func (squeue *Squeue) Done() {
-	squeueUpdater.squeueDone <- struct{}{}
-	close(squeueUpdater.squeueDone)
+	squeue.squeueDone <- struct{}{}
+	close(squeue.squeueDone)
 }
 
 // SyncSqueue periodically polls RunSqueue() at the given duration until
@@ -121,10 +121,10 @@ func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
 	ticker := time.NewTicker(pollInterval)
 	for {
 		select {
-		case <-squeueUpdater.squeueDone:
+		case <-squeue.squeueDone:
 			return
 		case <-ticker.C:
-			squeueUpdater.RunSqueue()
+			squeue.RunSqueue()
 		}
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list