[ARVADOS] updated: c62ae0ad70080b1217dc478b4921441013d21db4

git at public.curoverse.com git at public.curoverse.com
Wed Jan 20 12:37:54 EST 2016


Summary of changes:
 services/api/test/fixtures/containers.yml          |  25 +++
 services/crunch-dispatch-local/.gitignore          |   1 +
 .../crunch-dispatch-local/crunch-dispatch-local.go | 218 +++++++++++++++++++++
 .../crunch-dispatch-local_test.go                  | 159 +++++++++++++++
 4 files changed, 403 insertions(+)
 create mode 100644 services/api/test/fixtures/containers.yml
 create mode 100644 services/crunch-dispatch-local/.gitignore
 create mode 100644 services/crunch-dispatch-local/crunch-dispatch-local.go
 create mode 100644 services/crunch-dispatch-local/crunch-dispatch-local_test.go

       via  c62ae0ad70080b1217dc478b4921441013d21db4 (commit)
       via  6c3a694c75b069aa7f1792ce28c18c0471d981c2 (commit)
       via  ec3deb3aee4bab253882ab7d83c062a208a72644 (commit)
       via  c95847885fd858916dac6d8a7c4f0b364f9db340 (commit)
       via  f87bda0e69ec38a7485f250328c32643c5587fd8 (commit)
       via  4c1281bba5d3e01d677165b6e2fa7d9209e233b5 (commit)
       via  179d0ce2912bde1f88e22087386842adcfe361ac (commit)
       via  e4b791cbe3536f08fbb5df10cf8de11c2d816e04 (commit)
      from  bc9845761c44beecdd046620694f6d88af0e32fd (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c62ae0ad70080b1217dc478b4921441013d21db4
Merge: bc98457 6c3a694
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 12:37:34 2016 -0500

    closes #8028
    Merge branch '8028-crunch-dispatch-local'


commit 6c3a694c75b069aa7f1792ce28c18c0471d981c2
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 12:33:16 2016 -0500

    8028: Update the stub based test to use waitGroup.Wait() instead of sleep.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 0255831..be1fef8 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -176,19 +176,15 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	// A goroutine to terminate the runner if container priority becomes zero
 	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
 	go func() {
-		for {
-			select {
-			case <-priorityTicker.C:
-				var container Container
-				err := arv.Get("containers", uuid, nil, &container)
-				if err != nil {
-					log.Printf("Error getting container info for %v: %q", uuid, err)
-				} else {
-					if container.Priority == 0 {
-						priorityTicker.Stop()
-						cmd.Process.Signal(os.Interrupt)
-						return
-					}
+		for _ = range priorityTicker.C {
+			var container Container
+			err := arv.Get("containers", uuid, nil, &container)
+			if err != nil {
+				log.Printf("Error getting container info for %v: %q", uuid, err)
+			} else {
+				if container.Priority == 0 {
+					priorityTicker.Stop()
+					cmd.Process.Signal(os.Interrupt)
 				}
 			}
 		}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index ef8cb83..3ec1e2e 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -145,14 +145,14 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(tempfile)
 
 	go func() {
-		time.Sleep(1 * time.Second)
+		time.Sleep(2 * time.Second)
 		sigChan <- syscall.SIGTERM
 	}()
 
 	runQueuedContainers(1, 1, crunchCmd)
 
-	// Give some time for run goroutine to complete
-	time.Sleep(5 * time.Second)
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
 
 	buf, _ := ioutil.ReadFile(tempfile.Name())
 	c.Check(strings.Contains(string(buf), expected), Equals, true)

commit ec3deb3aee4bab253882ab7d83c062a208a72644
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 12:05:59 2016 -0500

    8028: In runningCmds loop, no need to wait for process termination.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 47c3564..0255831 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -75,13 +75,8 @@ func doMain() error {
 	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
 
 	// Finished dispatching; interrupt any crunch jobs that are still running
-	for uuid, cmd := range runningCmds {
-		go func(uuid string) {
-			cmd.Process.Signal(os.Interrupt)
-			if _, err := cmd.Process.Wait(); err != nil {
-				log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
-			}
-		}(uuid)
+	for _, cmd := range runningCmds {
+		cmd.Process.Signal(os.Interrupt)
 	}
 
 	// Wait for all running crunch jobs to complete / terminate
@@ -192,9 +187,6 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 					if container.Priority == 0 {
 						priorityTicker.Stop()
 						cmd.Process.Signal(os.Interrupt)
-						runningCmdsMutex.Lock()
-						delete(runningCmds, uuid)
-						runningCmdsMutex.Unlock()
 						return
 					}
 				}

commit c95847885fd858916dac6d8a7c4f0b364f9db340
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 11:40:30 2016 -0500

    8028: add command to waitGroup during run method itself; not during signal handling.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index eb8550e..47c3564 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -19,8 +19,14 @@ func main() {
 	}
 }
 
-var arv arvadosclient.ArvadosClient
-var runningCmds map[string]*exec.Cmd
+var (
+	arv              arvadosclient.ArvadosClient
+	runningCmds      map[string]*exec.Cmd
+	runningCmdsMutex sync.Mutex
+	waitGroup        sync.WaitGroup
+	doneProcessing   chan bool
+	sigChan          chan os.Signal
+)
 
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -49,44 +55,46 @@ func doMain() error {
 		return err
 	}
 
+	// Channel to terminate
+	doneProcessing = make(chan bool)
+
+	// Map of running crunch jobs
 	runningCmds = make(map[string]*exec.Cmd)
+
+	// Graceful shutdown
 	sigChan = make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 	go func(sig <-chan os.Signal) {
-		var wg sync.WaitGroup
 		for sig := range sig {
+			log.Printf("Caught signal: %v", sig)
 			doneProcessing <- true
-			caught := sig
-			for uuid, cmd := range runningCmds {
-				go func(uuid string) {
-					wg.Add(1)
-					defer wg.Done()
-					cmd.Process.Signal(caught)
-					if _, err := cmd.Process.Wait(); err != nil {
-						log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
-					}
-				}(uuid)
-			}
 		}
-		wg.Wait()
 	}(sigChan)
 
-	// channel to terminate
-	doneProcessing = make(chan bool)
-
-	// run all queued containers
+	// Run all queued containers
 	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+	// Finished dispatching; interrupt any crunch jobs that are still running
+	for uuid, cmd := range runningCmds {
+		go func(uuid string) {
+			cmd.Process.Signal(os.Interrupt)
+			if _, err := cmd.Process.Wait(); err != nil {
+				log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+			}
+		}(uuid)
+	}
+
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
+
 	return nil
 }
 
-var doneProcessing chan bool
-var sigChan chan os.Signal
-
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more child processes are running,
+// This is because, once one or more crunch jobs are running,
 // we would need to wait for them to complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
 	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
@@ -129,6 +137,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 
 	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to run queued container %v", containers.Items[i].UUID)
+		// Run the container
 		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
 	}
 }
@@ -149,10 +158,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		return
 	}
 
+	// Add this crunch job to the list of runningCmds
+	runningCmdsMutex.Lock()
 	runningCmds[uuid] = cmd
+	runningCmdsMutex.Unlock()
 
 	log.Printf("Started container run for %v", uuid)
 
+	// Add this crunch job to waitGroup
+	waitGroup.Add(1)
+	defer waitGroup.Done()
+
+	// Update container status to Running
 	err := arv.Update("containers", uuid,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": "Running"}},
@@ -161,7 +178,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
 	}
 
-	// Terminate the runner if container priority becomes zero
+	// A goroutine to terminate the runner if container priority becomes zero
 	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
 	go func() {
 		for {
@@ -175,7 +192,9 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 					if container.Priority == 0 {
 						priorityTicker.Stop()
 						cmd.Process.Signal(os.Interrupt)
+						runningCmdsMutex.Lock()
 						delete(runningCmds, uuid)
+						runningCmdsMutex.Unlock()
 						return
 					}
 				}
@@ -183,14 +202,19 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		}
 	}()
 
-	// Wait for the process to exit
+	// Wait for the crunch job to exit
 	if _, err := cmd.Process.Wait(); err != nil {
-		log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 	}
+
+	// Remove the crunch job from runningCmds
+	runningCmdsMutex.Lock()
 	delete(runningCmds, uuid)
+	runningCmdsMutex.Unlock()
 
 	priorityTicker.Stop()
 
+	// The container state should be 'Complete'
 	var container Container
 	err = arv.Get("containers", uuid, nil, &container)
 	if container.State == "Running" {

commit f87bda0e69ec38a7485f250328c32643c5587fd8
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 07:16:05 2016 -0500

    8028: when a signal is received, terminate all running commands and wait in a WaitGroup.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index e575839..eb8550e 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"os/exec"
 	"os/signal"
+	"sync"
 	"syscall"
 	"time"
 )
@@ -52,16 +53,22 @@ func doMain() error {
 	sigChan = make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 	go func(sig <-chan os.Signal) {
+		var wg sync.WaitGroup
 		for sig := range sig {
 			doneProcessing <- true
 			caught := sig
 			for uuid, cmd := range runningCmds {
-				cmd.Process.Signal(caught)
-				if _, err := cmd.Process.Wait(); err != nil {
-					log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
-				}
+				go func(uuid string) {
+					wg.Add(1)
+					defer wg.Done()
+					cmd.Process.Signal(caught)
+					if _, err := cmd.Process.Wait(); err != nil {
+						log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+					}
+				}(uuid)
 			}
 		}
+		wg.Wait()
 	}(sigChan)
 
 	// channel to terminate

commit 4c1281bba5d3e01d677165b6e2fa7d9209e233b5
Author: radhika <radhika at curoverse.com>
Date:   Tue Jan 19 21:40:59 2016 -0500

    8028: add signal handling to dispatcher.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 2b7bd24..e575839 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -6,6 +6,8 @@ import (
 	"log"
 	"os"
 	"os/exec"
+	"os/signal"
+	"syscall"
 	"time"
 )
 
@@ -17,6 +19,7 @@ func main() {
 }
 
 var arv arvadosclient.ArvadosClient
+var runningCmds map[string]*exec.Cmd
 
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -45,6 +48,22 @@ func doMain() error {
 		return err
 	}
 
+	runningCmds = make(map[string]*exec.Cmd)
+	sigChan = make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func(sig <-chan os.Signal) {
+		for sig := range sig {
+			doneProcessing <- true
+			caught := sig
+			for uuid, cmd := range runningCmds {
+				cmd.Process.Signal(caught)
+				if _, err := cmd.Process.Wait(); err != nil {
+					log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+				}
+			}
+		}
+	}(sigChan)
+
 	// channel to terminate
 	doneProcessing = make(chan bool)
 
@@ -54,6 +73,7 @@ func doMain() error {
 }
 
 var doneProcessing chan bool
+var sigChan chan os.Signal
 
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
@@ -122,6 +142,8 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		return
 	}
 
+	runningCmds[uuid] = cmd
+
 	log.Printf("Started container run for %v", uuid)
 
 	err := arv.Update("containers", uuid,
@@ -146,6 +168,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 					if container.Priority == 0 {
 						priorityTicker.Stop()
 						cmd.Process.Signal(os.Interrupt)
+						delete(runningCmds, uuid)
 						return
 					}
 				}
@@ -157,6 +180,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	if _, err := cmd.Process.Wait(); err != nil {
 		log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
 	}
+	delete(runningCmds, uuid)
 
 	priorityTicker.Stop()
 
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 997c63a..ef8cb83 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -10,6 +10,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"strings"
+	"syscall"
 	"testing"
 	"time"
 
@@ -63,16 +64,13 @@ func (s *TestSuite) Test_doMain(c *C) {
 	os.Args = append(os.Args, args...)
 
 	go func() {
-		time.Sleep(2 * time.Second)
-		doneProcessing <- true
+		time.Sleep(5 * time.Second)
+		sigChan <- syscall.SIGINT
 	}()
 
 	err := doMain()
 	c.Check(err, IsNil)
 
-	// Give some time for run goroutine to complete
-	time.Sleep(1 * time.Second)
-
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
@@ -148,7 +146,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	go func() {
 		time.Sleep(1 * time.Second)
-		doneProcessing <- true
+		sigChan <- syscall.SIGTERM
 	}()
 
 	runQueuedContainers(1, 1, crunchCmd)

commit 179d0ce2912bde1f88e22087386842adcfe361ac
Author: radhika <radhika at curoverse.com>
Date:   Tue Jan 19 12:16:51 2016 -0500

    8028: After getting list of Queued containers, instead of looking for containers.ItemsAvailable, look for len(containers.Items)

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 9fb4cb9..2b7bd24 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -84,8 +84,7 @@ type Container struct {
 
 // ContainerList is a list of the containers from api
 type ContainerList struct {
-	ItemsAvailable int         `json:"items_available"`
-	Items          []Container `json:"items"`
+	Items []Container `json:"items"`
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
@@ -101,7 +100,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 		return
 	}
 
-	for i := 0; i < containers.ItemsAvailable; i++ {
+	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to run queued container %v", containers.Items[i].UUID)
 		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
 	}
@@ -113,7 +112,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-	cmd := exec.Command(crunchRunCommand, "--job", uuid)
+	cmd := exec.Command(crunchRunCommand, uuid)
 
 	cmd.Stdin = nil
 	cmd.Stderr = os.Stderr
@@ -146,7 +145,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 				} else {
 					if container.Priority == 0 {
 						priorityTicker.Stop()
-						cmd.Process.Kill()
+						cmd.Process.Signal(os.Interrupt)
 						return
 					}
 				}
@@ -168,7 +167,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		err = arv.Update("containers", uuid,
 			arvadosclient.Dict{
 				"container": arvadosclient.Dict{"state": "Complete"}},
-			&container)
+			nil)
 		if err != nil {
 			log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
 		}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 1d526b9..997c63a 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -59,7 +59,7 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "1", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
+	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
 	os.Args = append(os.Args, args...)
 
 	go func() {
@@ -70,6 +70,9 @@ func (s *TestSuite) Test_doMain(c *C) {
 	err := doMain()
 	c.Check(err, IsNil)
 
+	// Give some time for run goroutine to complete
+	time.Sleep(1 * time.Second)
+
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
@@ -77,7 +80,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 	var containers ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
-	c.Assert(containers.ItemsAvailable, Equals, 0)
+	c.Assert(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
 	var container Container

commit e4b791cbe3536f08fbb5df10cf8de11c2d816e04
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 13 11:15:28 2016 -0500

    8028: crunch-dispatch-local implementation

diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
new file mode 100644
index 0000000..22004b4
--- /dev/null
+++ b/services/api/test/fixtures/containers.yml
@@ -0,0 +1,25 @@
+queued:
+  uuid: zzzzz-dz642-queuedcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Queued
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
+
+completed:
+  uuid: zzzzz-dz642-compltcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Complete
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
diff --git a/services/crunch-dispatch-local/.gitignore b/services/crunch-dispatch-local/.gitignore
new file mode 100644
index 0000000..7c1070a
--- /dev/null
+++ b/services/crunch-dispatch-local/.gitignore
@@ -0,0 +1 @@
+crunch-dispatch-local
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
new file mode 100644
index 0000000..9fb4cb9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -0,0 +1,176 @@
+package main
+
+import (
+	"flag"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"log"
+	"os"
+	"os/exec"
+	"time"
+)
+
+func main() {
+	err := doMain()
+	if err != nil {
+		log.Fatalf("%q", err)
+	}
+}
+
+var arv arvadosclient.ArvadosClient
+
+func doMain() error {
+	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
+
+	pollInterval := flags.Int(
+		"poll-interval",
+		10,
+		"Interval in seconds to poll for queued containers")
+
+	priorityPollInterval := flags.Int(
+		"container-priority-poll-interval",
+		60,
+		"Interval in seconds to check priority of a dispatched container")
+
+	crunchRunCommand := flags.String(
+		"crunch-run-command",
+		"/usr/bin/crunch-run",
+		"Crunch command to run container")
+
+	// Parse args; omit the first arg which is the command name
+	flags.Parse(os.Args[1:])
+
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		return err
+	}
+
+	// channel to terminate
+	doneProcessing = make(chan bool)
+
+	// run all queued containers
+	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+	return nil
+}
+
+var doneProcessing chan bool
+
+// Poll for queued containers using pollInterval.
+// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+//
+// Any errors encountered are logged but the program would continue to run (not exit).
+// This is because, once one or more child processes are running,
+// we would need to wait for them to complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+
+	for {
+		select {
+		case <-ticker.C:
+			dispatchLocal(priorityPollInterval, crunchRunCommand)
+		case <-doneProcessing:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// Container data
+type Container struct {
+	UUID     string `json:"uuid"`
+	State    string `json:"state"`
+	Priority int    `json:"priority"`
+}
+
+// ContainerList is a list of the containers from api
+type ContainerList struct {
+	ItemsAvailable int         `json:"items_available"`
+	Items          []Container `json:"items"`
+}
+
+// Get the list of queued containers from API server and invoke run for each container.
+func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+	params := arvadosclient.Dict{
+		"filters": [][]string{[]string{"state", "=", "Queued"}},
+	}
+
+	var containers ContainerList
+	err := arv.List("containers", params, &containers)
+	if err != nil {
+		log.Printf("Error getting list of queued containers: %q", err)
+		return
+	}
+
+	for i := 0; i < containers.ItemsAvailable; i++ {
+		log.Printf("About to run queued container %v", containers.Items[i].UUID)
+		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+	}
+}
+
+// Run queued container:
+// Set container state to locked (TBD)
+// Run container using the given crunch-run command
+// Set the container state to Running
+// If the container priority becomes zero while crunch job is still running, terminate it.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+	cmd := exec.Command(crunchRunCommand, "--job", uuid)
+
+	cmd.Stdin = nil
+	cmd.Stderr = os.Stderr
+	cmd.Stdout = os.Stderr
+	if err := cmd.Start(); err != nil {
+		log.Printf("Error running container for %v: %q", uuid, err)
+		return
+	}
+
+	log.Printf("Started container run for %v", uuid)
+
+	err := arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": "Running"}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+	}
+
+	// Terminate the runner if container priority becomes zero
+	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+	go func() {
+		for {
+			select {
+			case <-priorityTicker.C:
+				var container Container
+				err := arv.Get("containers", uuid, nil, &container)
+				if err != nil {
+					log.Printf("Error getting container info for %v: %q", uuid, err)
+				} else {
+					if container.Priority == 0 {
+						priorityTicker.Stop()
+						cmd.Process.Kill()
+						return
+					}
+				}
+			}
+		}
+	}()
+
+	// Wait for the process to exit
+	if _, err := cmd.Process.Wait(); err != nil {
+		log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+	}
+
+	priorityTicker.Stop()
+
+	var container Container
+	err = arv.Get("containers", uuid, nil, &container)
+	if container.State == "Running" {
+		log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+		err = arv.Update("containers", uuid,
+			arvadosclient.Dict{
+				"container": arvadosclient.Dict{"state": "Complete"}},
+			&container)
+		if err != nil {
+			log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+		}
+	}
+}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
new file mode 100644
index 0000000..1d526b9
--- /dev/null
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&TestSuite{})
+var _ = Suite(&MockArvadosServerSuite{})
+
+type TestSuite struct{}
+type MockArvadosServerSuite struct{}
+
+var initialArgs []string
+
+func (s *TestSuite) SetUpSuite(c *C) {
+	initialArgs = os.Args
+	arvadostest.StartAPI()
+}
+
+func (s *TestSuite) TearDownSuite(c *C) {
+	arvadostest.StopAPI()
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+	args := []string{"crunch-dispatch-local"}
+	os.Args = args
+
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		c.Fatalf("Error making arvados client: %s", err)
+	}
+}
+
+func (s *TestSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+	os.Args = initialArgs
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+	arvadostest.ResetEnv()
+}
+
+func (s *TestSuite) Test_doMain(c *C) {
+	args := []string{"-poll-interval", "1", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
+	os.Args = append(os.Args, args...)
+
+	go func() {
+		time.Sleep(2 * time.Second)
+		doneProcessing <- true
+	}()
+
+	err := doMain()
+	c.Check(err, IsNil)
+
+	// There should be no queued containers now
+	params := arvadosclient.Dict{
+		"filters": [][]string{[]string{"state", "=", "Queued"}},
+	}
+	var containers ContainerList
+	err = arv.List("containers", params, &containers)
+	c.Check(err, IsNil)
+	c.Assert(containers.ItemsAvailable, Equals, 0)
+
+	// Previously "Queued" container should now be in "Complete" state
+	var container Container
+	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+	c.Check(err, IsNil)
+	c.Check(container.State, Equals, "Complete")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
+
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
+	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/containers"] =
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
+		arvadostest.StubResponse{500, string(`{}`)}
+
+	testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+}
+
+func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
+	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/containers"] =
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
+		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+
+	testWithServerStub(c, apiStubResponses, "echo",
+		"After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+}
+
+func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
+	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/containers"] =
+		arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
+		arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
+
+	testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+}
+
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStub := arvadostest.ServerStub{apiStubResponses}
+
+	api := httptest.NewServer(&apiStub)
+	defer api.Close()
+
+	arv = arvadosclient.ArvadosClient{
+		Scheme:    "http",
+		ApiServer: api.URL[7:],
+		ApiToken:  "abc123",
+		Client:    &http.Client{Transport: &http.Transport{}},
+		Retries:   0,
+	}
+
+	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+	c.Check(err, IsNil)
+	defer os.Remove(tempfile.Name())
+	log.SetOutput(tempfile)
+
+	go func() {
+		time.Sleep(1 * time.Second)
+		doneProcessing <- true
+	}()
+
+	runQueuedContainers(1, 1, crunchCmd)
+
+	// Give some time for run goroutine to complete
+	time.Sleep(5 * time.Second)
+
+	buf, _ := ioutil.ReadFile(tempfile.Name())
+	c.Check(strings.Contains(string(buf), expected), Equals, true)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list