[ARVADOS] created: 26d7e1809376534b9060534e25399cbd462f38da
Git user
git at public.curoverse.com
Fri May 5 17:11:51 EDT 2017
at 26d7e1809376534b9060534e25399cbd462f38da (commit)
commit 26d7e1809376534b9060534e25399cbd462f38da
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 5 17:11:39 2017 -0400
11626: Don't run scancel if the job is already gone from the queue. Fix tests so there are no lingering goroutines. Log sbatch errors where the user can see them.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 261444a..31c6d8d 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -148,12 +148,9 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
+ tracker.update(c)
switch c.State {
- case Queued:
- tracker.close()
- case Locked, Running:
- tracker.update(c)
- case Cancelled, Complete:
+ case Queued, Cancelled, Complete:
tracker.close()
}
} else {
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index cca8b3f..a449169 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -226,13 +226,21 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
- log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ log.Printf(text)
+
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": ctr.UUID,
+ "event_type": "crunch-dispatch-slurm",
+ "properties": map[string]string{"text": text}}}
+ disp.Arv.Create("logs", lr, nil)
+
disp.Unlock(ctr.UUID)
return
}
}
- log.Printf("Start monitoring container %s", ctr.UUID)
+ log.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer log.Printf("Done monitoring container %s", ctr.UUID)
// If the container disappears from the slurm queue, there is
@@ -244,6 +252,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
cancel()
}(ctr.UUID)
+ lastState := ctr.State
for {
select {
case <-ctx.Done():
@@ -259,12 +268,15 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
}
return
case updated, ok := <-status:
- if !ok {
- log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
- scancel(ctr)
- } else if updated.Priority == 0 {
- log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
- scancel(ctr)
+ if ok && lastState != updated.State || updated.Priority == 0 {
+ log.Printf("Container %s has state %q, priority %d", ctr.UUID, updated.State, updated.Priority)
+ lastState = updated.State
+ }
+ if !ok || updated.Priority == 0 {
+ if sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("Cancelling slurm job %v", ctr.UUID)
+ scancel(ctr)
+ }
}
}
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 1c366a0..0d7f703 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -52,6 +52,7 @@ func (s *TestSuite) SetUpTest(c *C) {
func (s *TestSuite) TearDownTest(c *C) {
os.Args = initialArgs
+ arvadostest.ResetEnv()
arvadostest.StopAPI()
}
@@ -60,21 +61,24 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- done := false
+ var done *bool = new(bool)
+ *done = false
container := s.integrationTest(c,
func() *exec.Cmd {
- if done {
+ if *done {
return exec.Command("true")
} else {
return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
}
},
+ nil,
+ nil,
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
+ *done = true
dispatcher.UpdateState(container.UUID, dispatch.Complete)
- done = true
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
@@ -82,19 +86,7 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
func (s *TestSuite) TestIntegrationCancel(c *C) {
var cmd *exec.Cmd
var scancelCmdLine []string
- defer func(orig func(arvados.Container) *exec.Cmd) {
- scancelCmd = orig
- }(scancelCmd)
attempt := 0
- scancelCmd = func(container arvados.Container) *exec.Cmd {
- if attempt++; attempt == 1 {
- return exec.Command("false")
- } else {
- scancelCmdLine = scancelFunc(container).Args
- cmd = exec.Command("echo")
- return cmd
- }
- }
container := s.integrationTest(c,
func() *exec.Cmd {
@@ -104,6 +96,16 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
}
},
+ func(container arvados.Container) *exec.Cmd {
+ if attempt++; attempt == 1 {
+ return exec.Command("false")
+ } else {
+ scancelCmdLine = scancelFunc(container).Args
+ cmd = exec.Command("echo")
+ return cmd
+ }
+ },
+ nil,
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -118,11 +120,15 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch",
- fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
- fmt.Sprintf("--mem=%d", 11445),
- fmt.Sprintf("--cpus-per-task=%d", 4),
- fmt.Sprintf("--tmp=%d", 45777)},
+ container := s.integrationTest(c,
+ func() *exec.Cmd { return exec.Command("echo") },
+ nil,
+ nil,
+ []string{"sbatch",
+ fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
+ fmt.Sprintf("--mem=%d", 11445),
+ fmt.Sprintf("--cpus-per-task=%d", 4),
+ fmt.Sprintf("--tmp=%d", 45777)},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
@@ -131,8 +137,25 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
+func (s *TestSuite) TestSbatchFail(c *C) {
+ container := s.integrationTest(c,
+ func() *exec.Cmd { return exec.Command("echo") },
+ nil,
+ func(container arvados.Container) *exec.Cmd {
+ return exec.Command("false")
+ },
+ []string(nil),
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
+}
+
func (s *TestSuite) integrationTest(c *C,
newSqueueCmd func() *exec.Cmd,
+ newScancelCmd func(arvados.Container) *exec.Cmd,
+ newSbatchCmd func(arvados.Container) *exec.Cmd,
sbatchCmdComps []string,
runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
@@ -146,9 +169,14 @@ func (s *TestSuite) integrationTest(c *C,
defer func(orig func(arvados.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container arvados.Container) *exec.Cmd {
- sbatchCmdLine = sbatchFunc(container).Args
- return exec.Command("sh")
+
+ if newSbatchCmd != nil {
+ sbatchCmd = newSbatchCmd
+ } else {
+ sbatchCmd = func(container arvados.Container) *exec.Cmd {
+ sbatchCmdLine = sbatchFunc(container).Args
+ return exec.Command("sh")
+ }
}
// Override squeueCmd
@@ -157,6 +185,12 @@ func (s *TestSuite) integrationTest(c *C,
}(squeueCmd)
squeueCmd = newSqueueCmd
+ // Override scancel
+ defer func(orig func(arvados.Container) *exec.Cmd) {
+ scancelCmd = orig
+ }(scancelCmd)
+ scancelCmd = newScancelCmd
+
// There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
@@ -169,11 +203,16 @@ func (s *TestSuite) integrationTest(c *C,
theConfig.CrunchRunCommand = []string{"echo"}
ctx, cancel := context.WithCancel(context.Background())
+ doneRun := make(chan struct{})
+
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
- go runContainer(disp, ctr)
+ go func() {
+ runContainer(disp, ctr)
+ doneRun <- struct{}{}
+ }()
run(disp, ctr, status)
cancel()
},
@@ -182,6 +221,7 @@ func (s *TestSuite) integrationTest(c *C,
sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
err = dispatcher.Run(ctx)
+ <-doneRun
c.Assert(err, Equals, context.Canceled)
sqCheck.Stop()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list