[ARVADOS] created: d3781de388530d06379974601247a6c044eee92e
git at public.curoverse.com
git at public.curoverse.com
Wed Jan 13 11:17:04 EST 2016
at d3781de388530d06379974601247a6c044eee92e (commit)
commit d3781de388530d06379974601247a6c044eee92e
Author: radhika <radhika at curoverse.com>
Date: Wed Jan 13 11:15:28 2016 -0500
8028: crunch-dispatch-local implementation
diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
new file mode 100644
index 0000000..22004b4
--- /dev/null
+++ b/services/api/test/fixtures/containers.yml
@@ -0,0 +1,25 @@
+# Container fixture in the Queued state. crunch-dispatch-local's Test_doMain
+# expects zzzzz-dz642-queuedcontainer to reach the Complete state after the
+# dispatcher has run it.
+queued:
+ uuid: zzzzz-dz642-queuedcontainer
+ owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ state: Queued
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ container_image: test
+ cwd: test
+ output: test
+ output_path: test
+ command: ["echo", "hello"]
+
+# Container fixture that is already Complete. crunch-dispatch-local lists
+# only containers whose state is Queued, so it should never dispatch this one.
+completed:
+ uuid: zzzzz-dz642-compltcontainer
+ owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ state: Complete
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ container_image: test
+ cwd: test
+ output: test
+ output_path: test
+ command: ["echo", "hello"]
diff --git a/services/crunch-dispatch-local/.gitignore b/services/crunch-dispatch-local/.gitignore
new file mode 100644
index 0000000..7c1070a
--- /dev/null
+++ b/services/crunch-dispatch-local/.gitignore
@@ -0,0 +1 @@
+crunch-dispatch-local
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
new file mode 100644
index 0000000..9fb4cb9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -0,0 +1,176 @@
+package main
+
import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/exec"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)
+
+func main() {
+ err := doMain()
+ if err != nil {
+ log.Fatalf("%q", err)
+ }
+}
+
+var arv arvadosclient.ArvadosClient
+
+func doMain() error {
+ flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
+
+ pollInterval := flags.Int(
+ "poll-interval",
+ 10,
+ "Interval in seconds to poll for queued containers")
+
+ priorityPollInterval := flags.Int(
+ "container-priority-poll-interval",
+ 60,
+ "Interval in seconds to check priority of a dispatched container")
+
+ crunchRunCommand := flags.String(
+ "crunch-run-command",
+ "/usr/bin/crunch-run",
+ "Crunch command to run container")
+
+ // Parse args; omit the first arg which is the command name
+ flags.Parse(os.Args[1:])
+
+ var err error
+ arv, err = arvadosclient.MakeArvadosClient()
+ if err != nil {
+ return err
+ }
+
+ // channel to terminate
+ doneProcessing = make(chan bool)
+
+ // run all queued containers
+ runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ return nil
+}
+
+var doneProcessing chan bool
+
+// Poll for queued containers using pollInterval.
+// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+//
+// Any errors encountered are logged but the program would continue to run (not exit).
+// This is because, once one or more child processes are running,
+// we would need to wait for them complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+ ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+
+ for {
+ select {
+ case <-ticker.C:
+ dispatchLocal(priorityPollInterval, crunchRunCommand)
+ case <-doneProcessing:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
type (
	// Container holds the fields of an Arvados container record that the
	// dispatcher reads: its UUID, state and priority.
	Container struct {
		UUID     string `json:"uuid"`
		State    string `json:"state"`
		Priority int    `json:"priority"`
	}

	// ContainerList is a container list response from the API server.
	// ItemsAvailable is the server's items_available value, which is
	// presumably the total number of matches and so may differ from
	// len(Items) (confirm against the API documentation). Items holds the
	// records actually included in the response.
	ContainerList struct {
		ItemsAvailable int         `json:"items_available"`
		Items          []Container `json:"items"`
	}
)
+
+// Get the list of queued containers from API server and invoke run for each container.
+func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+ params := arvadosclient.Dict{
+ "filters": [][]string{[]string{"state", "=", "Queued"}},
+ }
+
+ var containers ContainerList
+ err := arv.List("containers", params, &containers)
+ if err != nil {
+ log.Printf("Error getting list of queued containers: %q", err)
+ return
+ }
+
+ for i := 0; i < containers.ItemsAvailable; i++ {
+ log.Printf("About to run queued container %v", containers.Items[i].UUID)
+ go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ }
+}
+
+// Run queued container:
+// Set container state to locked (TBD)
+// Run container using the given crunch-run command
+// Set the container state to Running
+// If the container priority becomes zero while crunch job is still running, terminate it.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+ cmd := exec.Command(crunchRunCommand, "--job", uuid)
+
+ cmd.Stdin = nil
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stderr
+ if err := cmd.Start(); err != nil {
+ log.Printf("Error running container for %v: %q", uuid, err)
+ return
+ }
+
+ log.Printf("Started container run for %v", uuid)
+
+ err := arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Running"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ }
+
+ // Terminate the runner if container priority becomes zero
+ priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ go func() {
+ for {
+ select {
+ case <-priorityTicker.C:
+ var container Container
+ err := arv.Get("containers", uuid, nil, &container)
+ if err != nil {
+ log.Printf("Error getting container info for %v: %q", uuid, err)
+ } else {
+ if container.Priority == 0 {
+ priorityTicker.Stop()
+ cmd.Process.Kill()
+ return
+ }
+ }
+ }
+ }
+ }()
+
+ // Wait for the process to exit
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+ }
+
+ priorityTicker.Stop()
+
+ var container Container
+ err = arv.Get("containers", uuid, nil, &container)
+ if container.State == "Running" {
+ log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+ err = arv.Update("containers", uuid,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Complete"}},
+ &container)
+ if err != nil {
+ log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+ }
+ }
+}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
new file mode 100644
index 0000000..1d526b9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+var _ = Suite(&TestSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
+
+type TestSuite struct{}
+type MockArvadosServerSuite struct{}
+
+var initialArgs []string
+
+func (s *TestSuite) SetUpSuite(c *C) {
+ initialArgs = os.Args
+ arvadostest.StartAPI()
+}
+
+func (s *TestSuite) TearDownSuite(c *C) {
+ arvadostest.StopAPI()
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+ args := []string{"crunch-dispatch-local"}
+ os.Args = args
+
+ var err error
+ arv, err = arvadosclient.MakeArvadosClient()
+ if err != nil {
+ c.Fatalf("Error making arvados client: %s", err)
+ }
+}
+
+func (s *TestSuite) TearDownTest(c *C) {
+ arvadostest.ResetEnv()
+ os.Args = initialArgs
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+func (s *TestSuite) Test_doMain(c *C) {
+ args := []string{"-poll-interval", "1", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
+ os.Args = append(os.Args, args...)
+
+ go func() {
+ time.Sleep(2 * time.Second)
+ doneProcessing <- true
+ }()
+
+ err := doMain()
+ c.Check(err, IsNil)
+
+ // There should be no queued containers now
+ params := arvadosclient.Dict{
+ "filters": [][]string{[]string{"state", "=", "Queued"}},
+ }
+ var containers ContainerList
+ err = arv.List("containers", params, &containers)
+ c.Check(err, IsNil)
+ c.Assert(containers.ItemsAvailable, Equals, 0)
+
+ // Previously "Queued" container should now be in "Complete" state
+ var container Container
+ err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+ c.Check(err, IsNil)
+ c.Check(container.State, Equals, "Complete")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+ apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
+
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
+ apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/containers"] =
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+ apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
+ arvadostest.StubResponse{500, string(`{}`)}
+
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+}
+
+func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
+ apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/containers"] =
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+ apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
+ arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+
+ testWithServerStub(c, apiStubResponses, "echo",
+ "After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+}
+
+func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
+ apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/containers"] =
+ arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+ apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
+ arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
+
+ testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+}
+
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+ apiStub := arvadostest.ServerStub{apiStubResponses}
+
+ api := httptest.NewServer(&apiStub)
+ defer api.Close()
+
+ arv = arvadosclient.ArvadosClient{
+ Scheme: "http",
+ ApiServer: api.URL[7:],
+ ApiToken: "abc123",
+ Client: &http.Client{Transport: &http.Transport{}},
+ Retries: 0,
+ }
+
+ tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+ c.Check(err, IsNil)
+ defer os.Remove(tempfile.Name())
+ log.SetOutput(tempfile)
+
+ go func() {
+ time.Sleep(1 * time.Second)
+ doneProcessing <- true
+ }()
+
+ runQueuedContainers(1, 1, crunchCmd)
+
+ // Give some time for run goroutine to complete
+ time.Sleep(5 * time.Second)
+
+ buf, _ := ioutil.ReadFile(tempfile.Name())
+ c.Check(strings.Contains(string(buf), expected), Equals, true)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list