[ARVADOS] updated: 78c3676de22b2795701d3e36e734ccb081157086
git at public.curoverse.com
git at public.curoverse.com
Wed Jan 20 11:41:28 EST 2016
Summary of changes:
.../crunch-dispatch-local/crunch-dispatch-local.go | 76 ++++++++++++++--------
1 file changed, 50 insertions(+), 26 deletions(-)
via 78c3676de22b2795701d3e36e734ccb081157086 (commit)
from 93b12225ca665ec1869eea038aa516ec71c68ff4 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 78c3676de22b2795701d3e36e734ccb081157086
Author: radhika <radhika at curoverse.com>
Date: Wed Jan 20 11:40:30 2016 -0500
8028: Add command to waitGroup in the run method itself, not during signal handling.
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index eb8550e..47c3564 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -19,8 +19,14 @@ func main() {
}
}
-var arv arvadosclient.ArvadosClient
-var runningCmds map[string]*exec.Cmd
+var (
+ arv arvadosclient.ArvadosClient
+ runningCmds map[string]*exec.Cmd
+ runningCmdsMutex sync.Mutex
+ waitGroup sync.WaitGroup
+ doneProcessing chan bool
+ sigChan chan os.Signal
+)
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -49,44 +55,46 @@ func doMain() error {
return err
}
+ // Channel to terminate
+ doneProcessing = make(chan bool)
+
+ // Map of running crunch jobs
runningCmds = make(map[string]*exec.Cmd)
+
+ // Graceful shutdown
sigChan = make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func(sig <-chan os.Signal) {
- var wg sync.WaitGroup
for sig := range sig {
+ log.Printf("Caught signal: %v", sig)
doneProcessing <- true
- caught := sig
- for uuid, cmd := range runningCmds {
- go func(uuid string) {
- wg.Add(1)
- defer wg.Done()
- cmd.Process.Signal(caught)
- if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
- }
- }(uuid)
- }
}
- wg.Wait()
}(sigChan)
- // channel to terminate
- doneProcessing = make(chan bool)
-
- // run all queued containers
+ // Run all queued containers
runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+ // Finished dispatching; interrupt any crunch jobs that are still running
+ for uuid, cmd := range runningCmds {
+ go func(uuid string) {
+ cmd.Process.Signal(os.Interrupt)
+ if _, err := cmd.Process.Wait(); err != nil {
+ log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+ }
+ }(uuid)
+ }
+
+ // Wait for all running crunch jobs to complete / terminate
+ waitGroup.Wait()
+
return nil
}
-var doneProcessing chan bool
-var sigChan chan os.Signal
-
// Poll for queued containers using pollInterval.
// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
//
// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more child processes are running,
+// This is because, once one or more crunch jobs are running,
// we would need to wait for them complete.
func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
@@ -129,6 +137,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to run queued container %v", containers.Items[i].UUID)
+ // Run the container
go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
}
}
@@ -149,10 +158,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
return
}
+ // Add this crunch job to the list of runningCmds
+ runningCmdsMutex.Lock()
runningCmds[uuid] = cmd
+ runningCmdsMutex.Unlock()
log.Printf("Started container run for %v", uuid)
+ // Add this crunch job to waitGroup
+ waitGroup.Add(1)
+ defer waitGroup.Done()
+
+ // Update container status to Running
err := arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
@@ -161,7 +178,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
}
- // Terminate the runner if container priority becomes zero
+ // A goroutine to terminate the runner if container priority becomes zero
priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
go func() {
for {
@@ -175,7 +192,9 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
if container.Priority == 0 {
priorityTicker.Stop()
cmd.Process.Signal(os.Interrupt)
+ runningCmdsMutex.Lock()
delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
return
}
}
@@ -183,14 +202,19 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
}
}()
- // Wait for the process to exit
+ // Wait for the crunch job to exit
if _, err := cmd.Process.Wait(); err != nil {
- log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+ log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
+
+ // Remove the crunch job from runningCmds
+ runningCmdsMutex.Lock()
delete(runningCmds, uuid)
+ runningCmdsMutex.Unlock()
priorityTicker.Stop()
+ // The container state should be 'Complete'
var container Container
err = arv.Get("containers", uuid, nil, &container)
if container.State == "Running" {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list