[ARVADOS] updated: 3ae9a789410e93eeb31ca5670c17a6d03d77f608
From: Git user <git at public.curoverse.com>
Date: Wed Jun 1 16:06:33 EDT 2016
Summary of changes:
sdk/go/dispatch/dispatch.go | 86 +++++----
.../crunch-dispatch-local/crunch-dispatch-local.go | 3 +-
.../crunch-dispatch-local_test.go | 6 +-
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 200 ++++++++++++++-------
.../crunch-dispatch-slurm_test.go | 80 +++++----
.../crunch-dispatch-slurm/crunch-finish-slurm.sh | 20 ---
6 files changed, 234 insertions(+), 161 deletions(-)
delete mode 100755 services/crunch-dispatch-slurm/crunch-finish-slurm.sh
via 3ae9a789410e93eeb31ca5670c17a6d03d77f608 (commit)
via 3a3910fdc8a5003c182f68e3423c96327a136175 (commit)
from 4153cb6cfad920ed0b1a4b818d3bcc8de492d134 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3ae9a789410e93eeb31ca5670c17a6d03d77f608
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 1 16:06:26 2016 -0400
9187: Slurm dispatcher improvements around squeue
* Clarify that status updates are not guaranteed to be delivered on a
heartbeat.
* Refactor slurm dispatcher to monitor the container in squeue in a separate
goroutine.
* Refactor polling squeue to a single goroutine and cache the results so that
monitoring 100 containers doesn't result in 100 calls to squeue (a sketch of
this pattern follows the list).
* No longer set up strigger to cancel the job on finish; instead, cancel
running jobs that are not in squeue.
* Test both cases where a job is/is not in squeue.
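
The squeue caching in the third bullet is the main structural change: one
goroutine refreshes a shared job list on a ticker, and each per-container
monitor answers "is my job still in squeue?" from that cache. Below is a
minimal sketch of the pattern; the names squeueCache, poll, and inQueue are
invented for illustration, while the real types and methods (Squeue,
CheckSqueue, SyncSqueue) appear in the diff that follows.

package main

import (
	"fmt"
	"os/exec"
	"strings"
	"sync"
	"time"
)

// squeueCache holds the most recent squeue job list. The mutex guards
// jobNames; closing done stops the polling goroutine.
type squeueCache struct {
	sync.Mutex
	jobNames []string
	done     chan struct{}
}

// poll runs squeue once per interval and swaps in the fresh job list.
// On error the previous contents are kept rather than cleared.
func (sq *squeueCache) poll(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-sq.done:
			return
		case <-ticker.C:
			out, err := exec.Command("squeue", "--format=%j").Output()
			if err != nil {
				continue
			}
			sq.Lock()
			sq.jobNames = strings.Fields(string(out))
			sq.Unlock()
		}
	}
}

// inQueue answers from the cache, so monitoring 100 containers costs one
// squeue invocation per interval instead of 100.
func (sq *squeueCache) inQueue(name string) bool {
	sq.Lock()
	defer sq.Unlock()
	for _, n := range sq.jobNames {
		if n == name {
			return true
		}
	}
	return false
}

func main() {
	sq := &squeueCache{done: make(chan struct{})}
	go sq.poll(500 * time.Millisecond)
	defer close(sq.done)

	time.Sleep(time.Second) // give the first poll a chance to land
	fmt.Println(sq.inQueue("zzzzz-dz642-queuedcontainer"))
}
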
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index a27971f..785b6ec 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,3 +1,6 @@
+// Framework for monitoring the Arvados container queue, locking container
+// records, and running goroutine callbacks which implement execution and
+// monitoring of the containers.
package dispatch
import (
@@ -28,7 +31,7 @@ type apiClientAuthorizationList struct {
Items []apiClientAuthorization `json:"items"`
}
-// Container data
+// Represents an Arvados container record
type Container struct {
UUID string `json:"uuid"`
State string `json:"state"`
@@ -45,9 +48,27 @@ type ContainerList struct {
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
- Arv arvadosclient.ArvadosClient
- RunContainer func(*Dispatcher, Container, chan Container)
- PollInterval time.Duration
+ // The Arvados client
+ Arv arvadosclient.ArvadosClient
+
+ // When a new queued container appears and is either already owned by
+ // this dispatcher or is successfully locked, the dispatcher will call
+ // go RunContainer(). The RunContainer() goroutine gets a channel over
+ // which it will receive updates to the container state. The
+ // RunContainer() goroutine should only assume status updates come when
+ // the container record changes on the API server; if it needs to
+ // monitor the job submission to the underlying slurm/grid engine/etc
+ // queue it should spin up its own polling goroutines. When the
+ // channel is closed, that means the container is no longer being
+ // handled by this dispatcher and the goroutine should terminate. The
+ // goroutine is responsible for draining the 'status' channel; failure
+ // to do so may deadlock the dispatcher.
+ RunContainer func(*Dispatcher, Container, chan Container)
+
+ // Amount of time to wait between polls for updates.
+ PollInterval time.Duration
+
+ // Channel used to signal that the RunDispatcher loop should exit.
DoneProcessing chan struct{}
mineMutex sync.Mutex
@@ -159,7 +180,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
// back to Queued and then locked by another dispatcher,
// LockedByUUID will be different. In either case, we want
// to stop monitoring it.
- log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+ log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
dispatcher.notMine(container.UUID)
return
}
@@ -191,7 +212,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
"container": arvadosclient.Dict{"state": newState}},
nil)
if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
}
return err
}
@@ -199,14 +220,6 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
// RunDispatcher runs the main loop of the dispatcher until receiving a message
// on the dispatcher.DoneProcessing channel. It also installs a signal handler
// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer(). If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer(). The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
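
The doc comment added above pins down the RunContainer contract: updates
arrive only when the container record changes on the API server, a closed
channel means this dispatcher no longer handles the container, and the
goroutine must keep draining the channel or risk deadlocking the dispatcher.
A hypothetical callback satisfying that contract (a sketch only, using the
sdk/go/dispatch types from this diff; the real callbacks are in the
crunch-dispatch-local and crunch-dispatch-slurm diffs below):

package example

import "git.curoverse.com/arvados.git/sdk/go/dispatch"

// runContainer is an invented example callback. It would submit the
// container, react to record changes, and always drain status.
func runContainer(d *dispatch.Dispatcher, c dispatch.Container,
	status chan dispatch.Container) {
	// ...submit c to the underlying queue here...
	for update := range status {
		// An update means the record changed on the API server; the
		// underlying slurm/grid engine queue must be polled separately.
		if update.Priority == 0 {
			// ...cancel the underlying job here...
		}
	}
	// Channel closed: this dispatcher no longer handles the container.
}
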
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index aca60e9..0248f18 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -115,7 +115,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to state \"Locked\"")
}
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -123,7 +123,7 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses["/arvados/v1/containers"] =
arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
- arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+ arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
testWithServerStub(c, apiStubResponses, "echo",
`After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2". Updating it to "Cancelled"`)
@@ -142,7 +142,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
- arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+ arvadostest.StubResponse{200, string(`{"uuid": "` + arvadostest.Dispatch1AuthUUID + `", "api_token": "xyz"}`)}
apiStub := arvadostest.ServerStub{apiStubResponses}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 641b4bc..3e14820 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -14,9 +14,16 @@ import (
"os"
"os/exec"
"strings"
+ "sync"
"time"
)
+type Squeue struct {
+ sync.Mutex
+ squeueContents []string
+ SqueueDone chan struct{}
+}
+
func main() {
err := doMain()
if err != nil {
@@ -26,7 +33,7 @@ func main() {
var (
crunchRunCommand *string
- finishCommand *string
+ squeueUpdater Squeue
)
func doMain() error {
@@ -42,11 +49,6 @@ func doMain() error {
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand = flags.String(
- "finish-command",
- "/usr/bin/crunch-finish-slurm.sh",
- "Command to run from strigger when job is finished")
-
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
@@ -63,11 +65,17 @@ func doMain() error {
PollInterval: time.Duration(*pollInterval) * time.Second,
DoneProcessing: make(chan struct{})}
+ squeueUpdater.SqueueDone = make(chan struct{})
+ go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
+
err = dispatcher.RunDispatcher()
if err != nil {
return err
}
+ squeueUpdater.SqueueDone <- struct{}{}
+ close(squeueUpdater.SqueueDone)
+
return nil
}
@@ -81,19 +89,12 @@ func sbatchFunc(container dispatch.Container) *exec.Cmd {
fmt.Sprintf("--priority=%d", container.Priority))
}
-// striggerCmd
-func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
- return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
- fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
-}
-
// squeueFunc
func squeueFunc() *exec.Cmd {
return exec.Command("squeue", "--format=%j")
}
// Wrap these so that they can be overridden by tests
-var striggerCmd = striggerFunc
var sbatchCmd = sbatchFunc
var squeueCmd = squeueFunc
@@ -182,44 +183,66 @@ func submit(dispatcher *dispatch.Dispatcher,
return
}
-// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
-// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- err := cmd.Run()
- if err != nil {
- log.Printf("While setting up strigger: %v", err)
- // BUG: we drop the error here and forget about it. A
- // human has to notice the container is stuck in
- // Running state, and fix it manually.
- }
-}
+func (squeue *Squeue) runSqueue() ([]string, error) {
+ var newSqueueContents []string
-func checkSqueue(uuid string) (bool, error) {
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
- return false, err
+ return nil, err
}
cmd.Start()
- defer cmd.Wait()
scanner := bufio.NewScanner(sq)
- found := false
for scanner.Scan() {
- if scanner.Text() == uuid {
- found = true
- }
+ newSqueueContents = append(newSqueueContents, scanner.Text())
}
if err := scanner.Err(); err != nil {
- return false, err
+ cmd.Wait()
+ return nil, err
+ }
+
+ err = cmd.Wait()
+ if err != nil {
+ return nil, err
+ }
+
+ return newSqueueContents, nil
+}
+
+func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
+ if check {
+ n, err := squeue.runSqueue()
+ if err != nil {
+ return false, err
+ }
+ squeue.Lock()
+ squeue.squeueContents = n
+ squeue.Unlock()
+ }
+
+ if uuid != "" {
+ squeue.Lock()
+ defer squeue.Unlock()
+ for _, k := range squeue.squeueContents {
+ if k == uuid {
+ return true, nil
+ }
+ }
+ }
+ return false, nil
+}
+
+func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
+ // TODO: consider using "squeue -i" instead of polling squeue.
+ ticker := time.NewTicker(pollInterval)
+ for {
+ select {
+ case <-squeueUpdater.SqueueDone:
+ return
+ case <-ticker.C:
+ squeue.CheckSqueue("", true)
+ }
}
- return found, nil
}
// Run or monitor a container.
@@ -239,50 +262,91 @@ func run(dispatcher *dispatch.Dispatcher,
uuid := container.UUID
if container.State == dispatch.Locked {
- if inQ, err := checkSqueue(container.UUID); err != nil {
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+ // maybe squeue is broken, put it back in the queue
log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ dispatcher.UpdateState(container.UUID, dispatch.Queued)
} else if !inQ {
log.Printf("About to submit queued container %v", container.UUID)
- jobid, err := submit(dispatcher, container, *crunchRunCommand)
- if err != nil {
- log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
- } else {
- finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+ if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+ log.Printf("Error submitting container %s to slurm: %v",
+ container.UUID, err)
+ // maybe sbatch is broken, put it back to Queued
+ dispatcher.UpdateState(container.UUID, dispatch.Queued)
}
}
- } else if container.State == dispatch.Running {
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- } else if !inQ {
- log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
- dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
- }
}
log.Printf("Monitoring container %v started", uuid)
- for container = range status {
- if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
- log.Printf("Canceling container %s", container.UUID)
-
- err := exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
+ // periodically check squeue
+ doneSqueue := make(chan struct{})
+ go func() {
+ squeueUpdater.CheckSqueue(container.UUID, true)
+ ticker := time.NewTicker(dispatcher.PollInterval)
+ for {
+ select {
+ case <-ticker.C:
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue after scancel.", container.UUID)
- continue
+ // don't cancel, just leave it the way it is
+ } else if !inQ {
+ var con dispatch.Container
+ err := dispatcher.Arv.Get("containers", uuid, nil, &con)
+ if err != nil {
+ log.Printf("Error getting final container state: %v", err)
+ }
+
+ var st string
+ switch con.State {
+ case dispatch.Locked:
+ st = dispatch.Queued
+ case dispatch.Running:
+ st = dispatch.Cancelled
+ default:
+ st = ""
+ }
+
+ if st != "" {
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ uuid, con.State, st)
+ dispatcher.UpdateState(uuid, st)
+ }
}
+ case <-doneSqueue:
+ close(doneSqueue)
+ ticker.Stop()
+ return
}
+ }
+ }()
- err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ for container = range status {
+ if container.State == dispatch.Locked || container.State == dispatch.Running {
+ if container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+ err := exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v",
+ container.UUID, err)
+ if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue after scancel.",
+ container.UUID)
+ continue
+ }
+ }
+
+ err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
}
+ doneSqueue <- struct{}{}
+
log.Printf("Monitoring container %v finished", uuid)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 348d5e4..d30c5df 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -1,12 +1,12 @@
package main
import (
+ "bytes"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
-
- "bytes"
- "fmt"
+ "io"
"log"
"math"
"net/http"
@@ -35,35 +35,43 @@ var initialArgs []string
func (s *TestSuite) SetUpSuite(c *C) {
initialArgs = os.Args
- arvadostest.StartAPI()
}
func (s *TestSuite) TearDownSuite(c *C) {
- arvadostest.StopAPI()
}
func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-slurm"}
os.Args = args
+ arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
func (s *TestSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
os.Args = initialArgs
+ arvadostest.StopAPI()
}
func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) TestIntegration(c *C) {
+func (s *TestSuite) TestIntegrationNormal(c *C) {
+ s.integrationTest(c, false)
+}
+
+func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+ s.integrationTest(c, true)
+}
+
+func (s *TestSuite) integrationTest(c *C, missingFromSqueue bool) {
+ arvadostest.ResetEnv()
+
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
var sbatchCmdLine []string
- var striggerCmdLine []string
// Override sbatchCmd
defer func(orig func(dispatch.Container) *exec.Cmd) {
@@ -74,30 +82,16 @@ func (s *TestSuite) TestIntegration(c *C) {
return exec.Command("sh")
}
- // Override striggerCmd
- defer func(orig func(jobid, containerUUID, finishCommand,
- apiHost, apiToken, apiInsecure string) *exec.Cmd) {
- striggerCmd = orig
- }(striggerCmd)
- striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
- striggerCmdLine = striggerFunc(jobid, containerUUID, finishCommand,
- apiHost, apiToken, apiInsecure).Args
- go func() {
- time.Sleep(5 * time.Second)
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": dispatch.Complete}},
- nil)
- }()
- return exec.Command("echo", striggerCmdLine...)
- }
-
// Override squeueCmd
defer func(orig func() *exec.Cmd) {
squeueCmd = orig
}(squeueCmd)
squeueCmd = func() *exec.Cmd {
- return exec.Command("echo")
+ if missingFromSqueue {
+ return exec.Command("echo")
+ } else {
+ return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+ }
}
// There should be no queued containers now
@@ -111,8 +105,6 @@ func (s *TestSuite) TestIntegration(c *C) {
echo := "echo"
crunchRunCommand = &echo
- finishCmd := "/usr/bin/crunch-finish-slurm.sh"
- finishCommand = &finishCmd
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -122,8 +114,8 @@ func (s *TestSuite) TestIntegration(c *C) {
container dispatch.Container,
status chan dispatch.Container) {
go func() {
- time.Sleep(1)
dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
run(dispatcher, container, status)
@@ -131,19 +123,29 @@ func (s *TestSuite) TestIntegration(c *C) {
},
DoneProcessing: doneProcessing}
+ squeueUpdater.SqueueDone = make(chan struct{})
+ go squeueUpdater.SyncSqueue(time.Duration(500) * time.Millisecond)
+
err = dispatcher.RunDispatcher()
c.Assert(err, IsNil)
+ squeueUpdater.SqueueDone <- struct{}{}
+ close(squeueUpdater.SqueueDone)
+
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", item.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
fmt.Sprintf("--priority=%d", item.Priority)}
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
- "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
+ if missingFromSqueue {
+ // not in squeue when run() started, so it will have called sbatch
+ c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+ } else {
+ // already in squeue when run() started, will have just monitored it instead
+ c.Check(sbatchCmdLine, DeepEquals, []string(nil))
+ }
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -154,7 +156,11 @@ func (s *TestSuite) TestIntegration(c *C) {
var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- c.Check(container.State, Equals, "Complete")
+ if missingFromSqueue {
+ c.Check(container.State, Equals, "Cancelled")
+ } else {
+ c.Check(container.State, Equals, "Complete")
+ }
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -180,12 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(buf)
+ log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
crunchRunCommand = &crunchCmd
- finishCmd := "/usr/bin/crunch-finish-slurm.sh"
- finishCommand = &finishCmd
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -195,7 +199,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
container dispatch.Container,
status chan dispatch.Container) {
go func() {
- time.Sleep(1)
+ time.Sleep(1 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
deleted file mode 100755
index 95a37ba..0000000
--- a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/sh
-
-# Script to be called by strigger when a job finishes. This ensures the job
-# record has the correct state "Complete" even if the node running the job
-# failed.
-
-ARVADOS_API_HOST=$1
-ARVADOS_API_TOKEN=$2
-ARVADOS_API_HOST_INSECURE=$3
-uuid=$4
-jobid=$5
-
-# If it is possible to attach metadata to job records we could look up the
-# above information instead of getting it on the command line. For example,
-# this is the recipe for getting the job name (container uuid) from the job id.
-#uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
-
-export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE
-
-exec arv container update --uuid $uuid --container '{"state": "Complete"}'
commit 3a3910fdc8a5003c182f68e3423c96327a136175
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 27 17:30:07 2016 -0400
9187: Check LockedByUUID on container updates and terminate status updates if
not equal to dispatcher token.
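
Condensed, the new guard in handleUpdate amounts to the predicate below (a
sketch written as if inside package dispatch; shouldStopMonitoring is an
invented name, and the real check is inline in the diff that follows):

// shouldStopMonitoring is true when the record's locked_by_uuid no longer
// matches this dispatcher's token UUID and the container is not simply back
// in Queued: either another dispatcher took it over or the record was
// finalized, so status updates to its goroutine must stop.
func (dispatcher *Dispatcher) shouldStopMonitoring(container Container) bool {
	return container.LockedByUUID != dispatcher.Auth.UUID &&
		container.State != Queued
}
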
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 355ed7c..a27971f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -52,7 +52,7 @@ type Dispatcher struct {
mineMutex sync.Mutex
mineMap map[string]chan Container
- auth apiClientAuthorization
+ Auth apiClientAuthorization
containers chan Container
}
@@ -100,17 +100,18 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
- } else {
- if containers.ItemsAvailable > len(containers.Items) {
- // TODO: support paging
- log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
- containers.ItemsAvailable,
- len(containers.Items))
- }
- for _, container := range containers.Items {
- touched[container.UUID] = true
- dispatcher.containers <- container
- }
+ return
+ }
+
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
}
}
@@ -122,7 +123,7 @@ func (dispatcher *Dispatcher) pollContainers() {
"order": []string{"priority desc"},
"limit": "1000"}
paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
"limit": "1000"}
for {
@@ -152,11 +153,19 @@ func (dispatcher *Dispatcher) pollContainers() {
}
func (dispatcher *Dispatcher) handleUpdate(container Container) {
+ if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
+ // If container is Complete, Cancelled, or Queued, LockedByUUID
+ // will be nil. If the container was formerly Locked, moved
+ // back to Queued and then locked by another dispatcher,
+ // LockedByUUID will be different. In either case, we want
+ // to stop monitoring it.
+ log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+ dispatcher.notMine(container.UUID)
+ return
+ }
+
if dispatcher.updateMine(container) {
- if container.State == Complete || container.State == Cancelled {
- log.Printf("Container %v now in state %v", container.UUID, container.State)
- dispatcher.notMine(container.UUID)
- }
+ // Already monitored; status update was sent
return
}
@@ -169,6 +178,8 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
}
if container.State == Locked || container.State == Running {
+ // Not currently monitored but in Locked or Running state and
+ // owned by this dispatcher, so start monitoring.
go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
}
}
@@ -197,7 +208,7 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
// The callback is responsible for draining the channel, if it fails to do so
// it will deadlock the dispatcher.
func (dispatcher *Dispatcher) RunDispatcher() (err error) {
- err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
return
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index cc472a4..73a3895 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -159,7 +159,8 @@ func run(dispatcher *dispatch.Dispatcher,
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+ if container.LockedByUUID == dispatcher.Auth.UUID &&
+ (container.State == dispatch.Locked || container.State == dispatch.Running) {
log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
*crunchRunCommand, container.State, uuid, dispatch.Cancelled)
dispatcher.UpdateState(uuid, dispatch.Cancelled)
-----------------------------------------------------------------------
hooks/post-receive