[ARVADOS] updated: ce6ffc733c3d8a4637066a90df90d8ffa5d67116
Git user
git at public.curoverse.com
Mon May 23 10:17:21 EDT 2016
Summary of changes:
sdk/go/dispatch/dispatch.go | 135 ++++++++-------
.../crunch-dispatch-local/crunch-dispatch-local.go | 49 +++---
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 189 ++++++---------------
3 files changed, 150 insertions(+), 223 deletions(-)
via ce6ffc733c3d8a4637066a90df90d8ffa5d67116 (commit)
from 2361ee3e1ff9f98041c7a84dcd2bbdf956fa004c (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit ce6ffc733c3d8a4637066a90df90d8ffa5d67116
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon May 23 10:17:17 2016 -0400
9187: Fix compiler errors. On to tests.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 88330b9..cc9b20d 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,7 +1,12 @@
-package dispatcher
+package dispatch
import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
"time"
)
@@ -30,27 +35,27 @@ type ContainerList struct {
type DispatcherState struct {
mineMutex sync.Mutex
- mineMap map[string]chan struct{}
+ mineMap map[string]chan Container
pollInterval time.Duration
- arv arvadosclient.ArvadosClient
+ Arv arvadosclient.ArvadosClient
auth apiClientAuthorization
- containers chan Container
+ containers chan []Container
doneProcessing chan struct{}
- runContainer func(c Container)
- waitContainer func(c Container)
+ runContainer func(*DispatcherState, Container, chan Container)
+ waitContainer func(*DispatcherState, Container, chan Container)
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
// for which this process has a goroutine running. Returns channel used to
-// send a "cancel" signal.
-func (dispatcher *DispatcherState) setMine(uuid string) chan struct{} {
+// send container status updates.
+func (dispatcher *DispatcherState) setMine(uuid string) chan Container {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
if ch, ok := dispatcher.mineMap[uuid]; ok {
return ch
}
- ch = make(chan struct{})
+ ch := make(chan Container)
dispatcher.mineMap[uuid] = ch
return ch
}
@@ -66,56 +71,50 @@ func (dispatcher *DispatcherState) notMine(uuid string) {
// Check whether there is already a goroutine running for this
// container.
-func (dispatcher *DispatcherState) checkMine(uuid string) chan struct{} {
- mineMutex.Lock()
- defer mineMutex.Unlock()
- ch, ok := dispatcher.mineMap[uuid]
+func (dispatcher *DispatcherState) updateMine(c Container) bool {
+ dispatcher.mineMutex.Lock()
+ defer dispatcher.mineMutex.Unlock()
+ ch, ok := dispatcher.mineMap[c.UUID]
if ok {
- return ch
+ ch <- c
+ return true
}
- return nil
+ return false
}
func (dispatcher *DispatcherState) pollContainers() {
ticker := time.NewTicker(dispatcher.pollInterval)
paramsQ := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
- "order": []string{"priority desc"}}
+ "filters": [][]interface{}{{"state", "=", "Queued"},
+ {"priority", ">", 0}},
+ "order": []string{"priority desc"}}
paramsP := arvadosclient.Dict{
"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
- []string{"priority", "=", "0"},
- []string{"LockedByUUID", "=", dispatcher.auth.UUID}}}
+ {"LockedByUUID", "=", dispatcher.auth.UUID}}}
for {
select {
case <-ticker.C:
{
var containers ContainerList
- err := arv.List("containers", paramsQ, &containers)
+ err := dispatcher.Arv.List("containers", paramsQ, &containers)
if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
+ log.Printf("Error getting list of containers: %q", err)
} else {
- for _, c := range containers.Items {
- dispatcher.containers <- c
- }
+ dispatcher.containers <- containers.Items
}
}
{
var containers ContainerList
- err := arv.List("containers", paramsP, &containers)
+ err := dispatcher.Arv.List("containers", paramsP, &containers)
if err != nil {
- log.Printf("Error getting list of cancelled containers: %q", err)
+ log.Printf("Error getting list of containers: %q", err)
} else {
- for _, c := range containers.Items {
- // only containers I know about
- if ch := dispatcher.checkMine(c.UUID); ch != nil {
- ch <- struct{}{}
- }
- }
+ dispatcher.containers <- containers.Items
}
}
- case <-doneProcessing:
+ case <-dispatcher.doneProcessing:
close(dispatcher.containers)
ticker.Stop()
return
@@ -123,30 +122,40 @@ func (dispatcher *DispatcherState) pollContainers() {
}
}
-func (dispatcher *DispatcherState) runQueuedContainers() {
- for container := range dispatcher.containers {
- if dispatcher.checkMine(c.UUID) != nil {
- continue
- }
+func (dispatcher *DispatcherState) handleContainers() {
+ for containerlist := range dispatcher.containers {
+ for _, container := range containerlist {
+ if dispatcher.updateMine(container) {
+ continue
+ }
- if container.State == "Locked" {
- if container.LockedByUUID != dispatcher.auth.UUID {
- // Locked by a different dispatcher
+ if container.State == "Locked" || container.State == "Running" {
+ log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+ "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+ container.UUID, dispatcher.auth.UUID)
+ go func(c Container, ch chan Container) {
+ defer dispatcher.notMine(c.UUID)
+ dispatcher.waitContainer(dispatcher, c, ch)
+ }(container, dispatcher.setMine(container.UUID))
continue
}
- log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
- "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
- container.UUID, dispatcher.auth.UUID)
- go dispatcher.waitContainer(dispatcher, container, dispatcher.setMine(container.UUID))
- continue
- }
- go dispatcher.runContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+ // Lock container to this dispatcher
+ if err := dispatcher.UpdateState(container.UUID, "Locked"); err != nil {
+ continue
+ }
+
+ // Run it
+ go func(c Container, ch chan Container) {
+ defer dispatcher.notMine(c.UUID)
+ dispatcher.runContainer(dispatcher, c, ch)
+ }(container, dispatcher.setMine(container.UUID))
+ }
}
}
func (dispatcher *DispatcherState) UpdateState(uuid, newState string) error {
- err := dispatcher.arv.Update("containers", uuid,
+ err := dispatcher.Arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": newState}},
nil)
@@ -157,35 +166,29 @@ func (dispatcher *DispatcherState) UpdateState(uuid, newState string) error {
}
func (dispatcher *DispatcherState) RunDispatcher() {
-
// Graceful shutdown on signal
- sigChan = make(chan os.Signal, 1)
+ sigChan := make(chan os.Signal)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func(sig <-chan os.Signal) {
for sig := range sig {
log.Printf("Caught signal: %v", sig)
- doneProcessing <- struct{}{}
+ dispatcher.doneProcessing <- struct{}{}
}
}(sigChan)
go dispatcher.pollContainers()
- dispatcher.runQueuedContainers()
+ dispatcher.handleContainers()
}
func MakeDispatcher(arv arvadosclient.ArvadosClient,
pollInterval time.Duration,
- runContainer func(*DispatcherState, Container, chan struct{}),
- waitContainer func(*DispatcherState, Container, chan struct{})) (*DispatcherState, error) {
+ runContainer func(*DispatcherState, Container, chan Container),
+ waitContainer func(*DispatcherState, Container, chan Container)) (*DispatcherState, error) {
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- log.Printf("Error making Arvados client: %v", err)
- return nil, err
- }
- dispatcher := make(DispatcherState)
- dispatcher.arv = arv
+ dispatcher := &DispatcherState{}
+ dispatcher.Arv = arv
- err = dispatcher.arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+ err := dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
if err != nil {
log.Printf("Error getting my token UUID: %v", err)
return nil, err
@@ -194,9 +197,9 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
dispatcher.pollInterval = pollInterval
dispatcher.runContainer = runContainer
dispatcher.waitContainer = waitContainer
- dispatcher.mineMap = make(map[string]chan struct{})
- dispatcher.doneProcessing = make(chan bool)
- dispatcher.containers = make(chan Container)
+ dispatcher.mineMap = make(map[string]chan Container)
+ dispatcher.doneProcessing = make(chan struct{})
+ dispatcher.containers = make(chan []Container)
return dispatcher, nil
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index a9cb73b..f9596f1 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -7,9 +7,7 @@ import (
"log"
"os"
"os/exec"
- "os/signal"
"sync"
- "syscall"
"time"
)
@@ -45,11 +43,19 @@ func doMain() error {
runningCmds = make(map[string]*exec.Cmd)
- dispatch, err = dispatch.MakeDispatcher(arv, pollInterval*time.Second, run, setFinalState)
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
- dispatch.RunDispatcher()
+
+ var dispatcher *dispatch.DispatcherState
+ dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
+ run, setFinalState)
+ if err != nil {
+ return err
+ }
+ dispatcher.RunDispatcher()
runningCmdsMutex.Lock()
// Finished dispatching; interrupt any crunch jobs that are still running
@@ -69,19 +75,14 @@ func doMain() error {
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(dispatcher *DispatcherState,
- container Container,
- cancel chan struct{}) {
+func run(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
uuid := container.UUID
waitGroup.Add(1)
defer waitGroup.Done()
- defer dispatcher.notMine(uuid)
-
- if err := dispatcher.UpdateState(uuid, "Locked"); err != nil {
- return
- }
cmd := exec.Command(*crunchRunCommand, uuid)
cmd.Stdin = nil
@@ -94,14 +95,14 @@ func run(dispatcher *DispatcherState,
if err := cmd.Start(); err != nil {
runningCmdsMutex.Unlock()
log.Printf("Error starting crunch-run for %v: %q", uuid, err)
- updateState(uuid, "Queued")
+ dispatcher.UpdateState(uuid, "Cancelled")
return
}
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
defer func() {
- setFinalState(dispatcher, uuid)
+ setFinalState(dispatcher, container, status)
// Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
@@ -111,13 +112,13 @@ func run(dispatcher *DispatcherState,
log.Printf("Starting container %v", uuid)
- // Interrupt the child process if notification comes on cancel channel
+ // Interrupt the child process if priority changes to 0
go func() {
- select {
- case <-cancel:
- log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
- cmd.Process.Signal(os.Interrupt)
- return
+ for c := range status {
+ if (c.State == "Locked" || c.State == "Running") && c.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
+ }
}
}()
@@ -129,9 +130,9 @@ func run(dispatcher *DispatcherState,
log.Printf("Finished container run for %v", uuid)
}
-func setFinalState(dispatcher *DispatcherState,
- container Container,
- cancel chan struct{}) {
+func setFinalState(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
uuid := container.UUID
@@ -139,7 +140,7 @@ func setFinalState(dispatcher *DispatcherState,
// If the container is "Running" or "Locked", that's an error, so
// change it to "Cancelled". TODO: perhaps this should be "Error"
// state instead?
- err := dispatcher.arv.Get("containers", uuid, nil, &container)
+ err := dispatcher.Arv.Get("containers", uuid, nil, &container)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index d6359ac..254f862 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -5,15 +5,13 @@ import (
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
"log"
"math"
"os"
"os/exec"
- "os/signal"
"strconv"
- "sync"
- "syscall"
"time"
)
@@ -47,79 +45,29 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
+ arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
+ log.Printf("Error making Arvados client: %v", err)
return err
}
- // Channel to terminate
- doneProcessing = make(chan bool)
-
- // Graceful shutdown
- sigChan = make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- go func(sig <-chan os.Signal) {
- for sig := range sig {
- log.Printf("Caught signal: %v", sig)
- doneProcessing <- true
- }
- }(sigChan)
-
- // Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
- // Wait for all running crunch jobs to complete / terminate
- waitGroup.Wait()
-
- return nil
-}
-
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- var auth apiClientAuthorization
- err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+ var dispatcher *dispatch.DispatcherState
+ dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
+ func(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+ run(*crunchRunCommand, *finishCommand, dispatcher, container, status)
+ }, waitContainer)
if err != nil {
- log.Printf("Error getting my token UUID: %v", err)
- return
- }
-
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
- for {
- select {
- case <-ticker.C:
- dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
- case <-doneProcessing:
- ticker.Stop()
- return
- }
- }
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
- params := arvadosclient.Dict{
- "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}},
- []string{"priority", ">", "0"}},
- "order": []string{"priority desc"}}
-
- var containers ContainerList
- err := arv.List("containers", params, &containers)
- if err != nil {
- log.Printf("Error getting list of queued containers: %q", err)
- return
+ return err
}
+ dispatcher.RunDispatcher()
+ return nil
}
// sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
"--job-name="+container.UUID,
@@ -138,7 +86,8 @@ func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiIns
var striggerCmd = striggerFunc
// Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
@@ -148,7 +97,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
// OK, no cleanup needed
return
}
- err := arv.Update("containers", container.UUID,
+ err := dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Queued"}},
nil)
@@ -222,8 +171,12 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
// the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
- cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+ insecure := "0"
+ if arv.ApiInsecure {
+ insecure = "1"
+ }
+ cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@@ -235,93 +188,63 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
}
}
-func updateState(uuid, newState string) error {
- err := arv.Update("containers", uuid,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": newState}},
- nil)
- if err != nil {
- log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
- }
- return err
-}
-
// Run a queued container: [1] Set container state to locked. [2]
// Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
- setMine(container.UUID, true)
- defer setMine(container.UUID, false)
-
- if err := updateState(uuid, "Locked"); err != nil {
- return
- }
+func run(crunchRunCommand, finishCommand string,
+ dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
log.Printf("About to submit queued container %v", container.UUID)
- jobid, err := submit(container, crunchRunCommand)
+ jobid, err := submit(dispatcher, container, crunchRunCommand)
if err != nil {
log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
return
}
- insecure := "0"
- if arv.ApiInsecure {
- insecure = "1"
- }
- finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+ finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
log.Printf("Submitted container %v to slurm", container.UUID)
- waitContainer(container, pollInterval)
+ waitContainer(dispatcher, container, status)
}
// Wait for a container to finish. Cancel the slurm job if the
// container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
+func waitContainer(dispatcher *dispatch.DispatcherState,
+ container dispatch.Container,
+ status chan dispatch.Container) {
+
log.Printf("Monitoring container %v started", container.UUID)
defer log.Printf("Monitoring container %v finished", container.UUID)
- pollTicker := time.NewTicker(pollInterval)
- defer pollTicker.Stop()
- for _ = range pollTicker.C {
- var updated Container
- err := arv.Get("containers", container.UUID, nil, &updated)
- if err != nil {
- log.Printf("Error getting container %s: %q", container.UUID, err)
- continue
- }
- if updated.State == "Complete" || updated.State == "Cancelled" {
- return
- }
- if updated.Priority != 0 {
- continue
- }
-
- // Priority is zero, but state is Running or Locked
- log.Printf("Canceling container %s", container.UUID)
+ for container = range status {
+ if (container.State == "Locked" || container.State == "Running") && container.Priority == 0 {
+ log.Printf("Canceling container %s", container.UUID)
+
+ err := exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue; will retry", container.UUID)
+ continue
+ }
+ }
- err = exec.Command("scancel", "--name="+container.UUID).Run()
- if err != nil {
- log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
- if inQ, err := checkSqueue(container.UUID); err != nil {
- log.Printf("Error running squeue: %v", err)
- continue
- } else if inQ {
- log.Printf("Container %s is still in squeue; will retry", container.UUID)
+ err = dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Cancelled"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating state for container %s: %s", container.UUID, err)
continue
}
}
-
- err = arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Cancelled"}},
- nil)
- if err != nil {
- log.Printf("Error updating state for container %s: %s", container.UUID, err)
- continue
- }
-
- return
}
+
}
func checkSqueue(uuid string) (bool, error) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list