[ARVADOS] created: 4153cb6cfad920ed0b1a4b818d3bcc8de492d134
Git user
git at public.curoverse.com
Thu May 26 16:11:55 EDT 2016
at 4153cb6cfad920ed0b1a4b818d3bcc8de492d134 (commit)
commit 4153cb6cfad920ed0b1a4b818d3bcc8de492d134
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 19 14:12:42 2016 -0400
9187: Refactor dispatcher support into common library and update to use Locking API.
The new dispatch package in the Go SDK provides a framework for monitoring the
list of queued/locked/running containers. It tries to lock containers in the
queue; locked or running containers are passed to a RunContainer goroutine
supplied by the specific dispatcher. The existing dispatchers (-local and
-slurm) are refactored to use this framework. Dispatchers now have
crash-recovery behavior and can put unaccounted-for containers into the
Cancelled state.
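
The intended wiring, mirroring what doMain() in crunch-dispatch-local does
below, is roughly the following sketch (run stands for whatever RunContainer
callback a specific dispatcher supplies):

    arv, err := arvadosclient.MakeArvadosClient()
    if err != nil {
        return err
    }
    arv.Retries = 25

    dispatcher := dispatch.Dispatcher{
        Arv:            arv,
        RunContainer:   run, // dispatcher-specific callback
        PollInterval:   10 * time.Second,
        DoneProcessing: make(chan struct{}),
    }

    // Blocks until something is sent on DoneProcessing, e.g. by the
    // built-in SIGINT/SIGTERM/SIGQUIT handler.
    err = dispatcher.RunDispatcher()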
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644
index 0000000..355ed7c
--- /dev/null
+++ b/sdk/go/dispatch/dispatch.go
@@ -0,0 +1,229 @@
+package dispatch
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "log"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+)
+
+// Constants for container states
+const (
+ Queued = "Queued"
+ Locked = "Locked"
+ Running = "Running"
+ Complete = "Complete"
+ Cancelled = "Cancelled"
+)
+
+type apiClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+ Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of containers as returned by the API.
+type ContainerList struct {
+ Items []Container `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+}
+
+// Dispatcher holds the state of the dispatcher
+type Dispatcher struct {
+ Arv arvadosclient.ArvadosClient
+ RunContainer func(*Dispatcher, Container, chan Container)
+ PollInterval time.Duration
+ DoneProcessing chan struct{}
+
+ mineMutex sync.Mutex
+ mineMap map[string]chan Container
+ auth apiClientAuthorization
+ containers chan Container
+}
+
+// setMine adds uuid to the set of "my" containers, i.e., ones this process is
+// actively starting/monitoring, and returns the channel to be used to send
+// status updates for that container. Safe to call from multiple goroutines.
+func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ if ch, ok := dispatcher.mineMap[uuid]; ok {
+ return ch
+ }
+
+ ch := make(chan Container)
+ dispatcher.mineMap[uuid] = ch
+ return ch
+}
+
+// Release a container which is no longer being monitored.
+func (dispatcher *Dispatcher) notMine(uuid string) {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ if ch, ok := dispatcher.mineMap[uuid]; ok {
+ close(ch)
+ delete(dispatcher.mineMap, uuid)
+ }
+}
+
+// updateMine checks whether there is an update channel associated with this
+// container. If so, it sends the container record on the channel and returns
+// true; otherwise it returns false.
+func (dispatcher *Dispatcher) updateMine(c Container) bool {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ ch, ok := dispatcher.mineMap[c.UUID]
+ if ok {
+ ch <- c
+ return true
+ }
+ return false
+}
+
+func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
+ var containers ContainerList
+ err := dispatcher.Arv.List("containers", params, &containers)
+ if err != nil {
+ log.Printf("Error getting list of containers: %q", err)
+ } else {
+ if containers.ItemsAvailable > len(containers.Items) {
+ // TODO: support paging
+ log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+ containers.ItemsAvailable,
+ len(containers.Items))
+ }
+ for _, container := range containers.Items {
+ touched[container.UUID] = true
+ dispatcher.containers <- container
+ }
+ }
+}
+
+func (dispatcher *Dispatcher) pollContainers() {
+ ticker := time.NewTicker(dispatcher.PollInterval)
+
+ paramsQ := arvadosclient.Dict{
+ "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
+ "order": []string{"priority desc"},
+ "limit": "1000"}
+ paramsP := arvadosclient.Dict{
+ "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.auth.UUID}},
+ "limit": "1000"}
+
+ for {
+ select {
+ case <-ticker.C:
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if len(monitored) > 0 {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
+ case <-dispatcher.DoneProcessing:
+ close(dispatcher.containers)
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (dispatcher *Dispatcher) handleUpdate(container Container) {
+ if dispatcher.updateMine(container) {
+ if container.State == Complete || container.State == Cancelled {
+ log.Printf("Container %v now in state %v", container.UUID, container.State)
+ dispatcher.notMine(container.UUID)
+ }
+ return
+ }
+
+ if container.State == Queued {
+ // Try to take the lock
+ if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+ return
+ }
+ container.State = Locked
+ }
+
+ if container.State == Locked || container.State == Running {
+ go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+ }
+}
+
+// UpdateState makes an API call to change the state of a container.
+func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+ err := dispatcher.Arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": newState}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+ }
+ return err
+}
+
+// RunDispatcher runs the main loop of the dispatcher until receiving a message
+// on the dispatcher.DoneProcessing channel. It also installs a signal handler
+// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
+//
+// When a new queued container appears and is successfully locked, or when a
+// container appears that is Locked or Running but not yet known to the
+// dispatcher, the dispatcher calls RunContainer() in a new goroutine. The
+// callback is passed a channel over which it will receive updates to the
+// container state. The callback is responsible for draining the channel; if
+// it fails to do so, it will deadlock the dispatcher.
+func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+ err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ if err != nil {
+ log.Printf("Error getting my token UUID: %v", err)
+ return
+ }
+
+ dispatcher.mineMap = make(map[string]chan Container)
+ dispatcher.containers = make(chan Container)
+
+ // Graceful shutdown on signal
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
+ go func(sig <-chan os.Signal) {
+ for s := range sig {
+ log.Printf("Caught signal: %v", s)
+ dispatcher.DoneProcessing <- struct{}{}
+ }
+ }(sigChan)
+
+ defer close(sigChan)
+ defer signal.Stop(sigChan)
+
+ go dispatcher.pollContainers()
+ for container := range dispatcher.containers {
+ dispatcher.handleUpdate(container)
+ }
+
+ return nil
+}
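
For reference, a minimal RunContainer callback that satisfies the contract
above might look like the following sketch; startWork and cancelWork are
hypothetical dispatcher-specific helpers, not part of this commit. The
essential obligation is draining the status channel, which the dispatcher
closes once the container reaches Complete or Cancelled:

    func runContainer(d *dispatch.Dispatcher,
        c dispatch.Container,
        status chan dispatch.Container) {

        if c.State == dispatch.Locked {
            startWork(c) // hypothetical: start the container
        }

        // Drain status updates until the dispatcher closes the channel.
        for update := range status {
            if (update.State == dispatch.Locked || update.State == dispatch.Running) &&
                update.Priority == 0 {
                cancelWork(update.UUID) // hypothetical: cancel the work
            }
        }
    }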
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 4023870..cc472a4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -1,14 +1,15 @@
package main
+// Dispatcher service for Crunch that runs containers locally.
+
import (
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"log"
"os"
"os/exec"
- "os/signal"
"sync"
- "syscall"
"time"
)
@@ -20,12 +21,10 @@ func main() {
}
var (
- arv arvadosclient.ArvadosClient
runningCmds map[string]*exec.Cmd
runningCmdsMutex sync.Mutex
waitGroup sync.WaitGroup
- doneProcessing chan bool
- sigChan chan os.Signal
+ crunchRunCommand *string
)
func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
10,
"Interval in seconds to poll for queued containers")
- priorityPollInterval := flags.Int(
- "container-priority-poll-interval",
- 60,
- "Interval in seconds to check priority of a dispatched container")
-
- crunchRunCommand := flags.String(
+ crunchRunCommand = flags.String(
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
@@ -49,35 +43,32 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
+ runningCmds = make(map[string]*exec.Cmd)
+
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
- // Channel to terminate
- doneProcessing = make(chan bool)
-
- // Map of running crunch jobs
- runningCmds = make(map[string]*exec.Cmd)
-
- // Graceful shutdown
- sigChan = make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- doneProcessing <- true
- }
- }(sigChan)
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(*pollInterval) * time.Second,
+ DoneProcessing: make(chan struct{})}
- // Run all queued containers
- runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
+ err = dispatcher.RunDispatcher()
+ if err != nil {
+ return err
+ }
+ runningCmdsMutex.Lock()
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
cmd.Process.Signal(os.Interrupt)
}
+ runningCmdsMutex.Unlock()
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
@@ -85,166 +76,98 @@ func doMain() error {
return nil
}
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
- ticker := time.NewTicker(pollInterval)
-
- for {
- select {
- case <-ticker.C:
- dispatchLocal(priorityPollInterval, crunchRunCommand)
- case <-doneProcessing:
- ticker.Stop()
- return
- }
- }
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+ return cmd.Start()
}
-// Container data
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
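+// Wrap startFunc so that tests can override it.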
+var startCmd = startFunc
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
- params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
- }
+// Run a container.
+//
+// If the container is Locked, start a new crunch-run process and wait until
+// crunch-run completes. If the priority changes to zero while crunch-run is
+// running, send an interrupt signal to the crunch-run process.
+//
+// If the container is in any other state, or is not Complete/Cancelled after
+// crunch-run terminates, mark the container as Cancelled.
+func run(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
- var containers ContainerList
- err := arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
- return
- }
+ uuid := container.UUID
- for _, c := range containers.Items {
- log.Printf("About to run queued container %v", c.UUID)
- // Run the container
+ if container.State == dispatch.Locked {
waitGroup.Add(1)
- go func(c Container) {
- run(c.UUID, crunchRunCommand, pollInterval)
- waitGroup.Done()
- }(c)
- }
-}
-
-func updateState(uuid, newState string) error {
- err := arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
- if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
- }
- return err
-}
-
-// Run queued container:
-// Set container state to Locked
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
- if err := updateState(uuid, "Locked"); err != nil {
- return
- }
- cmd := exec.Command(crunchRunCommand, uuid)
- cmd.Stdin = nil
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stderr
+ cmd := exec.Command(*crunchRunCommand, uuid)
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
- // Add this crunch job to the list of runningCmds only if we
- // succeed in starting crunch-run.
- runningCmdsMutex.Lock()
- if err := cmd.Start(); err != nil {
- log.Printf("Error starting crunch-run for %v: %q", uuid, err)
- runningCmdsMutex.Unlock()
- updateState(uuid, "Queued")
- return
- }
- runningCmds[uuid] = cmd
- runningCmdsMutex.Unlock()
+ log.Printf("Starting container %v", uuid)
- defer func() {
- setFinalState(uuid)
+ // Add this crunch job to the list of runningCmds only if we
+ // succeed in starting crunch-run.
- // Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
- delete(runningCmds, uuid)
- runningCmdsMutex.Unlock()
- }()
-
- log.Printf("Starting container %v", uuid)
-
- updateState(uuid, "Running")
+ if err := startCmd(container, cmd); err != nil {
+ runningCmdsMutex.Unlock()
+ log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+ dispatcher.UpdateState(uuid, dispatch.Cancelled)
+ } else {
+ runningCmds[uuid] = cmd
+ runningCmdsMutex.Unlock()
+
+ // Need to wait for crunch-run to exit
+ done := make(chan struct{})
+
+ go func() {
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ }
+ log.Printf("sending done")
+ done <- struct{}{}
+ }()
+
+ Loop:
+ for {
+ select {
+ case <-done:
+ break Loop
+ case c := <-status:
+ // Interrupt the child process if priority changes to 0
+ if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
+ }
+ }
+ }
+ close(done)
- cmdExited := make(chan struct{})
+ log.Printf("Finished container run for %v", uuid)
- // Kill the child process if container priority changes to zero
- go func() {
- ticker := time.NewTicker(pollInterval)
- defer ticker.Stop()
- for {
- select {
- case <-cmdExited:
- return
- case <-ticker.C:
- }
- var container Container
- err := arv.Get("containers", uuid, nil, &container)
- if err != nil {
- log.Printf("Error getting container %v: %q", uuid, err)
- continue
- }
- if container.Priority == 0 {
- log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
- cmd.Process.Signal(os.Interrupt)
- }
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
}
- }()
-
- // Wait for crunch-run to exit
- if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ waitGroup.Done()
}
- close(cmdExited)
-
- log.Printf("Finished container run for %v", uuid)
-}
-func setFinalState(uuid string) {
- // The container state should now be 'Complete' if everything
- // went well. If it started but crunch-run didn't change its
- // final state to 'Running', fix that now. If it never even
- // started, cancel it as unrunnable. (TODO: Requeue instead,
- // and fix tests so they can tell something happened even if
- // the final state is Queued.)
- var container Container
- err := arv.Get("containers", uuid, nil, &container)
+ // If the container is not finalized, then change it to "Cancelled".
+ err := dispatcher.Arv.Get("containers", uuid, nil, &container)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- fixState := map[string]string{
- "Running": "Complete",
- "Locked": "Cancelled",
+ if container.State != dispatch.Complete && container.State != dispatch.Cancelled {
+ log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
+ *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
+ dispatcher.UpdateState(uuid, dispatch.Cancelled)
}
- if newState, ok := fixState[container.State]; ok {
- log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
- updateState(uuid, newState)
+
+ // Drain any subsequent status updates until the dispatcher closes the channel
+ for range status {
}
+
+ log.Printf("Finalized container %v", uuid)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..aca60e9 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,20 @@
package main
import (
+ "bytes"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
- "bytes"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ . "gopkg.in/check.v1"
+ "io"
"log"
"net/http"
"net/http/httptest"
"os"
- "syscall"
+ "os/exec"
+ "strings"
"testing"
"time"
-
- . "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -32,6 +33,7 @@ var initialArgs []string
func (s *TestSuite) SetUpSuite(c *C) {
initialArgs = os.Args
arvadostest.StartAPI()
+ runningCmds = make(map[string]*exec.Cmd)
}
func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +43,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-local"}
os.Args = args
-
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
}
func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +54,48 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
+
+ echo := "echo"
+ crunchRunCommand = &echo
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
- go func() {
- time.Sleep(5 * time.Second)
- sigChan <- syscall.SIGINT
- }()
+ err = dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
- err := doMain()
- c.Check(err, IsNil)
+ // Wait for all running crunch jobs to complete / terminate
+ waitGroup.Wait()
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
+ var containers dispatch.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -90,13 +105,13 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
@@ -106,31 +121,35 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2","State":"Queued"}]}`)}
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
testWithServerStub(c, apiStubResponses, "echo",
- "After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+ `After echo process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is "Running". Updating it to "Cancelled"`)
}
func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] =
- arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3","State":"Queued"}]}`)}
+
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
- testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+ apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+ arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -139,15 +158,38 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(buf)
+ log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
+ *crunchRunCommand = crunchCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
+
go func() {
- time.Sleep(2 * time.Second)
- sigChan <- syscall.SIGTERM
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(time.Second, time.Second, crunchCmd)
+ err := dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 53e4705..641b4bc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,19 +1,19 @@
package main
+// Dispatcher service for Crunch that submits containers to the slurm queue.
+
import (
"bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
"log"
"math"
"os"
"os/exec"
- "os/signal"
- "strconv"
- "sync"
- "syscall"
+ "strings"
"time"
)
@@ -25,12 +25,8 @@ func main() {
}
var (
- arv arvadosclient.ArvadosClient
- runningCmds map[string]*exec.Cmd
- runningCmdsMutex sync.Mutex
- waitGroup sync.WaitGroup
- doneProcessing chan bool
- sigChan chan os.Signal
+ crunchRunCommand *string
+ finishCommand *string
)
func doMain() error {
@@ -41,17 +37,12 @@ func doMain() error {
10,
"Interval in seconds to poll for queued containers")
- priorityPollInterval := flags.Int(
- "container-priority-poll-interval",
- 60,
- "Interval in seconds to check priority of a dispatched container")
-
- crunchRunCommand := flags.String(
+ crunchRunCommand = flags.String(
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand := flags.String(
+ finishCommand = flags.String(
"finish-command",
"/usr/bin/crunch-finish-slurm.sh",
"Command to run from strigger when job is finished")
@@ -59,142 +50,56 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
+ arv.Retries = 25
- // Channel to terminate
- doneProcessing = make(chan bool)
-
- // Graceful shutdown
- sigChan = make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- doneProcessing <- true
- }
- }(sigChan)
-
- // Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
- // Wait for all running crunch jobs to complete / terminate
- waitGroup.Wait()
-
- return nil
-}
-
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(*pollInterval) * time.Second,
+ DoneProcessing: make(chan struct{})}
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- var auth apiClientAuthorization
- err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+ err = dispatcher.RunDispatcher()
if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
- }
-
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
- for {
- select {
- case <-ticker.C:
- dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
- case <-doneProcessing:
- ticker.Stop()
- return
- }
- }
-}
-
-// Container data
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
- params := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
- }
-
- var containers ContainerList
- err := arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
- return
+ return err
}
- for _, container := range containers.Items {
- if container.State == "Locked" {
- if container.LockedByUUID != auth.UUID {
- // Locked by a different dispatcher
- continue
- } else if checkMine(container.UUID) {
- // I already have a goroutine running
- // for this container: it just hasn't
- // gotten past Locked state yet.
- continue
- }
- log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
- "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
- container.UUID, auth.UUID)
- setMine(container.UUID, true)
- go func() {
- waitContainer(container, pollInterval)
- setMine(container.UUID, false)
- }()
- }
- go run(container, crunchRunCommand, finishCommand, pollInterval)
- }
+ return nil
}
// sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
- "--job-name="+container.UUID,
- "--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
- "--cpus-per-task="+strconv.Itoa(int(container.RuntimeConstraints["vcpus"])))
+ fmt.Sprintf("--job-name=%s", container.UUID),
+ fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
+ fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--priority=%d", container.Priority))
}
-var sbatchCmd = sbatchFunc
-
// striggerCmd
func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
}
+// squeueFunc
+func squeueFunc() *exec.Cmd {
+ return exec.Command("squeue", "--format=%j")
+}
+
+// Wrap these so that they can be overridden by tests
var striggerCmd = striggerFunc
+var sbatchCmd = sbatchFunc
+var squeueCmd = squeueFunc
// Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
@@ -204,7 +109,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
// OK, no cleanup needed
return
}
- err := arv.Update("containers", container.UUID,
+ err := dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Queued"}},
nil)
@@ -244,7 +149,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
b, _ := ioutil.ReadAll(stdoutReader)
stdoutReader.Close()
stdoutChan <- b
- close(stdoutChan)
}()
stderrChan := make(chan []byte)
@@ -252,7 +156,6 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
b, _ := ioutil.ReadAll(stderrReader)
stderrReader.Close()
stderrChan <- b
- close(stderrChan)
}()
// Send a tiny script on stdin to execute the crunch-run command
@@ -265,21 +168,28 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
stdoutMsg := <-stdoutChan
stderrmsg := <-stderrChan
+ close(stdoutChan)
+ close(stderrChan)
+
if err != nil {
submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
return
}
// If everything worked out, got the jobid on stdout
- jobid = string(stdoutMsg)
+ jobid = strings.TrimSpace(string(stdoutMsg))
return
}
// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
- cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+ insecure := "0"
+ if arv.ApiInsecure {
+ insecure = "1"
+ }
+ cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@@ -291,104 +201,8 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
}
}
-// Run a queued container: [1] Set container state to locked. [2]
-// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
- setMine(container.UUID, true)
- defer setMine(container.UUID, false)
-
- // Update container status to Locked. This will fail if
- // another dispatcher (token) has already locked it. It will
- // succeed if *this* dispatcher has already locked it.
- err := arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Locked"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
- return
- }
-
- log.Printf("About to submit queued container %v", container.UUID)
-
- jobid, err := submit(container, crunchRunCommand)
- if err != nil {
- log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
- return
- }
-
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
-
- // Update container status to Running. This will fail if
- // another dispatcher (token) has already locked it. It will
- // succeed if *this* dispatcher has already locked it.
- err = arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
- nil)
- if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
- }
- log.Printf("Submitted container %v to slurm", container.UUID)
- waitContainer(container, pollInterval)
-}
-
-// Wait for a container to finish. Cancel the slurm job if the
-// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
- log.Printf("Monitoring container %v started", container.UUID)
- defer log.Printf("Monitoring container %v finished", container.UUID)
-
- pollTicker := time.NewTicker(pollInterval)
- defer pollTicker.Stop()
- for _ = range pollTicker.C {
- var updated Container
- err := arv.Get("containers", container.UUID, nil, &updated)
- if err != nil {
- log.Printf("Error getting container %s: %q", container.UUID, err)
- continue
- }
- if updated.State == "Complete" || updated.State == "Cancelled" {
- return
- }
- if updated.Priority != 0 {
- continue
- }
-
- // Priority is zero, but state is Running or Locked
- log.Printf("Canceling container %s", container.UUID)
-
- err = exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue; will retry", container.UUID)
- continue
- }
- }
-
- err = arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Cancelled"}},
- nil)
- if err != nil {
- log.Printf("Error updating state for container %s: %s", container.UUID, err)
- continue
- }
-
- return
- }
-}
-
func checkSqueue(uuid string) (bool, error) {
- cmd := exec.Command("squeue", "--format=%j")
+ cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
return false, err
@@ -408,25 +222,67 @@ func checkSqueue(uuid string) (bool, error) {
return found, nil
}
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
- mineMutex.Lock()
- if t {
- mineMap[uuid] = true
- } else {
- delete(mineMap, uuid)
+// Run or monitor a container.
+//
+// If the container is marked as Locked, check if it is already in the slurm
+// queue. If not, submit it.
+//
+// If the container is marked as Running, check if it is in the slurm queue.
+// If not, mark it as Cancelled.
+//
+// Monitor status updates. If the priority changes to zero, cancel the
+// container using scancel.
+func run(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+
+ uuid := container.UUID
+
+ if container.State == dispatch.Locked {
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ } else if !inQ {
+ log.Printf("About to submit queued container %v", container.UUID)
+
+ jobid, err := submit(dispatcher, container, *crunchRunCommand)
+ if err != nil {
+ log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
+ } else {
+ finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
+ }
+ }
+ } else if container.State == dispatch.Running {
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ } else if !inQ {
+ log.Printf("Container %s in Running state but not in slurm queue, marking Cancelled.", container.UUID)
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
+ }
+
+ log.Printf("Monitoring container %v started", uuid)
+
+ for container = range status {
+ if (container.State == dispatch.Locked || container.State == dispatch.Running) && container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+ err := exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+ continue
+ }
+ }
+
+ err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ }
}
- mineMutex.Unlock()
-}
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
- mineMutex.RLocker().Lock()
- defer mineMutex.RLocker().Unlock()
- return mineMap[uuid]
+ log.Printf("Monitoring container %v finished", uuid)
}
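
For concreteness, a worked example of the sbatch command line built by
sbatchFunc(): with hypothetical constraints ram = 8589934592 (8 GiB) and
vcpus = 4, memPerCPU = ceil(8589934592 / (4 * 1048576)) = 2048 MiB, so for a
priority-1 container the generated command is (job name hypothetical):

    sbatch --share --parsable \
        --job-name=zzzzz-dz642-examplecntnr \
        --mem-per-cpu=2048 \
        --cpus-per-task=4 \
        --priority=1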
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..348d5e4 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"bytes"
"fmt"
@@ -12,9 +13,7 @@ import (
"net/http/httptest"
"os"
"os/exec"
- "strconv"
"strings"
- "syscall"
"testing"
"time"
@@ -47,11 +46,6 @@ func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-slurm"}
os.Args = args
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
@@ -64,18 +58,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
var sbatchCmdLine []string
var striggerCmdLine []string
// Override sbatchCmd
- defer func(orig func(Container) *exec.Cmd) {
+ defer func(orig func(dispatch.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container Container) *exec.Cmd {
+ sbatchCmd = func(container dispatch.Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
return exec.Command("sh")
}
@@ -90,41 +84,65 @@ func (s *TestSuite) Test_doMain(c *C) {
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- for _, state := range []string{"Running", "Complete"} {
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": state}},
- nil)
- }
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": dispatch.Complete}},
+ nil)
}()
- return exec.Command("echo", "strigger")
+ return exec.Command("echo", striggerCmdLine...)
}
- go func() {
- time.Sleep(8 * time.Second)
- sigChan <- syscall.SIGINT
- }()
+ // Override squeueCmd
+ defer func(orig func() *exec.Cmd) {
+ squeueCmd = orig
+ }(squeueCmd)
+ squeueCmd = func() *exec.Cmd {
+ return exec.Command("echo")
+ }
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
- err := arv.List("containers", params, &containers)
+ var containers dispatch.ContainerList
+ err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- err = doMain()
- c.Check(err, IsNil)
+ echo := "echo"
+ crunchRunCommand = &echo
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ go func() {
+ time.Sleep(time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ }()
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
+ err = dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", item.UUID),
- fmt.Sprintf("--mem-per-cpu=%s", strconv.Itoa(int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576))))),
- fmt.Sprintf("--cpus-per-task=%s", strconv.Itoa(int(item.RuntimeConstraints["vcpus"])))}
+ fmt.Sprintf("--mem-per-cpu=%d", int(math.Ceil(float64(item.RuntimeConstraints["ram"])/float64(item.RuntimeConstraints["vcpus"]*1048576)))),
+ fmt.Sprintf("--cpus-per-task=%d", int(item.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--priority=%d", item.Priority)}
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
- c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
+ c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer", "--fini",
"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
@@ -133,7 +151,7 @@ func (s *TestSuite) Test_doMain(c *C) {
c.Check(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -144,7 +162,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -153,7 +171,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -165,14 +183,36 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
+ crunchRunCommand = &crunchCmd
+ finishCmd := "/usr/bin/crunch-finish-slurm.sh"
+ finishCommand = &finishCmd
+
+ doneProcessing := make(chan struct{})
+ dispatcher := dispatch.Dispatcher{
+ Arv: arv,
+ PollInterval: time.Duration(1) * time.Second,
+ RunContainer: func(dispatcher *dispatch.Dispatcher,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ go func() {
+ time.Sleep(time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ }()
+ run(dispatcher, container, status)
+ doneProcessing <- struct{}{}
+ },
+ DoneProcessing: doneProcessing}
+
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- sigChan <- syscall.SIGTERM
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+ err := dispatcher.RunDispatcher()
+ c.Assert(err, IsNil)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}