[ARVADOS] created: 683f5374b0fc516579c1d6dc3379fc900d642322
Git user
git at public.curoverse.com
Thu Jan 26 00:36:17 EST 2017
at 683f5374b0fc516579c1d6dc3379fc900d642322 (commit)
commit 683f5374b0fc516579c1d6dc3379fc900d642322
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jan 25 16:37:26 2017 -0500
10703: Un-pyramid run().
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 19ab5aa..8e61462 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -250,29 +250,30 @@ func run(dispatcher *dispatch.Dispatcher,
go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
for container = range status {
- if container.State == dispatch.Locked || container.State == dispatch.Running {
- if container.Priority == 0 {
- log.Printf("Canceling container %s", container.UUID)
-
- // Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.SlurmLock.Lock()
- cmd := scancelCmd(container)
- msg, err := cmd.CombinedOutput()
- squeueUpdater.SlurmLock.Unlock()
-
- if err != nil {
- log.Printf("Error stopping container %s with %v %v: %v %v",
- container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.CheckSqueue(container.UUID) {
- log.Printf("Container %s is still in squeue after scancel.",
- container.UUID)
- continue
- }
- }
-
- err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+ if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
+ continue
+ }
+ if container.Priority != 0 {
+ continue
+ }
+ log.Printf("Canceling container %s", container.UUID)
+
+ // Mutex between squeue sync and running sbatch or scancel.
+ squeueUpdater.SlurmLock.Lock()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
+ squeueUpdater.SlurmLock.Unlock()
+
+ if err != nil {
+ log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+ if squeueUpdater.CheckSqueue(container.UUID) {
+ log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+ continue
}
}
+
+ // Ignore errors; if necessary, we'll try again next time
+ dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
}
monitorDone = true
}
commit 318dd887d108e0664ef22a8d38a34fa18e1b2657
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jan 25 16:04:30 2017 -0500
10701: Remove unneeded complexity in squeue invocation.
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ae70800..19ab5aa 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,6 +3,7 @@ package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
+ "bytes"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -10,8 +11,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
- "io"
- "io/ioutil"
"log"
"math"
"os"
@@ -154,70 +153,31 @@ func submit(dispatcher *dispatch.Dispatcher,
}
}()
- // Create the command and attach to stdin/stdout
cmd := sbatchCmd(container)
- stdinWriter, stdinerr := cmd.StdinPipe()
- if stdinerr != nil {
- submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
- return
- }
- stdoutReader, stdoutErr := cmd.StdoutPipe()
- if stdoutErr != nil {
- submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
- return
- }
+ // Send a tiny script on stdin to execute the crunch-run
+ // command (slurm requires this to be a #! script)
+ cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
- stderrReader, stderrErr := cmd.StderrPipe()
- if stderrErr != nil {
- submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
- return
- }
+ var stdout, stderr bytes.Buffer
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
defer squeueUpdater.SlurmLock.Unlock()
- log.Printf("sbatch starting: %+q", cmd.Args)
- err := cmd.Start()
- if err != nil {
- submitErr = fmt.Errorf("Error starting sbatch: %v", err)
- return
+ log.Printf("exec sbatch %+q", cmd.Args)
+ err := cmd.Run()
+ switch err.(type) {
+ case nil:
+ log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
+ return nil
+ case *exec.ExitError:
+ return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr)
+ default:
+ return fmt.Errorf("exec failed: %v", err)
}
-
- stdoutChan := make(chan []byte)
- go func() {
- b, _ := ioutil.ReadAll(stdoutReader)
- stdoutReader.Close()
- stdoutChan <- b
- close(stdoutChan)
- }()
-
- stderrChan := make(chan []byte)
- go func() {
- b, _ := ioutil.ReadAll(stderrReader)
- stderrReader.Close()
- stderrChan <- b
- close(stderrChan)
- }()
-
- // Send a tiny script on stdin to execute the crunch-run command
- // slurm actually enforces that this must be a #! script
- io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
- stdinWriter.Close()
-
- stdoutMsg := <-stdoutChan
- stderrmsg := <-stderrChan
-
- err = cmd.Wait()
-
- if err != nil {
- submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
- return
- }
-
- log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
- return
}
// If the container is marked as Locked, check if it is already in the slurm
commit e356309e05714cd65d88456c563cea606f820394
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jan 25 10:59:48 2017 -0500
10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop" API.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 4987c01..4129b24 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -7,10 +7,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
- "os"
- "os/signal"
"sync"
- "syscall"
"time"
)
@@ -44,13 +41,11 @@ type Dispatcher struct {
// Amount of time to wait between polling for updates.
PollInterval time.Duration
- // Channel used to signal that RunDispatcher loop should exit.
- DoneProcessing chan struct{}
+ mineMutex sync.Mutex
+ mineMap map[string]chan arvados.Container
+ Auth arvados.APIClientAuthorization
- mineMutex sync.Mutex
- mineMap map[string]chan arvados.Container
- Auth arvados.APIClientAuthorization
- containers chan arvados.Container
+ stop chan struct{}
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -110,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
}
for _, container := range containers.Items {
touched[container.UUID] = true
- dispatcher.containers <- container
+ dispatcher.handleUpdate(container)
}
}
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
ticker := time.NewTicker(dispatcher.PollInterval)
+ defer ticker.Stop()
paramsQ := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -126,26 +122,24 @@ func (dispatcher *Dispatcher) pollContainers() {
"limit": "1000"}
for {
+ touched := make(map[string]bool)
+ dispatcher.getContainers(paramsQ, touched)
+ dispatcher.getContainers(paramsP, touched)
+ dispatcher.mineMutex.Lock()
+ var monitored []string
+ for k := range dispatcher.mineMap {
+ if _, ok := touched[k]; !ok {
+ monitored = append(monitored, k)
+ }
+ }
+ dispatcher.mineMutex.Unlock()
+ if monitored != nil {
+ dispatcher.getContainers(arvadosclient.Dict{
+ "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+ }
select {
case <-ticker.C:
- touched := make(map[string]bool)
- dispatcher.getContainers(paramsQ, touched)
- dispatcher.getContainers(paramsP, touched)
- dispatcher.mineMutex.Lock()
- var monitored []string
- for k := range dispatcher.mineMap {
- if _, ok := touched[k]; !ok {
- monitored = append(monitored, k)
- }
- }
- dispatcher.mineMutex.Unlock()
- if monitored != nil {
- dispatcher.getContainers(arvadosclient.Dict{
- "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
- }
- case <-dispatcher.DoneProcessing:
- close(dispatcher.containers)
- ticker.Stop()
+ case <-stop:
return
}
}
@@ -221,10 +215,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error {
return err
}
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel. It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+ if dispatcher.stop == nil {
+ // already stopped
+ return
+ }
+ close(dispatcher.stop)
+ dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
@@ -232,26 +234,7 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
}
dispatcher.mineMap = make(map[string]chan arvados.Container)
- dispatcher.containers = make(chan arvados.Container)
-
- // Graceful shutdown on signal
- sigChan := make(chan os.Signal)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- dispatcher.DoneProcessing <- struct{}{}
- }
- }(sigChan)
-
- defer close(sigChan)
- defer signal.Stop(sigChan)
-
- go dispatcher.pollContainers()
- for container := range dispatcher.containers {
- dispatcher.handleUpdate(container)
- }
-
+ dispatcher.stop = make(chan struct{})
+ dispatcher.pollContainers(dispatcher.stop)
return nil
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 0ca7651..cfb0c7d 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -10,7 +10,9 @@ import (
"log"
"os"
"os/exec"
+ "os/signal"
"sync"
+ "syscall"
"time"
)
@@ -54,16 +56,24 @@ func doMain() error {
arv.Retries = 25
dispatcher := dispatch.Dispatcher{
- Arv: arv,
- RunContainer: run,
- PollInterval: time.Duration(*pollInterval) * time.Second,
- DoneProcessing: make(chan struct{})}
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(*pollInterval) * time.Second,
+ }
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
if err != nil {
return err
}
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+ sig := <-c
+ log.Printf("Received %s, shutting down", sig)
+ signal.Stop(c)
+
+ dispatcher.Stop()
+
runningCmdsMutex.Lock()
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index bcb406e..0454730 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -62,7 +62,6 @@ func (s *TestSuite) TestIntegration(c *C) {
echo := "echo"
crunchRunCommand = &echo
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Second,
@@ -70,9 +69,9 @@ func (s *TestSuite) TestIntegration(c *C) {
container arvados.Container,
status chan arvados.Container) {
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
@@ -80,7 +79,7 @@ func (s *TestSuite) TestIntegration(c *C) {
return cmd.Start()
}
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
@@ -166,7 +165,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
*crunchRunCommand = crunchCmd
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
@@ -174,9 +172,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
container arvados.Container,
status chan arvados.Container) {
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
@@ -188,10 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.DoneProcessing <- struct{}{}
+ dispatcher.Stop()
}()
- err := dispatcher.RunDispatcher()
+ err := dispatcher.Run()
c.Assert(err, IsNil)
// Wait for all running crunch jobs to complete / terminate
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index e768b50..ae70800 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -100,21 +100,16 @@ func doMain() error {
defer squeueUpdater.Done()
dispatcher := dispatch.Dispatcher{
- Arv: arv,
- RunContainer: run,
- PollInterval: time.Duration(theConfig.PollPeriod),
- DoneProcessing: make(chan struct{})}
+ Arv: arv,
+ RunContainer: run,
+ PollInterval: time.Duration(theConfig.PollPeriod),
+ }
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
- err = dispatcher.RunDispatcher()
- if err != nil {
- return err
- }
-
- return nil
+ return dispatcher.Run()
}
// sbatchCmd
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4046103..86844dc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -146,7 +146,6 @@ func (s *TestSuite) integrationTest(c *C,
theConfig.CrunchRunCommand = []string{"echo"}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
@@ -155,13 +154,13 @@ func (s *TestSuite) integrationTest(c *C,
status chan arvados.Container) {
go runContainer(dispatcher, container)
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
c.Assert(err, IsNil)
squeueUpdater.Done()
@@ -208,7 +207,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
theConfig.CrunchRunCommand = []string{crunchCmd}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
@@ -221,18 +219,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.DoneProcessing <- struct{}{}
+ dispatcher.Stop()
}()
- err := dispatcher.RunDispatcher()
+ err := dispatcher.Run()
c.Assert(err, IsNil)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list