[ARVADOS] created: 2361ee3e1ff9f98041c7a84dcd2bbdf956fa004c

Git user git at public.curoverse.com
Fri May 20 16:41:59 EDT 2016


        at  2361ee3e1ff9f98041c7a84dcd2bbdf956fa004c (commit)


commit 2361ee3e1ff9f98041c7a84dcd2bbdf956fa004c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 20 16:41:53 2016 -0400

    9187: Dispatcher refactoring WIP

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
new file mode 100644
index 0000000..88330b9
--- /dev/null
+++ b/sdk/go/dispatch/dispatch.go
@@ -0,0 +1,202 @@
+package dispatch
+
+import (
+	"log"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+)
+
+type apiClientAuthorization struct {
+	UUID     string `json:"uuid"`
+	APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+	Items []apiClientAuthorization `json:"items"`
+}
+
+// Container data
+type Container struct {
+	UUID               string           `json:"uuid"`
+	State              string           `json:"state"`
+	Priority           int              `json:"priority"`
+	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+	LockedByUUID       string           `json:"locked_by_uuid"`
+}
+
+// ContainerList is a list of containers returned by the API.
+type ContainerList struct {
+	Items []Container `json:"items"`
+}
+
+// DispatcherState holds the state shared by the polling loop and the
+// per-container goroutines of one dispatcher process.
+type DispatcherState struct {
+	mineMutex      sync.Mutex
+	mineMap        map[string]chan struct{}
+	pollInterval   time.Duration
+	Arv            arvadosclient.ArvadosClient
+	auth           apiClientAuthorization
+	containers     chan Container
+	doneProcessing chan struct{}
+	runContainer   func(*DispatcherState, Container, chan struct{})
+	waitContainer  func(*DispatcherState, Container, chan struct{})
+}
+
+// setMine registers uuid as one of "my" containers, i.e., one for which this
+// process has a goroutine running, and returns the channel used to send it a
+// "cancel" signal.  Safe to call from multiple goroutines.
+func (dispatcher *DispatcherState) setMine(uuid string) chan struct{} {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		return ch
+	}
+
+	ch := make(chan struct{})
+	dispatcher.mineMap[uuid] = ch
+	return ch
+}
+
+// NotMine removes uuid from the set of "my" containers and closes its cancel
+// channel, waking any goroutine that is watching it.
+func (dispatcher *DispatcherState) NotMine(uuid string) {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	if ch, ok := dispatcher.mineMap[uuid]; ok {
+		close(ch)
+		delete(dispatcher.mineMap, uuid)
+	}
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func (dispatcher *DispatcherState) checkMine(uuid string) chan struct{} {
+	dispatcher.mineMutex.Lock()
+	defer dispatcher.mineMutex.Unlock()
+	ch, ok := dispatcher.mineMap[uuid]
+	if ok {
+		return ch
+	}
+	return nil
+}
+
+func (dispatcher *DispatcherState) pollContainers() {
+	ticker := time.NewTicker(dispatcher.pollInterval)
+
+	paramsQ := arvadosclient.Dict{
+		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
+		"order":   []string{"priority desc"}}
+	paramsP := arvadosclient.Dict{
+		"filters": [][]interface{}{{"state", "in", []string{"Locked", "Running"}},
+			{"priority", "=", "0"},
+			{"locked_by_uuid", "=", dispatcher.auth.UUID}}}
+
+	for {
+		select {
+		case <-ticker.C:
+			{
+				var containers ContainerList
+				err := dispatcher.Arv.List("containers", paramsQ, &containers)
+				if err != nil {
+					log.Printf("Error getting list of queued containers: %q", err)
+				} else {
+					for _, c := range containers.Items {
+						dispatcher.containers <- c
+					}
+				}
+			}
+			{
+				var containers ContainerList
+				err := dispatcher.Arv.List("containers", paramsP, &containers)
+				if err != nil {
+					log.Printf("Error getting list of cancelled containers: %q", err)
+				} else {
+					for _, c := range containers.Items {
+						// only containers I know about
+						if ch := dispatcher.checkMine(c.UUID); ch != nil {
+							ch <- struct{}{}
+						}
+					}
+				}
+			}
+		case <-dispatcher.doneProcessing:
+			close(dispatcher.containers)
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (dispatcher *DispatcherState) runQueuedContainers() {
+	for container := range dispatcher.containers {
+		if dispatcher.checkMine(container.UUID) != nil {
+			continue
+		}
+
+		if container.State == "Locked" {
+			if container.LockedByUUID != dispatcher.auth.UUID {
+				// Locked by a different dispatcher
+				continue
+			}
+			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+				container.UUID, dispatcher.auth.UUID)
+			go dispatcher.waitContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+			continue
+		}
+
+		go dispatcher.runContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+	}
+}
+
+// UpdateState records a new state for the given container on the API server.
+func (dispatcher *DispatcherState) UpdateState(uuid, newState string) error {
+	err := dispatcher.Arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": newState}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+	}
+	return err
+}
+
+// RunDispatcher polls the API server for containers and dispatches them via
+// the runContainer/waitContainer callbacks until a termination signal arrives.
+func (dispatcher *DispatcherState) RunDispatcher() {
+
+	// Graceful shutdown on signal
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	go func(sig <-chan os.Signal) {
+		for sig := range sig {
+			log.Printf("Caught signal: %v", sig)
+			dispatcher.doneProcessing <- struct{}{}
+		}
+	}(sigChan)
+
+	go dispatcher.pollContainers()
+	dispatcher.runQueuedContainers()
+}
+
+// MakeDispatcher sets up a DispatcherState that uses the given API client and
+// poll interval, and looks up the UUID of the token the client is using so the
+// dispatcher can recognize containers it has locked itself.
+func MakeDispatcher(arv arvadosclient.ArvadosClient,
+	pollInterval time.Duration,
+	runContainer func(*DispatcherState, Container, chan struct{}),
+	waitContainer func(*DispatcherState, Container, chan struct{})) (*DispatcherState, error) {
+
+	dispatcher := &DispatcherState{
+		Arv:            arv,
+		pollInterval:   pollInterval,
+		runContainer:   runContainer,
+		waitContainer:  waitContainer,
+		mineMap:        make(map[string]chan struct{}),
+		doneProcessing: make(chan struct{}),
+		containers:     make(chan Container),
+	}
+
+	err := dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.auth)
+	if err != nil {
+		log.Printf("Error getting my token UUID: %v", err)
+		return nil, err
+	}
+
+	return dispatcher, nil
+}
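For orientation, the new package collects the poll/lock/cancel plumbing that
was previously duplicated in each dispatcher. A minimal sketch of a dispatcher
built on it might look like the following; the main package, the placeholder
callback bodies, and the 10-second poll interval are illustrative assumptions,
not part of this patch:

package main

import (
	"log"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/dispatch"
)

// runContainer is invoked once per container this dispatcher should handle.
// A real dispatcher would lock the container, start crunch-run (locally, via
// slurm, etc.) and watch the cancel channel; this placeholder only logs.
func runContainer(d *dispatch.DispatcherState, c dispatch.Container, cancel chan struct{}) {
	defer d.NotMine(c.UUID)
	if err := d.UpdateState(c.UUID, "Locked"); err != nil {
		return
	}
	log.Printf("placeholder: would start crunch-run for %v and watch cancel", c.UUID)
}

// waitContainer is invoked for containers already locked by this token, e.g.
// ones left behind by a previous dispatcher process.
func waitContainer(d *dispatch.DispatcherState, c dispatch.Container, cancel chan struct{}) {
	defer d.NotMine(c.UUID)
	log.Printf("placeholder: would wait for container %v to finish", c.UUID)
}

func main() {
	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatal(err)
	}
	d, err := dispatch.MakeDispatcher(arv, 10*time.Second, runContainer, waitContainer)
	if err != nil {
		log.Fatal(err)
	}
	d.RunDispatcher() // blocks until SIGINT/SIGTERM/SIGQUIT
}
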
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 959164e..a9cb73b 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -3,6 +3,7 @@ package main
 import (
 	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
 	"os"
 	"os/exec"
@@ -20,12 +21,10 @@ func main() {
 }
 
 var (
-	arv              arvadosclient.ArvadosClient
 	runningCmds      map[string]*exec.Cmd
 	runningCmdsMutex sync.Mutex
 	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
+	crunchRunCommand *string
 )
 
 func doMain() error {
@@ -36,12 +35,7 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
@@ -49,35 +43,20 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
-	var err error
-	arv, err = arvadosclient.MakeArvadosClient()
+	runningCmds = make(map[string]*exec.Cmd)
+
+	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		return err
 	}
+
+	dispatcher, err := dispatch.MakeDispatcher(arv, time.Duration(*pollInterval)*time.Second, run, setFinalState)
+	if err != nil {
+		return err
+	}
+	dispatcher.RunDispatcher()
 
-	// Channel to terminate
-	doneProcessing = make(chan bool)
-
-	// Map of running crunch jobs
-	runningCmds = make(map[string]*exec.Cmd)
-
-	// Graceful shutdown
-	sigChan = make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			doneProcessing <- true
-		}
-	}(sigChan)
-
-	// Run all queued containers
-	runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
-
+	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
 		cmd.Process.Signal(os.Interrupt)
 	}
+	// Unlock before waiting, so finishing jobs can remove themselves from
+	// runningCmds in their deferred cleanup.
+	runningCmdsMutex.Unlock()
 
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
@@ -85,86 +64,26 @@ func doMain() error {
 	return nil
 }
 
-// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
-//
-// Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more crunch jobs are running,
-// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
-	ticker := time.NewTicker(pollInterval)
-
-	for {
-		select {
-		case <-ticker.C:
-			dispatchLocal(priorityPollInterval, crunchRunCommand)
-		case <-doneProcessing:
-			ticker.Stop()
-			return
-		}
-	}
-}
-
-// Container data
-type Container struct {
-	UUID         string `json:"uuid"`
-	State        string `json:"state"`
-	Priority     int    `json:"priority"`
-	LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
-	params := arvadosclient.Dict{
-		"filters": [][]string{[]string{"state", "=", "Queued"},
-			[]string{"priority", ">", "0"}},
-		"order": []string{"priority desc"}}
-
-	var containers ContainerList
-	err := arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of queued containers: %q", err)
-		return
-	}
-
-	for _, c := range containers.Items {
-		log.Printf("About to run queued container %v", c.UUID)
-		// Run the container
-		waitGroup.Add(1)
-		go func(c Container) {
-			run(c.UUID, crunchRunCommand, pollInterval)
-			waitGroup.Done()
-		}(c)
-	}
-}
-
-func updateState(uuid, newState string) error {
-	err := arv.Update("containers", uuid,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
-	}
-	return err
-}
-
 // Run queued container:
 // Set container state to Locked
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
-	if err := updateState(uuid, "Locked"); err != nil {
+func run(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	cancel chan struct{}) {
+
+	uuid := container.UUID
+
+	waitGroup.Add(1)
+	defer waitGroup.Done()
+	defer dispatcher.NotMine(uuid)
+
+	if err := dispatcher.UpdateState(uuid, "Locked"); err != nil {
 		return
 	}
 
-	cmd := exec.Command(crunchRunCommand, uuid)
+	cmd := exec.Command(*crunchRunCommand, uuid)
 	cmd.Stdin = nil
 	cmd.Stderr = os.Stderr
 	cmd.Stdout = os.Stderr
@@ -182,7 +101,7 @@ func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 	runningCmdsMutex.Unlock()
 
 	defer func() {
-		setFinalState(uuid)
+		setFinalState(dispatcher, container, cancel)
 
 		// Remove the crunch job from runningCmds
 		runningCmdsMutex.Lock()
@@ -192,28 +111,13 @@ func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 
 	log.Printf("Starting container %v", uuid)
 
-	cmdExited := make(chan struct{})
-
-	// Kill the child process if container priority changes to zero
+	// Interrupt the child process if notification comes on cancel channel
 	go func() {
-		ticker := time.NewTicker(pollInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-cmdExited:
-				return
-			case <-ticker.C:
-			}
-			var container Container
-			err := arv.Get("containers", uuid, nil, &container)
-			if err != nil {
-				log.Printf("Error getting container %v: %q", uuid, err)
-				continue
-			}
-			if container.Priority == 0 {
-				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
-				cmd.Process.Signal(os.Interrupt)
-			}
+		select {
+		case <-cancel:
+			log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+			cmd.Process.Signal(os.Interrupt)
+			return
 		}
 	}()
 
@@ -221,29 +125,30 @@ func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 	if _, err := cmd.Process.Wait(); err != nil {
 		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 	}
-	close(cmdExited)
 
 	log.Printf("Finished container run for %v", uuid)
 }
 
-func setFinalState(uuid string) {
-	// The container state should now be 'Complete' if everything
-	// went well. If it started but crunch-run didn't change its
-	// final state to 'Running', fix that now. If it never even
-	// started, cancel it as unrunnable. (TODO: Requeue instead,
-	// and fix tests so they can tell something happened even if
-	// the final state is Queued.)
-	var container Container
-	err := arv.Get("containers", uuid, nil, &container)
+func setFinalState(dispatcher *dispatch.DispatcherState,
+	container dispatch.Container,
+	cancel chan struct{}) {
+
+	uuid := container.UUID
+
+	// The container state should be 'Complete' if everything went well.
+	// If the container is "Running" or "Locked", that's an error, so
+	// change it to "Cancelled".  TODO: perhaps this should be "Error"
+	// state instead?
+	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
 	fixState := map[string]string{
-		"Running": "Complete",
+		"Running": "Cancelled",
 		"Locked":  "Cancelled",
 	}
 	if newState, ok := fixState[container.State]; ok {
 		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
-		updateState(uuid, newState)
+		dispatcher.UpdateState(uuid, newState)
 	}
 }
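Compared with the old polling loop, the per-container goroutine above no longer
queries the API itself; it just waits for a notification on the cancel channel
supplied by the dispatch package. The pattern in isolation (a hypothetical
standalone example, with a sleep command standing in for crunch-run):

package main

import (
	"log"
	"os"
	"os/exec"
	"time"
)

func main() {
	cancel := make(chan struct{})

	cmd := exec.Command("sleep", "60")
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	// Interrupt the child if a cancel notification arrives, as the
	// dispatcher does when a container's priority drops to zero.
	go func() {
		<-cancel
		log.Printf("Sending SIGINT to pid %d", cmd.Process.Pid)
		cmd.Process.Signal(os.Interrupt)
	}()

	// Simulate the dispatcher noticing priority == 0 after two seconds.
	time.Sleep(2 * time.Second)
	cancel <- struct{}{}

	cmd.Wait()
	log.Print("child process has exited")
}
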
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 53e4705..d6359ac 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -24,14 +24,7 @@ func main() {
 	}
 }
 
-var (
-	arv              arvadosclient.ArvadosClient
-	runningCmds      map[string]*exec.Cmd
-	runningCmdsMutex sync.Mutex
-	waitGroup        sync.WaitGroup
-	doneProcessing   chan bool
-	sigChan          chan os.Signal
-)
+var crunchRunCommand *string
 
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
@@ -41,12 +34,7 @@ func doMain() error {
 		10,
 		"Interval in seconds to poll for queued containers")
 
-	priorityPollInterval := flags.Int(
-		"container-priority-poll-interval",
-		60,
-		"Interval in seconds to check priority of a dispatched container")
-
-	crunchRunCommand := flags.String(
+	crunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
@@ -87,15 +75,6 @@ func doMain() error {
 	return nil
 }
 
-type apiClientAuthorization struct {
-	UUID     string `json:"uuid"`
-	APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-	Items []apiClientAuthorization `json:"items"`
-}
-
 // Poll for queued containers using pollInterval.
 // Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
@@ -122,26 +101,13 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
 	}
 }
 
-// Container data
-type Container struct {
-	UUID               string           `json:"uuid"`
-	State              string           `json:"state"`
-	Priority           int              `json:"priority"`
-	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-	LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-	Items []Container `json:"items"`
-}
-
 // Get the list of queued containers from API server and invoke run
 // for each container.
 func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
 	params := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
-	}
+		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}},
+			[]string{"priority", ">", "0"}},
+		"order": []string{"priority desc"}}
 
 	var containers ContainerList
 	err := arv.List("containers", params, &containers)
@@ -150,28 +116,6 @@ func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crun
 		return
 	}
 
-	for _, container := range containers.Items {
-		if container.State == "Locked" {
-			if container.LockedByUUID != auth.UUID {
-				// Locked by a different dispatcher
-				continue
-			} else if checkMine(container.UUID) {
-				// I already have a goroutine running
-				// for this container: it just hasn't
-				// gotten past Locked state yet.
-				continue
-			}
-			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
-				"Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
-				container.UUID, auth.UUID)
-			setMine(container.UUID, true)
-			go func() {
-				waitContainer(container, pollInterval)
-				setMine(container.UUID, false)
-			}()
-		}
-		go run(container, crunchRunCommand, finishCommand, pollInterval)
-	}
 }
 
 // sbatchCmd
@@ -291,21 +235,24 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
 	}
 }
 
+func updateState(uuid, newState string) error {
+	err := arv.Update("containers", uuid,
+		arvadosclient.Dict{
+			"container": arvadosclient.Dict{"state": newState}},
+		nil)
+	if err != nil {
+		log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+	}
+	return err
+}
+
 // Run a queued container: [1] Set container state to locked. [2]
 // Execute crunch-run as a slurm batch job. [3] waitContainer().
 func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
 	setMine(container.UUID, true)
 	defer setMine(container.UUID, false)
 
-	// Update container status to Locked. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err := arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Locked"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+	if err := updateState(container.UUID, "Locked"); err != nil {
 		return
 	}
 
@@ -323,16 +270,6 @@ func run(container Container, crunchRunCommand, finishCommand string, pollInterv
 	}
 	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-	// Update container status to Running. This will fail if
-	// another dispatcher (token) has already locked it. It will
-	// succeed if *this* dispatcher has already locked it.
-	err = arv.Update("containers", container.UUID,
-		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
-		nil)
-	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
-	}
 	log.Printf("Submitted container %v to slurm", container.UUID)
 	waitContainer(container, pollInterval)
 }
@@ -407,26 +344,3 @@ func checkSqueue(uuid string) (bool, error) {
 	}
 	return found, nil
 }
-
-var mineMutex sync.RWMutex
-var mineMap = make(map[string]bool)
-
-// Goroutine-safely add/remove uuid to the set of "my" containers,
-// i.e., ones for which this process has a goroutine running.
-func setMine(uuid string, t bool) {
-	mineMutex.Lock()
-	if t {
-		mineMap[uuid] = true
-	} else {
-		delete(mineMap, uuid)
-	}
-	mineMutex.Unlock()
-}
-
-// Check whether there is already a goroutine running for this
-// container.
-func checkMine(uuid string) bool {
-	mineMutex.RLocker().Lock()
-	defer mineMutex.RLocker().Unlock()
-	return mineMap[uuid]
-}

commit 24c703ad88b0c8a68484c08686e39b75017ddda1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 19 14:12:42 2016 -0400

    9187: Add filters

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 4023870..959164e 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -121,8 +121,9 @@ type ContainerList struct {
 // Get the list of queued containers from API server and invoke run for each container.
 func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
 	params := arvadosclient.Dict{
-		"filters": [][]string{[]string{"state", "=", "Queued"}},
-	}
+		"filters": [][]string{[]string{"state", "=", "Queued"},
+			[]string{"priority", ">", "0"}},
+		"order": []string{"priority desc"}}
 
 	var containers ContainerList
 	err := arv.List("containers", params, &containers)
@@ -172,8 +173,8 @@ func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 	// succeed in starting crunch-run.
 	runningCmdsMutex.Lock()
 	if err := cmd.Start(); err != nil {
-		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
 		runningCmdsMutex.Unlock()
+		log.Printf("Error starting crunch-run for %v: %q", uuid, err)
 		updateState(uuid, "Queued")
 		return
 	}
@@ -191,8 +192,6 @@ func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 
 	log.Printf("Starting container %v", uuid)
 
-	updateState(uuid, "Running")
-
 	cmdExited := make(chan struct{})
 
 	// Kill the child process if container priority changes to zero
@@ -241,7 +240,7 @@ func setFinalState(uuid string) {
 	}
 	fixState := map[string]string{
 		"Running": "Complete",
-		"Locked": "Cancelled",
+		"Locked":  "Cancelled",
 	}
 	if newState, ok := fixState[container.State]; ok {
 		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list