[ARVADOS] updated: ce6ffc733c3d8a4637066a90df90d8ffa5d67116

Git user git at public.curoverse.com
Mon May 23 10:17:21 EDT 2016


Summary of changes:
 sdk/go/dispatch/dispatch.go                        | 135 ++++++++-------
 .../crunch-dispatch-local/crunch-dispatch-local.go |  49 +++---
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 189 ++++++---------------
 3 files changed, 150 insertions(+), 223 deletions(-)

       via  ce6ffc733c3d8a4637066a90df90d8ffa5d67116 (commit)
      from  2361ee3e1ff9f98041c7a84dcd2bbdf956fa004c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit ce6ffc733c3d8a4637066a90df90d8ffa5d67116
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon May 23 10:17:17 2016 -0400

    9187: Fix compiler errors.  On to tests.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 88330b9..cc9b20d 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,7 +1,12 @@
-package dispatcher
+package dispatch
 
 import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"log"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
 	"time"
 )
 
@@ -30,27 +35,27 @@ type ContainerList struct {
 
 type DispatcherState struct {
 	mineMutex      sync.Mutex
-	mineMap        map[string]chan struct{}
+	mineMap        map[string]chan Container
 	pollInterval   time.Duration
-	arv            arvadosclient.ArvadosClient
+	Arv            arvadosclient.ArvadosClient
 	auth           apiClientAuthorization
-	containers     chan Container
+	containers     chan []Container
 	doneProcessing chan struct{}
-	runContainer   func(c Container)
-	waitContainer  func(c Container)
+	runContainer   func(*DispatcherState, Container, chan Container)
+	waitContainer  func(*DispatcherState, Container, chan Container)
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
 // for which this process has a goroutine running.  Returns channel used to
-// send a "cancel" signal.
-func (dispatcher *DispatcherState) setMine(uuid string) chan struct{} {
+// send container status updates.
+func (dispatcher *DispatcherState) setMine(uuid string) chan Container {
 	dispatcher.mineMutex.Lock()
 	defer dispatcher.mineMutex.Unlock()
 	if ch, ok := dispatcher.mineMap[uuid]; ok {
 		return ch
 	}
 
-	ch = make(chan struct{})
+	ch := make(chan Container)
 	dispatcher.mineMap[uuid] = ch
 	return ch
 }
@@ -66,56 +71,50 @@ func (dispatcher *DispatcherState) notMine(uuid string) {
 
 // Check whether there is already a goroutine running for this
 // container.
-func (dispatcher *DispatcherState) checkMine(uuid string) chan struct{} {
-	mineMutex.Lock()
-	defer mineMutex.Unlock()
-	ch, ok := dispatcher.mineMap[uuid]
+func (dispatcher *DispatcherState) updateMine(c Container) bool {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	ch, ok := dispatcher.mineMap[c.UUID]
 	if ok {
-		return ch
+		ch <- c
+		return true
 	}
-	return nil
+	return false
 }
 
 func (dispatcher *DispatcherState) pollContainers() {
 	ticker := time.NewTicker(dispatcher.pollInterval)
 
 	paramsQ := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
-		"order":   []string{"priority desc"}}
+		"filters": [][]interface{}{{"state", "=", "Queued"},
+			{"priority", ">", 0}},
+		"order": []string{"priority desc"}}
 	paramsP := arvadosclient.Dict{
 		"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
-			[]string{"priority", "=", "0"},
-			[]string{"LockedByUUID", "=", dispatcher.auth.UUID}}}
+			{"LockedByUUID", "=", dispatcher.auth.UUID}}}
 
 	for {
 		select {
 		case <-ticker.C:
 			{
 				var containers ContainerList
-				err := arv.List("containers", paramsQ, &containers)
+				err := dispatcher.Arv.List("containers", paramsQ, &containers)
 				if err != nil {
-					log.Printf("Error getting list of queued containers: %q", err)
+					log.Printf("Error getting list of containers: %q", err)
 				} else {
-					for _, c := range containers.Items {
-						dispatcher.containers <- c
-					}
+					dispatcher.containers <- containers.Items
 				}
 			}
 			{
 				var containers ContainerList
-				err := arv.List("containers", paramsP, &containers)
+				err := dispatcher.Arv.List("containers", paramsP, &containers)
 				if err != nil {
-					log.Printf("Error getting list of cancelled containers: %q", err)
+					log.Printf("Error getting list of containers: %q", err)
 				} else {
-					for _, c := range containers.Items {
-						// only containers I know about
-						if ch := dispatcher.checkMine(c.UUID); ch != nil {
-							ch <- struct{}{}
-						}
-					}
+					dispatcher.containers <- containers.Items
 				}
 			}
-		case <-doneProcessing:
+		case <-dispatcher.doneProcessing:
 			close(dispatcher.containers)
 			ticker.Stop()
 			return
@@ -123,30 +122,40 @@ func (dispatcher *DispatcherState) pollContainers() {
 	}
 }
 
-func (dispatcher *DispatcherState) runQueuedContainers() {
-	for container := range dispatcher.containers {
-		if dispatcher.checkMine(c.UUID) != nil {
-			continue
-		}
+func (dispatcher *DispatcherState) handleContainers() {
+	for containerlist := range dispatcher.containers {
+		for _, container := range containerlist {
+			if dispatcher.updateMine(container) {
+				continue
+			}
 
-		if container.State == "Locked" {
-			if container.LockedByUUID != dispatcher.auth.UUID {
-				// Locked by a different dispatcher
+			if container.State == "Locked" || container.State == "Running" {
+				log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+					"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+					container.UUID, dispatcher.auth.UUID)
+				go func(c Container, ch chan Container) {
+					defer dispatcher.notMine(c.UUID)
+					dispatcher.waitContainer(dispatcher, c, ch)
+				}(container, dispatcher.setMine(container.UUID))
 				continue
 			}
-			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-				container.UUID, dispatcher.auth.UUID)
-			go dispatcher.waitContainer(dispatcher, container, dispatcher.setMine(container.UUID))
-			continue
-		}
 
-		go dispatcher.runContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+			// Lock container to this dispatcher
+			if err := dispatcher.UpdateState(container.UUID, "Locked"); err != nil {
+				continue
+			}
+
+			// Run it
+			go func(c Container, ch chan Container) {
+				defer dispatcher.notMine(c.UUID)
+				dispatcher.runContainer(dispatcher, c, ch)
+			}(container, dispatcher.setMine(container.UUID))
+		}
 	}
 }
 
 func (dispatcher *DispatcherState) UpdateState(uuid, newState string) error {
-	err := dispatcher.arv.Update("containers", uuid,
+	err := dispatcher.Arv.Update("containers", uuid,
 		arvadosclient.Dict{
 			"container": arvadosclient.Dict{"state": newState}},
 		nil)
@@ -157,35 +166,29 @@ func (dispatcher *DispatcherState) UpdateState(uuid, newState string) error {
 }
 
 func (dispatcher *DispatcherState) RunDispatcher() {
-
 	// Graceful shutdown on signal
-	sigChan = make(chan os.Signal, 1)
+	sigChan := make(chan os.Signal)
 	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
 	go func(sig <-chan os.Signal) {
 		for sig := range sig {
 			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- struct{}{}
+			dispatcher.doneProcessing <- struct{}{}
 		}
 	}(sigChan)
 
 	go dispatcher.pollContainers()
-	dispatcher.runQueuedContainers()
+	dispatcher.handleContainers()
 }
 
 func MakeDispatcher(arv arvadosclient.ArvadosClient,
 	pollInterval time.Duration,
-	runContainer func(*DispatcherState, Container, chan struct{}),
-	waitContainer func(*DispatcherState, Container, chan struct{})) (*DispatcherState, error) {
+	runContainer func(*DispatcherState, Container, chan Container),
+	waitContainer func(*DispatcherState, Container, chan Container)) (*DispatcherState, error) {
 
-	arv, err = arvadosclient.MakeArvadosClient()
-	if err != nil {
-		log.Printf("Error making Arvados client: %v", err)
-		return nil, err
-	}
-	dispatcher := make(DispatcherState)
-	dispatcher.arv = arv
+	dispatcher := &DispatcherState{}
+	dispatcher.Arv = arv
 
-	err = dispatcher.arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	err := dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
 	if err != nil {
 		log.Printf("Error getting my token UUID: %v", err)
 		return nil, err
@@ -194,9 +197,9 @@ func MakeDispatcher(arv arvadosclient.ArvadosClient,
 	dispatcher.pollInterval = pollInterval
 	dispatcher.runContainer = runContainer
 	dispatcher.waitContainer = waitContainer
-	dispatcher.mineMap = make(map[string]chan struct{})
-	dispatcher.doneProcessing = make(chan bool)
-	dispatcher.containers = make(chan Container)
+	dispatcher.mineMap = make(map[string]chan Container)
+	dispatcher.doneProcessing = make(chan struct{})
+	dispatcher.containers = make(chan []Container)
 
 	return dispatcher, nil
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index a9cb73b..f9596f1 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -7,9 +7,7 @@ import (
 	"log"
 	"os"
 	"os/exec"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 )
 
@@ -45,11 +43,19 @@ func doMain() error {
 
 	runningCmds = make(map[string]*exec.Cmd)
 
-	dispatch, err = dispatch.MakeDispatcher(arv, pollInterval*time.Second, run, setFinalState)
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
-	dispatch.RunDispatcher()
+
+	var dispatcher *dispatch.DispatcherState
+	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
+		run, setFinalState)
+	if err != nil {
+		return err
+	}
+	dispatcher.RunDispatcher()
 
 	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
@@ -69,19 +75,14 @@ func doMain() error {
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(dispatcher *DispatcherState,
-	container Container,
-	cancel chan struct{}) {
+func run(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
 	uuid := container.UUID
 
 	waitGroup.Add(1)
 	defer waitGroup.Done()
-	defer dispatcher.notMine(uuid)
-
-	if err := dispatcher.UpdateState(uuid, "Locked"); err != nil {
-		return
-	}
 
 	cmd := exec.Command(*crunchRunCommand, uuid)
 	cmd.Stdin = nil
@@ -94,14 +95,14 @@ func run(dispatcher *DispatcherState,
 	if err := cmd.Start(); err != nil {
 		runningCmdsMutex.Unlock()
 		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
-		updateState(uuid, "Queued")
+		dispatcher.UpdateState(uuid, "Cancelled")
 		return
 	}
 	runningCmds[uuid] = cmd
 	runningCmdsMutex.Unlock()
 
 	defer func() {
-		setFinalState(dispatcher, uuid)
+		setFinalState(dispatcher, container, status)
 
 		// Remove the crunch job from runningCmds
 		runningCmdsMutex.Lock()
@@ -111,13 +112,13 @@ func run(dispatcher *DispatcherState,
 
 	log.Printf("Starting container %v", uuid)
 
-	// Interrupt the child process if notification comes on cancel channel
+	// Interrupt the child process if priority changes to 0
 	go func() {
-		select {
-		case <-cancel:
-			log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-			cmd.Process.Signal(os.Interrupt)
-			return
+		for c := range status {
+			if (c.State == "Locked" || c.State == "Running") && c.Priority == 0 {
+				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+				cmd.Process.Signal(os.Interrupt)
+			}
 		}
 	}()
 
@@ -129,9 +130,9 @@ func run(dispatcher *DispatcherState,
 	log.Printf("Finished container run for %v", uuid)
 }
 
-func setFinalState(dispatcher *DispatcherState,
-	container Container,
-	cancel chan struct{}) {
+func setFinalState(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
 	uuid := container.UUID
 
@@ -139,7 +140,7 @@ func setFinalState(dispatcher *DispatcherState,
 	// If the container is "Running" or "Locked", that's an error, so
 	// change it to "Cancelled".  TODO: perhaps this should be "Error"
 	// state instead?
-	err := dispatcher.arv.Get("containers", uuid, nil, &container)
+	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index d6359ac..254f862 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -5,15 +5,13 @@ import (
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io/ioutil"
 	"log"
 	"math"
 	"os"
 	"os/exec"
-	"os/signal"
 	"strconv"
-	"sync"
-	"syscall"
 	"time"
 )
 
@@ -47,79 +45,29 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
+		log.Printf("Error making Arvados client: %v", err)
 		return err
 	}
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
-
-	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
-
-	// Wait for all running crunch jobs to complete / terminate
-	waitGroup.Wait()
-
-	return nil
-}
-
-// Poll for queued containers using pollInterval.
-// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-	var auth apiClientAuthorization
-	err := arv.Call("GET", "api_client_authorizations", "", "current", nil, &auth)
+	var dispatcher *dispatch.DispatcherState
+	dispatcher, err = dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second,
+		func(dispatcher *dispatch.DispatcherState,
+			container dispatch.Container,
+			status chan dispatch.Container) {
+			run(*crunchRunCommand, *finishCommand, dispatcher, container, status)
+		}, waitContainer)
 	if err != nil {
-		log.Printf("Error getting my token UUID: %v", err)
-		return
-	}
-
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
-	for {
-		select {
-		case <-ticker.C:
-			dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
-}
-
-// Get the list of queued containers from API server and invoke run
-// for each container.
-func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}},
-			[]string{"priority", ">", "0"}},
-		"order": []string{"priority desc"}}
-
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
+		return err
 	}
+	dispatcher.RunDispatcher()
 
+	return nil
 }
 
 // sbatchCmd
-func sbatchFunc(container Container) *exec.Cmd {
+func sbatchFunc(container dispatch.Container) *exec.Cmd {
 	memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
 	return exec.Command("sbatch", "--share", "--parsable",
 		"--job-name="+container.UUID,
@@ -138,7 +86,8 @@ func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiIns
 var striggerCmd = striggerFunc
 
 // Submit job to slurm using sbatch.
-func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+func submit(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
 	submitErr = nil
 
 	defer func() {
@@ -148,7 +97,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 			// OK, no cleanup needed
 			return
 		}
-		err := arv.Update("containers", container.UUID,
+		err := dispatcher.Arv.Update("containers", container.UUID,
 			arvadosclient.Dict{
 				"container": arvadosclient.Dict{"state": "Queued"}},
 			nil)
@@ -222,8 +171,12 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 
 // finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
 // the slurm controller when the job finishes.
-func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
-	cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand string, arv arvadosclient.ArvadosClient) {
+	insecure := "0"
+	if arv.ApiInsecure {
+		insecure = "1"
+	}
+	cmd := striggerCmd(jobid, containerUUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	err := cmd.Run()
@@ -235,93 +188,63 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
 	}
 }
 
-func updateState(uuid, newState string) error {
-	err := arv.Update("containers", uuid,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
-	}
-	return err
-}
-
 // Run a queued container: [1] Set container state to locked. [2]
 // Execute crunch-run as a slurm batch job. [3] waitContainer().
-func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
-	setMine(container.UUID, true)
-	defer setMine(container.UUID, false)
-
-	if err := updateState(uuid, "Locked"); err != nil {
-		return
-	}
+func run(crunchRunCommand, finishCommand string,
+	dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	status chan dispatch.Container) {
 
 	log.Printf("About to submit queued container %v", container.UUID)
 
-	jobid, err := submit(container, crunchRunCommand)
+	jobid, err := submit(dispatcher, container, crunchRunCommand)
 	if err != nil {
 		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
 		return
 	}
 
-	insecure := "0"
-	if arv.ApiInsecure {
-		insecure = "1"
-	}
-	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, dispatcher.Arv)
 
 	log.Printf("Submitted container %v to slurm", container.UUID)
-	waitContainer(container, pollInterval)
+	waitContainer(dispatcher, container, status)
 }
 
 // Wait for a container to finish. Cancel the slurm job if the
 // container priority changes to zero before it ends.
-func waitContainer(container Container, pollInterval time.Duration) {
+func waitContainer(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	status chan dispatch.Container) {
+
 	log.Printf("Monitoring container %v started", container.UUID)
 	defer log.Printf("Monitoring container %v finished", container.UUID)
 
-	pollTicker := time.NewTicker(pollInterval)
-	defer pollTicker.Stop()
-	for _ = range pollTicker.C {
-		var updated Container
-		err := arv.Get("containers", container.UUID, nil, &updated)
-		if err != nil {
-			log.Printf("Error getting container %s: %q", container.UUID, err)
-			continue
-		}
-		if updated.State == "Complete" || updated.State == "Cancelled" {
-			return
-		}
-		if updated.Priority != 0 {
-			continue
-		}
-
-		// Priority is zero, but state is Running or Locked
-		log.Printf("Canceling container %s", container.UUID)
+	for container = range status {
+		if (container.State == "Locked" || container.State == "Running") && container.Priority == 0 {
+			log.Printf("Canceling container %s", container.UUID)
+
+			err := exec.Command("scancel", "--name="+container.UUID).Run()
+			if err != nil {
+				log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+				if inQ, err := checkSqueue(container.UUID); err != nil {
+					log.Printf("Error running squeue: %v", err)
+					continue
+				} else if inQ {
+					log.Printf("Container %s is still in squeue; will retry", container.UUID)
+					continue
+				}
+			}
 
-		err = exec.Command("scancel", "--name="+container.UUID).Run()
-		if err != nil {
-			log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
-			if inQ, err := checkSqueue(container.UUID); err != nil {
-				log.Printf("Error running squeue: %v", err)
-				continue
-			} else if inQ {
-				log.Printf("Container %s is still in squeue; will retry", container.UUID)
+			err = dispatcher.Arv.Update("containers", container.UUID,
+				arvadosclient.Dict{
+					"container": arvadosclient.Dict{"state": "Cancelled"}},
+				nil)
+			if err != nil {
+				log.Printf("Error updating state for container %s: %s", container.UUID, err)
 				continue
 			}
 		}
-
-		err = arv.Update("containers", container.UUID,
-			arvadosclient.Dict{
-				"container": arvadosclient.Dict{"state": "Cancelled"}},
-			nil)
-		if err != nil {
-			log.Printf("Error updating state for container %s: %s", container.UUID, err)
-			continue
-		}
-
-		return
 	}
+
 }
 
 func checkSqueue(uuid string) (bool, error) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list