[ARVADOS] updated: 11d484489c21487074ecb6d15b086c02b13e326f
Git user
git at public.curoverse.com
Wed May 25 09:29:45 EDT 2016
Summary of changes:
sdk/go/dispatch/dispatch.go | 136 ++++++++++++---------
services/api/app/models/container.rb | 7 +-
.../crunch-dispatch-local/crunch-dispatch-local.go | 100 +++++++--------
.../crunch-dispatch-local_test.go | 22 ++--
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 29 +++--
.../crunch-dispatch-slurm_test.go | 33 ++---
6 files changed, 185 insertions(+), 142 deletions(-)
via 11d484489c21487074ecb6d15b086c02b13e326f (commit)
from 1032d5957a67aa706079917f3c20bdb1a5c91ad0 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 11d484489c21487074ecb6d15b086c02b13e326f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 25 08:54:23 2016 -0400
9187: Explicitly query for monitored containers that are not listed (usually complete). Tests pass.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 926a550..9ce7bf8 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -10,6 +10,14 @@ import (
"time"
)
+const (
+ Queued = "Queued"
+ Locked = "Locked"
+ Running = "Running"
+ Complete = "Complete"
+ Cancelled = "Cancelled"
+)
+
type apiClientAuthorization struct {
UUID string `json:"uuid"`
APIToken string `json:"api_token"`
@@ -34,15 +42,15 @@ type ContainerList struct {
}
type DispatcherState struct {
- mineMutex sync.Mutex
- mineMap map[string]chan Container
- pollInterval time.Duration
- Arv arvadosclient.ArvadosClient
- auth apiClientAuthorization
- containers chan []Container
- DoneProcessing chan struct{}
- runContainer func(*DispatcherState, Container, chan Container)
- waitContainer func(*DispatcherState, Container, chan Container)
+ mineMutex sync.Mutex
+ mineMap map[string]chan Container
+ pollInterval time.Duration
+ Arv arvadosclient.ArvadosClient
+ auth apiClientAuthorization
+ containers chan Container
+ DoneProcessing chan struct{}
+ runContainer func(*DispatcherState, Container)
+ monitorContainer func(*DispatcherState, string, chan Container)
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -82,37 +90,46 @@ func (dispatcher *DispatcherState) updateMine(c Container) bool {
return false
}
+func (dispatcher *DispatcherState) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+ // XXX needs to handle paging
+ var containers ContainerList
+ err := dispatcher.Arv.List("containers", params, &containers)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ } else {
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
+ }
+ }
+}
+
func (dispatcher *DispatcherState) pollContainers() {
ticker := time.NewTicker(dispatcher.pollInterval)
paramsQ := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "=", "Queued"},
- {"priority", ">", "0"}},
- "order": []string{"priority desc"}}
+ "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+ "order": []string{"priority desc"}}
paramsP := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
- {"locked_by_uuid", "=", dispatcher.auth.UUID}}}
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}}}
for {
select {
case <-ticker.C:
- {
- var containers ContainerList
- err := dispatcher.Arv.List("containers", paramsQ, &containers)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- } else {
- dispatcher.containers <- containers.Items
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k, _ := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
}
}
- {
- var containers ContainerList
- err := dispatcher.Arv.List("containers", paramsP, &containers)
- if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- } else {
- dispatcher.containers <- containers.Items
- }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
}
case <-dispatcher.DoneProcessing:
close(dispatcher.containers)
@@ -123,35 +140,40 @@ func (dispatcher *DispatcherState) pollContainers() {
}
func (dispatcher *DispatcherState) handleContainers() {
- for containerlist := range dispatcher.containers {
- for _, container := range containerlist {
- if dispatcher.updateMine(container) {
- continue
+ for container := range dispatcher.containers {
+ if dispatcher.updateMine(container) {
+ if container.State == Complete || container.State == Cancelled {
+ log.Printf("Container %v now in state %v", container.UUID, container.State)
+ dispatcher.notMine(container.UUID)
}
+ continue
+ }
- if container.State == "Locked" || container.State == "Running" {
- log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
- "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
- container.UUID, dispatcher.auth.UUID)
- go func(c Container, ch chan Container) {
- defer dispatcher.notMine(c.UUID)
- dispatcher.waitContainer(dispatcher, c, ch)
- }(container, dispatcher.setMine(container.UUID))
- continue
- }
+ if container.State == Locked || container.State == Running {
+ log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+ "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+ container.UUID, dispatcher.auth.UUID)
+ go func(uuid string, ch chan Container) {
+ dispatcher.monitorContainer(dispatcher, uuid, ch)
+ }(container.UUID, dispatcher.setMine(container.UUID))
+ continue
+ }
- // Lock container to this dispatcher
- if err := dispatcher.UpdateState(container.UUID, "Locked"); err != nil {
- continue
- }
+ if container.State != Queued {
+ // Not queued, pass
+ continue
+ }
- // Run it
- go func(c Container, ch chan Container) {
- defer dispatcher.notMine(c.UUID)
- dispatcher.runContainer(dispatcher, c, ch)
- dispatcher.waitContainer(dispatcher, c, ch)
- }(container, dispatcher.setMine(container.UUID))
+ // Must be queued, so try to lock container to this dispatcher
+ if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ continue
}
+
+ // Run it
+ go func(c Container, ch chan Container) {
+ dispatcher.runContainer(dispatcher, c)
+ dispatcher.monitorContainer(dispatcher, c.UUID, ch)
+ }(container, dispatcher.setMine(container.UUID))
}
}
@@ -187,8 +209,8 @@ func (dispatcher *DispatcherState) RunDispatcher() {
func MakeDispatcher(arv arvadosclient.ArvadosClient,
pollInterval time.Duration,
- runContainer func(*DispatcherState, Container, chan Container),
- waitContainer func(*DispatcherState, Container, chan Container)) (*DispatcherState, error) {
+ runContainer func(*DispatcherState, Container),
+ monitorContainer func(*DispatcherState, string, chan Container)) (*DispatcherState, error) {
dispatcher := &DispatcherState{}
dispatcher.Arv = arv
@@ -201,10 +223,10 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
dispatcher.pollInterval = pollInterval
dispatcher.runContainer = runContainer
- dispatcher.waitContainer = waitContainer
+ dispatcher.monitorContainer = monitorContainer
dispatcher.mineMap = make(map[string]chan Container)
dispatcher.DoneProcessing = make(chan struct{})
- dispatcher.containers = make(chan []Container)
+ dispatcher.containers = make(chan Container)
return dispatcher, nil
}
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 4c77008..94cb0d3 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -56,7 +56,7 @@ class Container < ArvadosModel
State_transitions = {
nil => [Queued],
Queued => [Locked, Cancelled],
- Locked => [Queued, Running, Cancelled],
+ Locked => [Queued, Running, Cancelled, Complete],
Running => [Complete, Cancelled]
}
@@ -125,8 +125,11 @@ class Container < ArvadosModel
end
when Complete
- if self.state_was == Running
+ case self.state_was
+ when Running
permitted.push :finished_at, :output, :log, :exit_code
+ when Locked
+ permitted.push :finished_at
end
when Cancelled
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 1f19a29..5e2e34e 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -48,10 +48,11 @@ func doMain() error {
log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
var dispatcher *dispatch.DispatcherState
dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
- run, setFinalState)
+ run, monitorRun)
if err != nil {
return err
}
@@ -76,19 +77,12 @@ func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
var startCmd = startFunc
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
func run(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container dispatch.Container) {
uuid := container.UUID
waitGroup.Add(1)
- defer waitGroup.Done()
cmd := exec.Command(*crunchRunCommand, uuid)
cmd.Stdin = nil
@@ -103,63 +97,71 @@ func run(dispatcher *dispatch.DispatcherState,
if err := startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
log.Printf("Error starting crunch-run for %v: %q", uuid, err)
- dispatcher.UpdateState(uuid, "Cancelled")
- go func() {
- for _ = range status {
- }
- }()
+ dispatcher.UpdateState(uuid, dispatch.Complete)
+ waitGroup.Done()
return
}
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
+}
+
+func monitorRun(dispatcher *dispatch.DispatcherState,
+ uuid string,
+ status chan dispatch.Container) {
- // Interrupt the child process if priority changes to 0
- go func() {
- for c := range status {
- if (c.State == "Locked" || c.State == "Running") && c.Priority == 0 {
- log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
- cmd.Process.Signal(os.Interrupt)
+ runningCmdsMutex.Lock()
+ cmd, ok := runningCmds[uuid]
+ runningCmdsMutex.Unlock()
+
+ if ok {
+ // Interrupt the child process if priority changes to 0
+ go func() {
+ for c := range status {
+ if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
+ }
}
+ }()
+
+ // Wait for crunch-run to exit
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
- }()
- // Wait for crunch-run to exit
- if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
- }
+ waitGroup.Done()
- log.Printf("Finished container run for %v", uuid)
-}
+ log.Printf("Finished container run for %v", uuid)
-func setFinalState(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
- status chan dispatch.Container) {
-
- uuid := container.UUID
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
+ } else {
+ go func() {
+ for _ = range status {
+ }
+ }()
+ }
- // The container state should be 'Complete' if everything went well.
- // If the container is "Running" or "Locked", that's an error, so
- // change it to "Cancelled". TODO: perhaps this should be "Error"
- // state instead?
+ // The container state should be 'Complete'. If the container is still
+ // "Running" or "Locked" even though we know it isn't actually running,
+ // change it to "Complete".
+ var container dispatch.Container
err := dispatcher.Arv.Get("containers", uuid, nil, &container)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- fixState := map[string]string{
- "Running": "Cancelled",
- "Locked": "Cancelled",
- }
- if newState, ok := fixState[container.State]; ok {
+ if container.State == dispatch.Running || container.State == dispatch.Locked {
+ var newState string
+ if container.Priority == 0 {
+ newState = dispatch.Cancelled
+ } else {
+ newState = dispatch.Complete
+ }
log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
- dispatcher.UpdateState(uuid, newState)
- }
-
- // Remove the crunch job from runningCmds
- runningCmdsMutex.Lock()
- if _, ok := runningCmds[uuid]; ok {
- delete(runningCmds, uuid)
+ dispatcher.UpdateState(uuid, dispatch.Cancelled)
}
- runningCmdsMutex.Unlock()
log.Printf("Finalized container %v", uuid)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 9a727bc..b169575 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -12,6 +12,7 @@ import (
"net/http/httptest"
"os"
"os/exec"
+ "strings"
"testing"
"time"
)
@@ -63,9 +64,9 @@ func (s *TestSuite) TestIntegration(c *C) {
dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
run,
func(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
+ container string,
status chan dispatch.Container) {
- setFinalState(dispatcher, container, status)
+ monitorRun(dispatcher, container, status)
dispatcher.DoneProcessing <- struct{}{}
})
c.Assert(err, IsNil)
@@ -107,7 +108,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
@@ -117,7 +118,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
@@ -128,7 +129,14 @@ func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
+ // seq := &arvadostest.ResponseSequence{0,
+ // []arvadostest.StubResponse{{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`), nil},
+ // {200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Running"}]}`), nil},
+ // {200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Complete"}]}`), nil}}}
+ //apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{0, "", seq.Respond}
+
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
@@ -161,9 +169,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
run,
func(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
+ container string,
status chan dispatch.Container) {
- setFinalState(dispatcher, container, status)
+ monitorRun(dispatcher, container, status)
dispatcher.DoneProcessing <- struct{}{}
})
c.Assert(err, IsNil)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 649b784..e0b7ce8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"strconv"
+ "strings"
"time"
)
@@ -53,10 +54,11 @@ func doMain() error {
log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
var dispatcher *dispatch.DispatcherState
dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
- run, waitContainer)
+ run, monitorContainer)
if err != nil {
return err
}
@@ -163,7 +165,7 @@ func submit(dispatcher *dispatch.DispatcherState,
}
// If everything worked out, got the jobid on stdout
- jobid = string(stdoutMsg)
+ jobid = strings.TrimSpace(string(stdoutMsg))
return
}
@@ -188,10 +190,9 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arva
}
// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
+// Execute crunch-run as a slurm batch job. [3] monitorContainer().
func run(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container dispatch.Container) {
log.Printf("About to submit queued container %v", container.UUID)
@@ -208,15 +209,18 @@ func run(dispatcher *dispatch.DispatcherState,
// Wait for a container to finish. Cancel the slurm job if the
// container priority changes to zero before it ends.
-func waitContainer(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
+func monitorContainer(dispatcher *dispatch.DispatcherState,
+ uuid string,
status chan dispatch.Container) {
- log.Printf("Monitoring container %v started", container.UUID)
- defer log.Printf("Monitoring container %v finished", container.UUID)
+ log.Printf("Monitoring container %v started", uuid)
+ defer log.Printf("Monitoring container %v finished", uuid)
+
+ var container dispatch.Container
+ dispatcher.Arv.Get("containers", uuid, nil, &container)
- for container = range status {
- if (container.State == "Locked" || container.State == "Running") && container.Priority == 0 {
+ for container := range status {
+ if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
log.Printf("Canceling container %s", container.UUID)
err := exec.Command("scancel", "--name="+container.UUID).Run()
@@ -233,11 +237,10 @@ func waitContainer(dispatcher *dispatch.DispatcherState,
err = dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Cancelled"}},
+ "container": arvadosclient.Dict{"state": dispatch.Cancelled}},
nil)
if err != nil {
log.Printf("Error updating state for container %s: %s", container.UUID, err)
- continue
}
}
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3658737..34f497c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -85,14 +85,12 @@ func (s *TestSuite) TestIntegration(c *C) {
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- for _, state := range []string{"Running", "Complete"} {
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": state}},
- nil)
- }
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": dispatch.Complete}},
+ nil)
}()
- return exec.Command("echo", "strigger")
+ return exec.Command("echo", striggerCmdLine...)
}
// There should be no queued containers now
@@ -106,14 +104,19 @@ func (s *TestSuite) TestIntegration(c *C) {
echo := "echo"
crunchRunCommand = &echo
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
var dispatcher *dispatch.DispatcherState
dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
run,
func(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
+ container string,
status chan dispatch.Container) {
- waitContainer(dispatcher, container, status)
+ go func() {
+ dispatcher.UpdateState(container, "Complete")
+ }()
+ monitorContainer(dispatcher, container, status)
dispatcher.DoneProcessing <- struct{}{}
})
c.Assert(err, IsNil)
@@ -127,7 +130,7 @@ func (s *TestSuite) TestIntegration(c *C) {
fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+ c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
@@ -147,7 +150,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -168,14 +171,16 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
- *crunchRunCommand = crunchCmd
+ crunchRunCommand = &crunchCmd
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
run,
func(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
+ container string,
status chan dispatch.Container) {
- waitContainer(dispatcher, container, status)
+ monitorContainer(dispatcher, container, status)
dispatcher.DoneProcessing <- struct{}{}
})
c.Assert(err, IsNil)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list