[ARVADOS] updated: 78c3676de22b2795701d3e36e734ccb081157086

git at public.curoverse.com git at public.curoverse.com
Wed Jan 20 11:41:28 EST 2016


Summary of changes:
 .../crunch-dispatch-local/crunch-dispatch-local.go | 76 ++++++++++++++--------
 1 file changed, 50 insertions(+), 26 deletions(-)

       via  78c3676de22b2795701d3e36e734ccb081157086 (commit)
      from  93b12225ca665ec1869eea038aa516ec71c68ff4 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 78c3676de22b2795701d3e36e734ccb081157086
Author: radhika <radhika at curoverse.com>
Date:   Wed Jan 20 11:40:30 2016 -0500

    8028: add command to waitGroup during run method itself; not during signal handling.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index eb8550e..47c3564 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -19,8 +19,14 @@ func main() {
 	}
 }
 
-var arv arvadosclient.ArvadosClient
-var runningCmds map[string]*exec.Cmd
+var (
+	arv              arvadosclient.ArvadosClient
+	runningCmds      map[string]*exec.Cmd
+	runningCmdsMutex sync.Mutex
+	waitGroup        sync.WaitGroup
+	doneProcessing   chan bool
+	sigChan          chan os.Signal
+)
 
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -49,44 +55,46 @@ func doMain() error {
 		return err
 	}
 
+	// Channel to terminate
+	doneProcessing = make(chan bool)
+
+	// Map of running crunch jobs
 	runningCmds = make(map[string]*exec.Cmd)
+
+	// Graceful shutdown
 	sigChan = make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 	go func(sig <-chan os.Signal) {
-		var wg sync.WaitGroup
 		for sig := range sig {
+			log.Printf("Caught signal: %v", sig)
 			doneProcessing <- true
-			caught := sig
-			for uuid, cmd := range runningCmds {
-				go func(uuid string) {
-					wg.Add(1)
-					defer wg.Done()
-					cmd.Process.Signal(caught)
-					if _, err := cmd.Process.Wait(); err != nil {
-						log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
-					}
-				}(uuid)
-			}
 		}
-		wg.Wait()
 	}(sigChan)
 
-	// channel to terminate
-	doneProcessing = make(chan bool)
-
-	// run all queued containers
+	// Run all queued containers
 	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+	// Finished dispatching; interrupt any crunch jobs that are still running
+	for uuid, cmd := range runningCmds {
+		go func(uuid string) {
+			cmd.Process.Signal(os.Interrupt)
+			if _, err := cmd.Process.Wait(); err != nil {
+				log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+			}
+		}(uuid)
+	}
+
+	// Wait for all running crunch jobs to complete / terminate
+	waitGroup.Wait()
+
 	return nil
 }
 
-var doneProcessing chan bool
-var sigChan chan os.Signal
-
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more child processes are running,
+// This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
 	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
@@ -129,6 +137,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 
 	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to run queued container %v", containers.Items[i].UUID)
+		// Run the container
 		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
 	}
 }
@@ -149,10 +158,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		return
 	}
 
+	// Add this crunch job to the list of runningCmds
+	runningCmdsMutex.Lock()
 	runningCmds[uuid] = cmd
+	runningCmdsMutex.Unlock()
 
 	log.Printf("Started container run for %v", uuid)
 
+	// Add this crunch job to waitGroup
+	waitGroup.Add(1)
+	defer waitGroup.Done()
+
+	// Update container status to Running
 	err := arv.Update("containers", uuid,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": "Running"}},
@@ -161,7 +178,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
 	}
 
-	// Terminate the runner if container priority becomes zero
+	// A goroutine to terminate the runner if container priority becomes zero
 	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
 	go func() {
 		for {
@@ -175,7 +192,9 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 					if container.Priority == 0 {
 						priorityTicker.Stop()
 						cmd.Process.Signal(os.Interrupt)
+						runningCmdsMutex.Lock()
 						delete(runningCmds, uuid)
+						runningCmdsMutex.Unlock()
 						return
 					}
 				}
@@ -183,14 +202,19 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 		}
 	}()
 
-	// Wait for the process to exit
+	// Wait for the crunch job to exit
 	if _, err := cmd.Process.Wait(); err != nil {
-		log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 	}
+
+	// Remove the crunch job from runningCmds
+	runningCmdsMutex.Lock()
 	delete(runningCmds, uuid)
+	runningCmdsMutex.Unlock()
 
 	priorityTicker.Stop()
 
+	// The container state should be 'Complete'
 	var container Container
 	err = arv.Get("containers", uuid, nil, &container)
 	if container.State == "Running" {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list