[ARVADOS] updated: 1032d5957a67aa706079917f3c20bdb1a5c91ad0

Git user git at public.curoverse.com
Tue May 24 11:33:16 EDT 2016


Summary of changes:
 sdk/go/dispatch/dispatch.go                        | 17 ++--
 .../crunch-dispatch-local/crunch-dispatch-local.go | 36 +++++----
 .../crunch-dispatch-local_test.go                  | 90 +++++++++++++++-------
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 21 +++--
 .../crunch-dispatch-slurm_test.go                  | 63 +++++++++------
 5 files changed, 144 insertions(+), 83 deletions(-)

       via  1032d5957a67aa706079917f3c20bdb1a5c91ad0 (commit)
       via  eb87446c82c8bda882199d577cdcea6c4f79ecdf (commit)
      from  ce6ffc733c3d8a4637066a90df90d8ffa5d67116 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 1032d5957a67aa706079917f3c20bdb1a5c91ad0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon May 23 17:53:31 2016 -0400

    9187: Slurm tests wip

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index cc221e4..9a727bc 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -158,7 +158,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	*crunchRunCommand = crunchCmd
 
-	var dispatcher *dispatch.DispatcherState
 	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
 		run,
 		func(dispatcher *dispatch.DispatcherState,
@@ -176,7 +175,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+			time.Sleep(100 * time.Millisecond)
+		}
 		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index a763c36..649b784 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -40,7 +40,7 @@ func doMain() error {
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
 
-	finishCommand := flags.String(
+	finishCommand = flags.String(
 		"finish-command",
 		"/usr/bin/crunch-finish-slurm.sh",
 		"Command to run from strigger when job is finished")
@@ -189,20 +189,19 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arva
 
 // Run a queued container: [1] Set container state to locked. [2]
 // Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(crunchRunCommand, finishCommand string,
-	dispatcher *dispatch.DispatcherState,
+func run(dispatcher *dispatch.DispatcherState,
 	container dispatch.Container,
 	status chan dispatch.Container) {
 
 	log.Printf("About to submit queued container %v", container.UUID)
 
-	jobid, err := submit(dispatcher, container, crunchRunCommand)
+	jobid, err := submit(dispatcher, container, *crunchRunCommand)
 	if err != nil {
 		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
 		return
 	}
 
-	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
+	finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
 
 	log.Printf("Submitted container %v to slurm", container.UUID)
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..3658737 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 
 	"bytes"
 	"fmt"
@@ -14,7 +15,6 @@ import (
 	"os/exec"
 	"strconv"
 	"strings"
-	"syscall"
 	"testing"
 	"time"
 
@@ -47,11 +47,6 @@ func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-slurm"}
 	os.Args = args
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
@@ -64,18 +59,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
 
 	var sbatchCmdLine []string
 	var striggerCmdLine []string
 
 	// Override sbatchCmd
-	defer func(orig func(Container) *exec.Cmd) {
+	defer func(orig func(dispatch.Container) *exec.Cmd) {
 		sbatchCmd = orig
 	}(sbatchCmd)
-	sbatchCmd = func(container Container) *exec.Cmd {
+	sbatchCmd = func(container dispatch.Container) *exec.Cmd {
 		sbatchCmdLine = sbatchFunc(container).Args
 		return exec.Command("sh")
 	}
@@ -100,22 +95,30 @@ func (s *TestSuite) Test_doMain(c *C) {
 		return exec.Command("echo", "strigger")
 	}
 
-	go func() {
-		time.Sleep(8 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
-
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
+	var containers dispatch.ContainerList
+	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Check(len(containers.Items), Equals, 1)
 
-	err = doMain()
-	c.Check(err, IsNil)
+	echo := "echo"
+	crunchRunCommand = &echo
+
+	var dispatcher *dispatch.DispatcherState
+	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+		run,
+		func(dispatcher *dispatch.DispatcherState,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			waitContainer(dispatcher, container, status)
+			dispatcher.DoneProcessing <- struct{}{}
+		})
+	c.Assert(err, IsNil)
+
+	dispatcher.RunDispatcher()
 
 	item := containers.Items[0]
 	sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
@@ -133,7 +136,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 	c.Check(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -153,7 +156,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -165,14 +168,26 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(buf)
 	defer log.SetOutput(os.Stderr)
 
+	*crunchRunCommand = crunchCmd
+
+	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+		run,
+		func(dispatcher *dispatch.DispatcherState,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			waitContainer(dispatcher, container, status)
+			dispatcher.DoneProcessing <- struct{}{}
+		})
+	c.Assert(err, IsNil)
+
 	go func() {
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		sigChan <- syscall.SIGTERM
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+	dispatcher.RunDispatcher()
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }

commit eb87446c82c8bda882199d577cdcea6c4f79ecdf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon May 23 17:30:56 2016 -0400

    9187: Tests pass for local dispatcher

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index cc9b20d..926a550 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -40,7 +40,7 @@ type DispatcherState struct {
 	Arv            arvadosclient.ArvadosClient
 	auth           apiClientAuthorization
 	containers     chan []Container
-	doneProcessing chan struct{}
+	DoneProcessing chan struct{}
 	runContainer   func(*DispatcherState, Container, chan Container)
 	waitContainer  func(*DispatcherState, Container, chan Container)
 }
@@ -87,11 +87,11 @@ func (dispatcher *DispatcherState) pollContainers() {
 
 	paramsQ := arvadosclient.Dict{
 		"filters": [][]interface{}{{"state", "=", "Queued"},
-			{"priority", ">", 0}},
+			{"priority", ">", "0"}},
 		"order": []string{"priority desc"}}
 	paramsP := arvadosclient.Dict{
 		"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
-			{"LockedByUUID", "=", dispatcher.auth.UUID}}}
+			{"locked_by_uuid", "=", dispatcher.auth.UUID}}}
 
 	for {
 		select {
@@ -114,7 +114,7 @@ func (dispatcher *DispatcherState) pollContainers() {
 					dispatcher.containers <- containers.Items
 				}
 			}
-		case <-dispatcher.doneProcessing:
+		case <-dispatcher.DoneProcessing:
 			close(dispatcher.containers)
 			ticker.Stop()
 			return
@@ -149,6 +149,7 @@ func (dispatcher *DispatcherState) handleContainers() {
 			go func(c Container, ch chan Container) {
 				defer dispatcher.notMine(c.UUID)
 				dispatcher.runContainer(dispatcher, c, ch)
+				dispatcher.waitContainer(dispatcher, c, ch)
 			}(container, dispatcher.setMine(container.UUID))
 		}
 	}
@@ -169,13 +170,17 @@ func (dispatcher *DispatcherState) RunDispatcher() {
 	// Graceful shutdown on signal
 	sigChan := make(chan os.Signal)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
 	go func(sig <-chan os.Signal) {
 		for sig := range sig {
 			log.Printf("Caught signal: %v", sig)
-			dispatcher.doneProcessing <- struct{}{}
+			dispatcher.DoneProcessing <- struct{}{}
 		}
 	}(sigChan)
 
+	defer close(sigChan)
+	defer signal.Stop(sigChan)
+
 	go dispatcher.pollContainers()
 	dispatcher.handleContainers()
 }
@@ -198,7 +203,7 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
 	dispatcher.runContainer = runContainer
 	dispatcher.waitContainer = waitContainer
 	dispatcher.mineMap = make(map[string]chan Container)
-	dispatcher.doneProcessing = make(chan struct{})
+	dispatcher.DoneProcessing = make(chan struct{})
 	dispatcher.containers = make(chan []Container)
 
 	return dispatcher, nil
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index f9596f1..1f19a29 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -62,7 +62,7 @@ func doMain() error {
 	for _, cmd := range runningCmds {
 		cmd.Process.Signal(os.Interrupt)
 	}
-	defer runningCmdsMutex.Unlock()
+	runningCmdsMutex.Unlock()
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -70,6 +70,12 @@ func doMain() error {
 	return nil
 }
 
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+	return cmd.Start()
+}
+
+var startCmd = startFunc
+
 // Run queued container:
 // Set container state to Locked
 // Run container using the given crunch-run command
@@ -89,29 +95,24 @@ func run(dispatcher *dispatch.DispatcherState,
 	cmd.Stderr = os.Stderr
 	cmd.Stdout = os.Stderr
 
+	log.Printf("Starting container %v", uuid)
+
 	// Add this crunch job to the list of runningCmds only if we
 	// succeed in starting crunch-run.
 	runningCmdsMutex.Lock()
-	if err := cmd.Start(); err != nil {
+	if err := startCmd(container, cmd); err != nil {
 		runningCmdsMutex.Unlock()
 		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
 		dispatcher.UpdateState(uuid, "Cancelled")
+		go func() {
+			for _ = range status {
+			}
+		}()
 		return
 	}
 	runningCmds[uuid] = cmd
 	runningCmdsMutex.Unlock()
 
-	defer func() {
-		setFinalState(dispatcher, container, status)
-
-		// Remove the crunch job from runningCmds
-		runningCmdsMutex.Lock()
-		delete(runningCmds, uuid)
-		runningCmdsMutex.Unlock()
-	}()
-
-	log.Printf("Starting container %v", uuid)
-
 	// Interrupt the child process if priority changes to 0
 	go func() {
 		for c := range status {
@@ -152,4 +153,13 @@ func setFinalState(dispatcher *dispatch.DispatcherState,
 		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
 		dispatcher.UpdateState(uuid, newState)
 	}
+
+	// Remove the crunch job from runningCmds
+	runningCmdsMutex.Lock()
+	if _, ok := runningCmds[uuid]; ok {
+		delete(runningCmds, uuid)
+	}
+	runningCmdsMutex.Unlock()
+
+	log.Printf("Finalized container %v", uuid)
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..cc221e4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,19 @@
 package main
 
 import (
+	"bytes"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
-	"bytes"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	. "gopkg.in/check.v1"
+	"io"
 	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"syscall"
+	"os/exec"
 	"testing"
 	"time"
-
-	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -32,6 +32,7 @@ var initialArgs []string
 func (s *TestSuite) SetUpSuite(c *C) {
 	initialArgs = os.Args
 	arvadostest.StartAPI()
+	runningCmds = make(map[string]*exec.Cmd)
 }
 
 func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +42,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
 func (s *TestSuite) SetUpTest(c *C) {
 	args := []string{"crunch-dispatch-local"}
 	os.Args = args
-
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		c.Fatalf("Error making arvados client: %s", err)
-	}
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +53,45 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 	arvadostest.ResetEnv()
 }
 
-func (s *TestSuite) Test_doMain(c *C) {
-	args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
-	os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	c.Assert(err, IsNil)
+
+	echo := "echo"
+	crunchRunCommand = &echo
+
+	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+		run,
+		func(dispatcher *dispatch.DispatcherState,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			setFinalState(dispatcher, container, status)
+			dispatcher.DoneProcessing <- struct{}{}
+		})
+	c.Assert(err, IsNil)
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
 
-	go func() {
-		time.Sleep(5 * time.Second)
-		sigChan <- syscall.SIGINT
-	}()
+	dispatcher.RunDispatcher()
 
-	err := doMain()
-	c.Check(err, IsNil)
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
 
 	// There should be no queued containers now
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
-	var containers ContainerList
+	var containers dispatch.ContainerList
 	err = arv.List("containers", params, &containers)
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
 
 	// Previously "Queued" container should now be in "Complete" state
-	var container Container
+	var container dispatch.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
 	c.Check(container.State, Equals, "Complete")
@@ -90,7 +101,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
@@ -125,12 +136,15 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+	apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+		arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
 	apiStub := arvadostest.ServerStub{apiStubResponses}
 
 	api := httptest.NewServer(&apiStub)
 	defer api.Close()
 
-	arv = arvadosclient.ArvadosClient{
+	arv := arvadosclient.ArvadosClient{
 		Scheme:    "http",
 		ApiServer: api.URL[7:],
 		ApiToken:  "abc123",
@@ -139,15 +153,34 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(buf)
+	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
+	*crunchRunCommand = crunchCmd
+
+	var dispatcher *dispatch.DispatcherState
+	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+		run,
+		func(dispatcher *dispatch.DispatcherState,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			setFinalState(dispatcher, container, status)
+			dispatcher.DoneProcessing <- struct{}{}
+		})
+	c.Assert(err, IsNil)
+
+	startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+		dispatcher.UpdateState(container.UUID, "Running")
+		dispatcher.UpdateState(container.UUID, "Complete")
+		return cmd.Start()
+	}
+
 	go func() {
 		time.Sleep(2 * time.Second)
-		sigChan <- syscall.SIGTERM
+		dispatcher.DoneProcessing <- struct{}{}
 	}()
 
-	runQueuedContainers(time.Second, time.Second, crunchCmd)
+	dispatcher.RunDispatcher()
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 254f862..a763c36 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -22,7 +22,10 @@ func main() {
 	}
 }
 
-var crunchRunCommand *string
+var (
+	crunchRunCommand *string
+	finishCommand    *string
+)
 
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
@@ -53,11 +56,7 @@ func doMain() error {
 
 	var dispatcher *dispatch.DispatcherState
 	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
-		func(dispatcher *dispatch.DispatcherState,
-			container dispatch.Container,
-			status chan dispatch.Container) {
-			run(*crunchRunCommand, *finishCommand, dispatcher, container, status)
-		}, waitContainer)
+		run, waitContainer)
 	if err != nil {
 		return err
 	}
@@ -206,7 +205,6 @@ func run(crunchRunCommand, finishCommand string,
 	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
 
 	log.Printf("Submitted container %v to slurm", container.UUID)
-	waitContainer(dispatcher, container, status)
 }
 
 // Wait for a container to finish. Cancel the slurm job if the

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list