[ARVADOS] updated: 1032d5957a67aa706079917f3c20bdb1a5c91ad0
Git user
git at public.curoverse.com
Tue May 24 11:33:16 EDT 2016
Summary of changes:
sdk/go/dispatch/dispatch.go | 17 ++--
.../crunch-dispatch-local/crunch-dispatch-local.go | 36 +++++----
.../crunch-dispatch-local_test.go | 90 +++++++++++++++-------
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 21 +++--
.../crunch-dispatch-slurm_test.go | 63 +++++++++------
5 files changed, 144 insertions(+), 83 deletions(-)
via 1032d5957a67aa706079917f3c20bdb1a5c91ad0 (commit)
via eb87446c82c8bda882199d577cdcea6c4f79ecdf (commit)
from ce6ffc733c3d8a4637066a90df90d8ffa5d67116 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 1032d5957a67aa706079917f3c20bdb1a5c91ad0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon May 23 17:53:31 2016 -0400
9187: Slurm tests wip
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index cc221e4..9a727bc 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -158,7 +158,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
*crunchRunCommand = crunchCmd
- var dispatcher *dispatch.DispatcherState
dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
run,
func(dispatcher *dispatch.DispatcherState,
@@ -176,7 +175,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
go func() {
- time.Sleep(2 * time.Second)
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
dispatcher.DoneProcessing <- struct{}{}
}()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index a763c36..649b784 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -40,7 +40,7 @@ func doMain() error {
"/usr/bin/crunch-run",
"Crunch command to run container")
- finishCommand := flags.String(
+ finishCommand = flags.String(
"finish-command",
"/usr/bin/crunch-finish-slurm.sh",
"Command to run from strigger when job is finished")
@@ -189,20 +189,19 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arva
// Run a queued container: [1] Set container state to locked. [2]
// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(crunchRunCommand, finishCommand string,
- dispatcher *dispatch.DispatcherState,
+func run(dispatcher *dispatch.DispatcherState,
container dispatch.Container,
status chan dispatch.Container) {
log.Printf("About to submit queued container %v", container.UUID)
- jobid, err := submit(dispatcher, container, crunchRunCommand)
+ jobid, err := submit(dispatcher, container, *crunchRunCommand)
if err != nil {
log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
return
}
- finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
+ finalizeRecordOnFinish(jobid, container.UUID, *finishCommand, dispatcher.Arv)
log.Printf("Submitted container %v to slurm", container.UUID)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 3dfb7d5..3658737 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"bytes"
"fmt"
@@ -14,7 +15,6 @@ import (
"os/exec"
"strconv"
"strings"
- "syscall"
"testing"
"time"
@@ -47,11 +47,6 @@ func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-slurm"}
os.Args = args
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
@@ -64,18 +59,18 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
var sbatchCmdLine []string
var striggerCmdLine []string
// Override sbatchCmd
- defer func(orig func(Container) *exec.Cmd) {
+ defer func(orig func(dispatch.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container Container) *exec.Cmd {
+ sbatchCmd = func(container dispatch.Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
return exec.Command("sh")
}
@@ -100,22 +95,30 @@ func (s *TestSuite) Test_doMain(c *C) {
return exec.Command("echo", "strigger")
}
- go func() {
- time.Sleep(8 * time.Second)
- sigChan <- syscall.SIGINT
- }()
-
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
- err := arv.List("containers", params, &containers)
+ var containers dispatch.ContainerList
+ err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- err = doMain()
- c.Check(err, IsNil)
+ echo := "echo"
+ crunchRunCommand = &echo
+
+ var dispatcher *dispatch.DispatcherState
+ dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+ run,
+ func(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ waitContainer(dispatcher, container, status)
+ dispatcher.DoneProcessing <- struct{}{}
+ })
+ c.Assert(err, IsNil)
+
+ dispatcher.RunDispatcher()
item := containers.Items[0]
sbatchCmdComps := []string{"sbatch", "--share", "--parsable",
@@ -133,7 +136,7 @@ func (s *TestSuite) Test_doMain(c *C) {
c.Check(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -153,7 +156,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -165,14 +168,26 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(buf)
defer log.SetOutput(os.Stderr)
+ *crunchRunCommand = crunchCmd
+
+ dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+ run,
+ func(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ waitContainer(dispatcher, container, status)
+ dispatcher.DoneProcessing <- struct{}{}
+ })
+ c.Assert(err, IsNil)
+
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- sigChan <- syscall.SIGTERM
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(2, 1, crunchCmd, crunchCmd)
+ dispatcher.RunDispatcher()
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
commit eb87446c82c8bda882199d577cdcea6c4f79ecdf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon May 23 17:30:56 2016 -0400
9187: Tests pass for local dispatcher
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index cc9b20d..926a550 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -40,7 +40,7 @@ type DispatcherState struct {
Arv arvadosclient.ArvadosClient
auth apiClientAuthorization
containers chan []Container
- doneProcessing chan struct{}
+ DoneProcessing chan struct{}
runContainer func(*DispatcherState, Container, chan Container)
waitContainer func(*DispatcherState, Container, chan Container)
}
@@ -87,11 +87,11 @@ func (dispatcher *DispatcherState) pollContainers() {
paramsQ := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "=", "Queued"},
- {"priority", ">", 0}},
+ {"priority", ">", "0"}},
"order": []string{"priority desc"}}
paramsP := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
- {"LockedByUUID", "=", dispatcher.auth.UUID}}}
+ {"locked_by_uuid", "=", dispatcher.auth.UUID}}}
for {
select {
@@ -114,7 +114,7 @@ func (dispatcher *DispatcherState) pollContainers() {
dispatcher.containers <- containers.Items
}
}
- case <-dispatcher.doneProcessing:
+ case <-dispatcher.DoneProcessing:
close(dispatcher.containers)
ticker.Stop()
return
@@ -149,6 +149,7 @@ func (dispatcher *DispatcherState) handleContainers() {
go func(c Container, ch chan Container) {
defer dispatcher.notMine(c.UUID)
dispatcher.runContainer(dispatcher, c, ch)
+ dispatcher.waitContainer(dispatcher, c, ch)
}(container, dispatcher.setMine(container.UUID))
}
}
@@ -169,13 +170,17 @@ func (dispatcher *DispatcherState) RunDispatcher() {
// Graceful shutdown on signal
sigChan := make(chan os.Signal)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+
go func(sig <-chan os.Signal) {
for sig := range sig {
log.Printf("Caught signal: %v", sig)
- dispatcher.doneProcessing <- struct{}{}
+ dispatcher.DoneProcessing <- struct{}{}
}
}(sigChan)
+ defer close(sigChan)
+ defer signal.Stop(sigChan)
+
go dispatcher.pollContainers()
dispatcher.handleContainers()
}
@@ -198,7 +203,7 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
dispatcher.runContainer = runContainer
dispatcher.waitContainer = waitContainer
dispatcher.mineMap = make(map[string]chan Container)
- dispatcher.doneProcessing = make(chan struct{})
+ dispatcher.DoneProcessing = make(chan struct{})
dispatcher.containers = make(chan []Container)
return dispatcher, nil
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index f9596f1..1f19a29 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -62,7 +62,7 @@ func doMain() error {
for _, cmd := range runningCmds {
cmd.Process.Signal(os.Interrupt)
}
- defer runningCmdsMutex.Unlock()
+ runningCmdsMutex.Unlock()
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
@@ -70,6 +70,12 @@ func doMain() error {
return nil
}
+func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+ return cmd.Start()
+}
+
+var startCmd = startFunc
+
// Run queued container:
// Set container state to Locked
// Run container using the given crunch-run command
@@ -89,29 +95,24 @@ func run(dispatcher *dispatch.DispatcherState,
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
+ log.Printf("Starting container %v", uuid)
+
// Add this crunch job to the list of runningCmds only if we
// succeed in starting crunch-run.
runningCmdsMutex.Lock()
- if err := cmd.Start(); err != nil {
+ if err := startCmd(container, cmd); err != nil {
runningCmdsMutex.Unlock()
log.Printf("Error starting crunch-run for %v: %q", uuid, err)
dispatcher.UpdateState(uuid, "Cancelled")
+ go func() {
+ for _ = range status {
+ }
+ }()
return
}
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
- defer func() {
- setFinalState(dispatcher, container, status)
-
- // Remove the crunch job from runningCmds
- runningCmdsMutex.Lock()
- delete(runningCmds, uuid)
- runningCmdsMutex.Unlock()
- }()
-
- log.Printf("Starting container %v", uuid)
-
// Interrupt the child process if priority changes to 0
go func() {
for c := range status {
@@ -152,4 +153,13 @@ func setFinalState(dispatcher *dispatch.DispatcherState,
log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
dispatcher.UpdateState(uuid, newState)
}
+
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
+ if _, ok := runningCmds[uuid]; ok {
+ delete(runningCmds, uuid)
+ }
+ runningCmdsMutex.Unlock()
+
+ log.Printf("Finalized container %v", uuid)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index e3ab3a4..cc221e4 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -1,19 +1,19 @@
package main
import (
+ "bytes"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-
- "bytes"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ . "gopkg.in/check.v1"
+ "io"
"log"
"net/http"
"net/http/httptest"
"os"
- "syscall"
+ "os/exec"
"testing"
"time"
-
- . "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -32,6 +32,7 @@ var initialArgs []string
func (s *TestSuite) SetUpSuite(c *C) {
initialArgs = os.Args
arvadostest.StartAPI()
+ runningCmds = make(map[string]*exec.Cmd)
}
func (s *TestSuite) TearDownSuite(c *C) {
@@ -41,12 +42,6 @@ func (s *TestSuite) TearDownSuite(c *C) {
func (s *TestSuite) SetUpTest(c *C) {
args := []string{"crunch-dispatch-local"}
os.Args = args
-
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- c.Fatalf("Error making arvados client: %s", err)
- }
}
func (s *TestSuite) TearDownTest(c *C) {
@@ -58,29 +53,45 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
}
-func (s *TestSuite) Test_doMain(c *C) {
- args := []string{"-poll-interval", "2", "-container-priority-poll-interval", "1", "-crunch-run-command", "echo"}
- os.Args = append(os.Args, args...)
+func (s *TestSuite) TestIntegration(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
+
+ echo := "echo"
+ crunchRunCommand = &echo
+
+ dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+ run,
+ func(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ setFinalState(dispatcher, container, status)
+ dispatcher.DoneProcessing <- struct{}{}
+ })
+ c.Assert(err, IsNil)
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
- go func() {
- time.Sleep(5 * time.Second)
- sigChan <- syscall.SIGINT
- }()
+ dispatcher.RunDispatcher()
- err := doMain()
- c.Check(err, IsNil)
+ // Wait for all running crunch jobs to complete / terminate
+ waitGroup.Wait()
// There should be no queued containers now
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers ContainerList
+ var containers dispatch.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container Container
+ var container dispatch.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
c.Check(container.State, Equals, "Complete")
@@ -90,7 +101,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+ testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
@@ -125,12 +136,15 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
}
func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+ apiStubResponses["/arvados/v1/api_client_authorizations/current"] =
+ arvadostest.StubResponse{200, string(`{"uuid": "abc", "api_token": "xyz"}`)}
+
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
defer api.Close()
- arv = arvadosclient.ArvadosClient{
+ arv := arvadosclient.ArvadosClient{
Scheme: "http",
ApiServer: api.URL[7:],
ApiToken: "abc123",
@@ -139,15 +153,34 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(buf)
+ log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
+ *crunchRunCommand = crunchCmd
+
+ var dispatcher *dispatch.DispatcherState
+ dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(1)*time.Second,
+ run,
+ func(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ setFinalState(dispatcher, container, status)
+ dispatcher.DoneProcessing <- struct{}{}
+ })
+ c.Assert(err, IsNil)
+
+ startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ dispatcher.UpdateState(container.UUID, "Running")
+ dispatcher.UpdateState(container.UUID, "Complete")
+ return cmd.Start()
+ }
+
go func() {
time.Sleep(2 * time.Second)
- sigChan <- syscall.SIGTERM
+ dispatcher.DoneProcessing <- struct{}{}
}()
- runQueuedContainers(time.Second, time.Second, crunchCmd)
+ dispatcher.RunDispatcher()
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 254f862..a763c36 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -22,7 +22,10 @@ func main() {
}
}
-var crunchRunCommand *string
+var (
+ crunchRunCommand *string
+ finishCommand *string
+)
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
@@ -53,11 +56,7 @@ func doMain() error {
var dispatcher *dispatch.DispatcherState
dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
- func(dispatcher *dispatch.DispatcherState,
- container dispatch.Container,
- status chan dispatch.Container) {
- run(*crunchRunCommand, *finishCommand, dispatcher, container, status)
- }, waitContainer)
+ run, waitContainer)
if err != nil {
return err
}
@@ -206,7 +205,6 @@ func run(crunchRunCommand, finishCommand string,
finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
log.Printf("Submitted container %v to slurm", container.UUID)
- waitContainer(dispatcher, container, status)
}
// Wait for a container to finish. Cancel the slurm job if the
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list