[ARVADOS] created: 26d7e1809376534b9060534e25399cbd462f38da

Git user git at public.curoverse.com
Fri May 5 17:11:51 EDT 2017


        at  26d7e1809376534b9060534e25399cbd462f38da (commit)


commit 26d7e1809376534b9060534e25399cbd462f38da
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 5 17:11:39 2017 -0400

    11626: Don't run scancel if the job is already gone from the queue.  Fix tests so there are no lingering goroutines.  Log sbatch errors where the user can see them.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 261444a..31c6d8d 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -148,12 +148,9 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
+			tracker.update(c)
 			switch c.State {
-			case Queued:
-				tracker.close()
-			case Locked, Running:
-				tracker.update(c)
-			case Cancelled, Complete:
+			case Queued, Cancelled, Complete:
 				tracker.close()
 			}
 		} else {
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index cca8b3f..a449169 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -226,13 +226,21 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 	if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
 		log.Printf("Submitting container %s to slurm", ctr.UUID)
 		if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
-			log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			log.Printf(text)
+
+			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+				"object_uuid": ctr.UUID,
+				"event_type":  "crunch-dispatch-slurm",
+				"properties":  map[string]string{"text": text}}}
+			disp.Arv.Create("logs", lr, nil)
+
 			disp.Unlock(ctr.UUID)
 			return
 		}
 	}
 
-	log.Printf("Start monitoring container %s", ctr.UUID)
+	log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
 	defer log.Printf("Done monitoring container %s", ctr.UUID)
 
 	// If the container disappears from the slurm queue, there is
@@ -244,6 +252,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 		cancel()
 	}(ctr.UUID)
 
+	lastState := ctr.State
 	for {
 		select {
 		case <-ctx.Done():
@@ -259,12 +268,15 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
 			}
 			return
 		case updated, ok := <-status:
-			if !ok {
-				log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
-				scancel(ctr)
-			} else if updated.Priority == 0 {
-				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
-				scancel(ctr)
+			if ok && lastState != updated.State || updated.Priority == 0 {
+				log.Printf("Container %s has state %q, priority %d", ctr.UUID, updated.State, updated.Priority)
+				lastState = updated.State
+			}
+			if !ok || updated.Priority == 0 {
+				if sqCheck.HasUUID(ctr.UUID) {
+					log.Printf("Cancelling slurm job %v", ctr.UUID)
+					scancel(ctr)
+				}
 			}
 		}
 	}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 1c366a0..0d7f703 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -52,6 +52,7 @@ func (s *TestSuite) SetUpTest(c *C) {
 
 func (s *TestSuite) TearDownTest(c *C) {
 	os.Args = initialArgs
+	arvadostest.ResetEnv()
 	arvadostest.StopAPI()
 }
 
@@ -60,21 +61,24 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-	done := false
+	var done *bool = new(bool)
+	*done = false
 	container := s.integrationTest(c,
 		func() *exec.Cmd {
-			if done {
+			if *done {
 				return exec.Command("true")
 			} else {
 				return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
 			}
 		},
+		nil,
+		nil,
 		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
+			*done = true
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
-			done = true
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
@@ -82,19 +86,7 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
 func (s *TestSuite) TestIntegrationCancel(c *C) {
 	var cmd *exec.Cmd
 	var scancelCmdLine []string
-	defer func(orig func(arvados.Container) *exec.Cmd) {
-		scancelCmd = orig
-	}(scancelCmd)
 	attempt := 0
-	scancelCmd = func(container arvados.Container) *exec.Cmd {
-		if attempt++; attempt == 1 {
-			return exec.Command("false")
-		} else {
-			scancelCmdLine = scancelFunc(container).Args
-			cmd = exec.Command("echo")
-			return cmd
-		}
-	}
 
 	container := s.integrationTest(c,
 		func() *exec.Cmd {
@@ -104,6 +96,16 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
 				return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
 			}
 		},
+		func(container arvados.Container) *exec.Cmd {
+			if attempt++; attempt == 1 {
+				return exec.Command("false")
+			} else {
+				scancelCmdLine = scancelFunc(container).Args
+				cmd = exec.Command("echo")
+				return cmd
+			}
+		},
+		nil,
 		[]string(nil),
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -118,11 +120,15 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-	container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch",
-		fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
-		fmt.Sprintf("--mem=%d", 11445),
-		fmt.Sprintf("--cpus-per-task=%d", 4),
-		fmt.Sprintf("--tmp=%d", 45777)},
+	container := s.integrationTest(c,
+		func() *exec.Cmd { return exec.Command("echo") },
+		nil,
+		nil,
+		[]string{"sbatch",
+			fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+			fmt.Sprintf("--mem=%d", 11445),
+			fmt.Sprintf("--cpus-per-task=%d", 4),
+			fmt.Sprintf("--tmp=%d", 45777)},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			time.Sleep(3 * time.Second)
@@ -131,8 +137,25 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
 	c.Check(container.State, Equals, arvados.ContainerStateCancelled)
 }
 
+func (s *TestSuite) TestSbatchFail(c *C) {
+	container := s.integrationTest(c,
+		func() *exec.Cmd { return exec.Command("echo") },
+		nil,
+		func(container arvados.Container) *exec.Cmd {
+			return exec.Command("false")
+		},
+		[]string(nil),
+		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+			dispatcher.UpdateState(container.UUID, dispatch.Running)
+			dispatcher.UpdateState(container.UUID, dispatch.Complete)
+		})
+	c.Check(container.State, Equals, arvados.ContainerStateComplete)
+}
+
 func (s *TestSuite) integrationTest(c *C,
 	newSqueueCmd func() *exec.Cmd,
+	newScancelCmd func(arvados.Container) *exec.Cmd,
+	newSbatchCmd func(arvados.Container) *exec.Cmd,
 	sbatchCmdComps []string,
 	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
 	arvadostest.ResetEnv()
@@ -146,9 +169,14 @@ func (s *TestSuite) integrationTest(c *C,
 	defer func(orig func(arvados.Container) *exec.Cmd) {
 		sbatchCmd = orig
 	}(sbatchCmd)
-	sbatchCmd = func(container arvados.Container) *exec.Cmd {
-		sbatchCmdLine = sbatchFunc(container).Args
-		return exec.Command("sh")
+
+	if newSbatchCmd != nil {
+		sbatchCmd = newSbatchCmd
+	} else {
+		sbatchCmd = func(container arvados.Container) *exec.Cmd {
+			sbatchCmdLine = sbatchFunc(container).Args
+			return exec.Command("sh")
+		}
 	}
 
 	// Override squeueCmd
@@ -157,6 +185,12 @@ func (s *TestSuite) integrationTest(c *C,
 	}(squeueCmd)
 	squeueCmd = newSqueueCmd
 
+	// Override scancel
+	defer func(orig func(arvados.Container) *exec.Cmd) {
+		scancelCmd = orig
+	}(scancelCmd)
+	scancelCmd = newScancelCmd
+
 	// There should be one queued container
 	params := arvadosclient.Dict{
 		"filters": [][]string{{"state", "=", "Queued"}},
@@ -169,11 +203,16 @@ func (s *TestSuite) integrationTest(c *C,
 	theConfig.CrunchRunCommand = []string{"echo"}
 
 	ctx, cancel := context.WithCancel(context.Background())
+	doneRun := make(chan struct{})
+
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Duration(1) * time.Second,
 		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
-			go runContainer(disp, ctr)
+			go func() {
+				runContainer(disp, ctr)
+				doneRun <- struct{}{}
+			}()
 			run(disp, ctr, status)
 			cancel()
 		},
@@ -182,6 +221,7 @@ func (s *TestSuite) integrationTest(c *C,
 	sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
 
 	err = dispatcher.Run(ctx)
+	<-doneRun
 	c.Assert(err, Equals, context.Canceled)
 
 	sqCheck.Stop()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list