[ARVADOS] created: d3781de388530d06379974601247a6c044eee92e

git at public.curoverse.com git at public.curoverse.com
Wed Jan 13 11:17:04 EST 2016


        at  d3781de388530d06379974601247a6c044eee92e (commit)


commit d3781de388530d06379974601247a6c044eee92e
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 13 11:15:28 2016 -0500

    8028: crunch-dispatch-local implementation

diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
new file mode 100644
index 0000000..22004b4
--- /dev/null
+++ b/services/api/test/fixtures/containers.yml
@@ -0,0 +1,25 @@
+queued:
+  uuid: zzzzz-dz642-queuedcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Queued
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
+
+completed:
+  uuid: zzzzz-dz642-compltcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Complete
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
diff --git a/services/crunch-dispatch-local/.gitignore b/services/crunch-dispatch-local/.gitignore
new file mode 100644
index 0000000..7c1070a
--- /dev/null
+++ b/services/crunch-dispatch-local/.gitignore
@@ -0,0 +1 @@
+crunch-dispatch-local
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
new file mode 100644
index 0000000..9fb4cb9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -0,0 +1,176 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+)
+
+// main runs the dispatcher and exits with a fatal log message if it fails.
+func main() {
+	if err := doMain(); err != nil {
+		log.Fatalf("%q", err)
+	}
+}
+
+// arv is the API client shared by the polling loop and the per-container
+// run goroutines. doMain initializes it; tests assign it directly.
+var arv arvadosclient.ArvadosClient
+
+// doMain parses the command-line flags, creates the API client, and then
+// polls for and runs queued containers until a value is received on
+// doneProcessing.
+//
+// It returns an error if either poll interval is not positive (time.NewTicker
+// would otherwise panic later) or if the API client cannot be created.
+func doMain() error {
+	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
+
+	pollInterval := flags.Int(
+		"poll-interval",
+		10,
+		"Interval in seconds to poll for queued containers")
+
+	priorityPollInterval := flags.Int(
+		"container-priority-poll-interval",
+		60,
+		"Interval in seconds to check priority of a dispatched container")
+
+	crunchRunCommand := flags.String(
+		"crunch-run-command",
+		"/usr/bin/crunch-run",
+		"Crunch command to run container")
+
+	// Parse args; omit the first arg which is the command name.
+	// With flag.ExitOnError a bad flag exits the process instead of
+	// returning an error here.
+	flags.Parse(os.Args[1:])
+
+	// Both values become ticker periods; time.NewTicker panics on a
+	// non-positive duration, so reject them up front with a readable error.
+	if *pollInterval <= 0 {
+		return fmt.Errorf("poll-interval must be positive, got %d", *pollInterval)
+	}
+	if *priorityPollInterval <= 0 {
+		return fmt.Errorf("container-priority-poll-interval must be positive, got %d", *priorityPollInterval)
+	}
+
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		return err
+	}
+
+	// Channel used to tell runQueuedContainers to stop polling.
+	doneProcessing = make(chan bool)
+
+	// Run all queued containers.
+	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+	return nil
+}
+
+// doneProcessing stops runQueuedContainers: receiving a value on it makes the
+// polling loop return. doMain creates it; while it is nil the loop cannot be
+// stopped, because a receive from a nil channel never proceeds.
+var doneProcessing chan bool
+
+// runQueuedContainers polls for queued containers every pollInterval seconds,
+// calling dispatchLocal on each tick to start all currently queued containers.
+//
+// Errors encountered are logged, but the program keeps running (does not
+// exit): once one or more child processes are running, we need to wait for
+// them to complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+	period := time.Duration(pollInterval) * time.Second
+	ticker := time.NewTicker(period)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-doneProcessing:
+			return
+		case <-ticker.C:
+			dispatchLocal(priorityPollInterval, crunchRunCommand)
+		}
+	}
+}
+
+// Container holds the fields of a container record that this dispatcher
+// decodes from API server responses.
+type Container struct {
+	UUID     string `json:"uuid"`
+	// State is compared against "Queued", "Running" and "Complete" in this file.
+	State    string `json:"state"`
+	// Priority is polled while the container runs; when it reads 0 the
+	// crunch-run process is killed.
+	Priority int    `json:"priority"`
+}
+
+// ContainerList is a list of the containers from api.
+type ContainerList struct {
+	// NOTE(review): presumably the server-side count of matching containers,
+	// which may differ from len(Items) if results are paginated — confirm.
+	ItemsAvailable int         `json:"items_available"`
+	Items          []Container `json:"items"`
+}
+
+// dispatchLocal fetches the containers in state "Queued" from the API server
+// and starts a goroutine running each one.
+//
+// Only the containers actually present in the response are dispatched. The
+// response's items_available count can differ from len(Items), so using it as
+// the loop bound could index past the end of Items and panic.
+//
+// An error fetching the list is logged, and nothing is dispatched until the
+// next poll.
+func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+	params := arvadosclient.Dict{
+		"filters": [][]string{{"state", "=", "Queued"}},
+	}
+
+	var containers ContainerList
+	if err := arv.List("containers", params, &containers); err != nil {
+		log.Printf("Error getting list of queued containers: %q", err)
+		return
+	}
+
+	for _, container := range containers.Items {
+		log.Printf("About to run queued container %v", container.UUID)
+		go run(container.UUID, crunchRunCommand, priorityPollInterval)
+	}
+}
+
+// run executes a single queued container:
+//   - set container state to locked (TBD)
+//   - start crunchRunCommand with arguments "--job <uuid>"
+//   - set the container state to Running
+//   - while the process runs, check the container's priority every
+//     priorityPollInterval seconds and kill the process if it becomes zero
+//   - after the process exits, set the state to Complete if it is still Running
+//
+// Errors are logged, not returned.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+	cmd := exec.Command(crunchRunCommand, "--job", uuid)
+	// cmd.Stdin is left nil, so the child reads from the null device.
+	cmd.Stderr = os.Stderr
+	cmd.Stdout = os.Stderr
+	if err := cmd.Start(); err != nil {
+		log.Printf("Error running container for %v: %q", uuid, err)
+		return
+	}
+
+	log.Printf("Started container run for %v", uuid)
+
+	err := arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": "Running"}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+	}
+
+	// Terminate the runner if container priority becomes zero.
+	// processExited is closed once the process has been waited for, which
+	// ends the watcher goroutine. Stopping the ticker alone is not enough:
+	// Ticker.Stop does not close priorityTicker.C, so the goroutine would
+	// block on it forever, leaking one goroutine per container.
+	processExited := make(chan struct{})
+	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+	go func() {
+		defer priorityTicker.Stop()
+		for {
+			select {
+			case <-processExited:
+				return
+			case <-priorityTicker.C:
+				var container Container
+				if err := arv.Get("containers", uuid, nil, &container); err != nil {
+					log.Printf("Error getting container info for %v: %q", uuid, err)
+					continue
+				}
+				if container.Priority == 0 {
+					// Best effort: the process may already have exited.
+					cmd.Process.Kill()
+					return
+				}
+			}
+		}
+	}()
+
+	// Wait for the process to exit.
+	if _, err := cmd.Process.Wait(); err != nil {
+		log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+	}
+	close(processExited)
+
+	var container Container
+	if err := arv.Get("containers", uuid, nil, &container); err != nil {
+		log.Printf("Error getting container info for %v: %q", uuid, err)
+		return
+	}
+	if container.State == "Running" {
+		log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+		err = arv.Update("containers", uuid,
+			arvadosclient.Dict{
+				"container": arvadosclient.Dict{"state": "Complete"}},
+			&container)
+		if err != nil {
+			log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+		}
+	}
+}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
new file mode 100644
index 0000000..1d526b9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+// Test hooks gocheck into "go test"; TestingT runs every registered suite.
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+// Register both suites with gocheck. TestSuite runs against the API server
+// started by arvadostest; MockArvadosServerSuite runs against an httptest stub.
+var (
+	_ = Suite(&TestSuite{})
+	_ = Suite(&MockArvadosServerSuite{})
+)
+
+type (
+	// TestSuite exercises the dispatcher against the test API server.
+	TestSuite struct{}
+	// MockArvadosServerSuite exercises the dispatcher against canned HTTP responses.
+	MockArvadosServerSuite struct{}
+)
+
+// initialArgs holds os.Args as it was when TestSuite started, so each test
+// can rewrite the command line and have it restored afterwards.
+var initialArgs []string
+
+// SetUpSuite records the original command line and starts the test API server.
+func (s *TestSuite) SetUpSuite(c *C) {
+	initialArgs = os.Args
+	arvadostest.StartAPI()
+}
+
+// TearDownSuite stops the test API server.
+func (s *TestSuite) TearDownSuite(c *C) {
+	arvadostest.StopAPI()
+}
+
+// SetUpTest resets os.Args to just the program name and creates a fresh
+// shared arv client.
+func (s *TestSuite) SetUpTest(c *C) {
+	os.Args = []string{"crunch-dispatch-local"}
+
+	var err error
+	if arv, err = arvadosclient.MakeArvadosClient(); err != nil {
+		c.Fatalf("Error making arvados client: %s", err)
+	}
+}
+
+// TearDownTest resets the arvadostest environment and restores os.Args.
+func (s *TestSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+	os.Args = initialArgs
+}
+
+// TearDownTest resets the arvadostest environment after each mock-server test.
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+}
+
+// Test_doMain runs the dispatcher against the test API server for about two
+// seconds with "echo" as the crunch-run command. It then checks that no
+// containers remain queued and that the queued fixture container is Complete.
+func (s *TestSuite) Test_doMain(c *C) {
+	os.Args = append(os.Args,
+		"-poll-interval", "1",
+		"-container-priority-poll-interval", "1",
+		"-crunch-run-command", "echo")
+
+	// Stop the dispatcher after two seconds.
+	// NOTE(review): this goroutine reads doneProcessing while doMain assigns
+	// it without synchronization; the race detector may flag this.
+	go func() {
+		time.Sleep(2 * time.Second)
+		doneProcessing <- true
+	}()
+
+	c.Check(doMain(), IsNil)
+
+	// Nothing should be left in the queue.
+	var queued ContainerList
+	err := arv.List("containers", arvadosclient.Dict{
+		"filters": [][]string{{"state", "=", "Queued"}},
+	}, &queued)
+	c.Check(err, IsNil)
+	c.Assert(queued.ItemsAvailable, Equals, 0)
+
+	// The previously queued fixture container should now be Complete.
+	var container Container
+	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+	c.Check(err, IsNil)
+	c.Check(container.State, Equals, "Complete")
+}
+
+// Test_APIErrorGettingContainers: a 500 from the container list endpoint must
+// be logged as a list failure.
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+	stubs := map[string]arvadostest.StubResponse{
+		"/arvados/v1/containers": {500, `{}`},
+	}
+	testWithServerStub(c, stubs, "echo", "Error getting list of queued containers")
+}
+
+// Test_APIErrorUpdatingContainerState: the container starts, but the request
+// marking it Running fails with a 500, which must be logged.
+func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
+	stubs := make(map[string]arvadostest.StubResponse)
+	stubs["/arvados/v1/containers"] = arvadostest.StubResponse{200, `{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`}
+	stubs["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] = arvadostest.StubResponse{500, `{}`}
+	testWithServerStub(c, stubs, "echo", "Error updating container state")
+}
+
+// Test_ContainerStillInRunningAfterRun: the server still reports Running after
+// the process exits, so the dispatcher must log that it is marking it Complete.
+func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
+	stubs := make(map[string]arvadostest.StubResponse)
+	stubs["/arvados/v1/containers"] = arvadostest.StubResponse{200, `{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`}
+	stubs["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] = arvadostest.StubResponse{200, `{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`}
+	testWithServerStub(c, stubs, "echo",
+		"After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+}
+
+// Test_ErrorRunningContainer: the crunch-run command does not exist, so
+// starting it fails and the failure must be logged.
+func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
+	stubs := make(map[string]arvadostest.StubResponse)
+	stubs["/arvados/v1/containers"] = arvadostest.StubResponse{200, `{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`}
+	stubs["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] = arvadostest.StubResponse{200, `{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`}
+	testWithServerStub(c, stubs, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+}
+
+// testWithServerStub points the shared arv client at an httptest server that
+// answers with apiStubResponses. It runs the dispatch loop for about one
+// second with crunchCmd as the crunch-run command, then gives the run
+// goroutines time to finish. Finally it checks that the captured log output
+// contains expected.
+//
+// The standard logger writes to a temp file during the call and is restored
+// to os.Stderr before returning. The temp file is closed and removed.
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStub := arvadostest.ServerStub{apiStubResponses}
+
+	api := httptest.NewServer(&apiStub)
+	defer api.Close()
+
+	arv = arvadosclient.ArvadosClient{
+		Scheme:    "http",
+		ApiServer: strings.TrimPrefix(api.URL, "http://"),
+		ApiToken:  "abc123",
+		Client:    &http.Client{Transport: &http.Transport{}},
+		Retries:   0,
+	}
+
+	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+	c.Assert(err, IsNil)
+	defer os.Remove(tempfile.Name())
+	defer tempfile.Close()
+	log.SetOutput(tempfile)
+	// Runs first among the defers, so later tests (and any run goroutine
+	// that is still logging) do not write to a closed, deleted file.
+	defer log.SetOutput(os.Stderr)
+
+	// runQueuedContainers returns only after receiving on doneProcessing.
+	// doMain normally creates that channel; create a fresh one here so this
+	// helper cannot block forever on a nil channel when no doMain test ran
+	// first. The goroutine uses the local copy, not the global.
+	done := make(chan bool)
+	doneProcessing = done
+	go func() {
+		time.Sleep(1 * time.Second)
+		done <- true
+	}()
+
+	runQueuedContainers(1, 1, crunchCmd)
+
+	// Give some time for run goroutines to complete.
+	time.Sleep(5 * time.Second)
+
+	buf, err := ioutil.ReadFile(tempfile.Name())
+	c.Assert(err, IsNil)
+	c.Check(strings.Contains(string(buf), expected), Equals, true,
+		Commentf("expected %q in log output:\n%s", expected, buf))
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list