[ARVADOS] created: 2757c83be8fefbd5ac6a70970ab03d7803569b58

Git user git at public.curoverse.com
Thu May 5 17:51:10 EDT 2016


        at  2757c83be8fefbd5ac6a70970ab03d7803569b58 (commit)


commit 2757c83be8fefbd5ac6a70970ab03d7803569b58
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 5 17:50:44 2016 -0400

    8128: Update crunch-dispatch-local to use new Locked state.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index e05c0c5..70fe640 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -72,7 +72,7 @@ func doMain() error {
 	}(sigChan)
 
 	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+	runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
 
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them to complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+	ticker := time.NewTicker(pollInterval)
 
 	for {
 		select {
@@ -118,7 +118,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
@@ -133,7 +133,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to run queued container %v", containers.Items[i].UUID)
 		// Run the container
-		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+		go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
 	}
 }
 
@@ -142,7 +142,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 	cmd := exec.Command(crunchRunCommand, uuid)
 
 	cmd.Stdin = nil
@@ -158,7 +158,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	runningCmds[uuid] = cmd
 	runningCmdsMutex.Unlock()
 
-	log.Printf("Started container run for %v", uuid)
+	log.Printf("Starting container %v", uuid)
 
 	// Add this crunch job to waitGroup
 	waitGroup.Add(1)
@@ -167,41 +167,48 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	// Update container status to Running
 	err := arv.Update("containers", uuid,
 		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
+			"container": arvadosclient.Dict{"state": "Locked"}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+		log.Printf("Error updating container %v to 'Locked' state: %v", uuid, err)
 	}
 
-	// A goroutine to terminate the runner if container priority becomes zero
-	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+	cmdExited := make(chan struct{})
+
+	// Kill the child process if container priority changes to zero
 	go func() {
-		for _ = range priorityTicker.C {
+		ticker := time.NewTicker(pollInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-cmdExited:
+				return
+			case <-ticker.C:
+			}
 			var container Container
 			err := arv.Get("containers", uuid, nil, &container)
 			if err != nil {
-				log.Printf("Error getting container info for %v: %q", uuid, err)
-			} else {
-				if container.Priority == 0 {
-					priorityTicker.Stop()
-					cmd.Process.Signal(os.Interrupt)
-				}
+				log.Printf("Error getting container %v: %q", uuid, err)
+				continue
+			}
+			if container.Priority == 0 {
+				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+				cmd.Process.Signal(os.Interrupt)
 			}
 		}
 	}()
 
-	// Wait for the crunch job to exit
+	// Wait for crunch-run to exit
 	if _, err := cmd.Process.Wait(); err != nil {
 		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 	}
+	close(cmdExited)
 
 	// Remove the crunch job from runningCmds
 	runningCmdsMutex.Lock()
 	delete(runningCmds, uuid)
 	runningCmdsMutex.Unlock()
 
-	priorityTicker.Stop()
-
 	// The container state should be 'Complete'
 	var container Container
 	err = arv.Get("containers", uuid, nil, &container)

commit 8285405a41257da35a482f87add07c7692548703
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 5 17:15:51 2016 -0400

    8128: Update crunch-dispatch-slurm to use new Locked state.

diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index bebef79..84a3bff 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -13,6 +13,9 @@ const (
 	FooBarDirCollection   = "zzzzz-4zz18-foonbarfilesdir"
 	FooPdh                = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
 	HelloWorldPdh         = "55713e6a34081eb03609e7ad5fcad129+62"
+
+	Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
+	Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
 )
 
 // A valid manifest designed to test various edge cases and parsing
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 5856edd..a014552 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -27,6 +27,7 @@ class Container < ArvadosModel
     t.add :environment
     t.add :exit_code
     t.add :finished_at
+    t.add :locked_by_uuid
     t.add :log
     t.add :mounts
     t.add :output
@@ -74,6 +75,13 @@ class Container < ArvadosModel
     end
   end
 
+  def locked_by_uuid
+    # Stub to permit a single dispatch to recognize its own containers
+    if current_user.is_admin
+      Thread.current[:api_client_authorization].andand.uuid
+    end
+  end
+
   protected
 
   def fill_field_defaults
diff --git a/services/api/test/fixtures/api_client_authorizations.yml b/services/api/test/fixtures/api_client_authorizations.yml
index f99a9fb..485b6d1 100644
--- a/services/api/test/fixtures/api_client_authorizations.yml
+++ b/services/api/test/fixtures/api_client_authorizations.yml
@@ -271,3 +271,9 @@ fuse:
   api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
   expires_at: 2038-01-01 00:00:00
 
+dispatch1:
+  uuid: zzzzz-gj3su-k9dvestay1plssr
+  api_client: trusted
+  user: system_user
+  api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
+  expires_at: 2038-01-01 00:00:00
\ No newline at end of file
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 8fbc0fa..a76b4e9 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -84,6 +85,15 @@ func doMain() error {
 	return nil
 }
 
+type apiClientAuthorization struct {
+	UUID     string `json:"uuid"`
+	APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+	Items []apiClientAuthorization `json:"items"`
+}
+
 // Poll for queued containers using pollInterval.
 // Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
@@ -91,12 +101,21 @@ func doMain() error {
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them to complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+	var authList apiClientAuthorizationList
+	err := arv.List("api_client_authorizations", map[string]interface{}{
+		"filters": [][]interface{}{{"api_token", "=", arv.ApiToken}},
+	}, &authList)
+	if err != nil || len(authList.Items) != 1 {
+		log.Printf("Error getting my token UUID: %v (%d)", err, len(authList.Items))
+		return
+	}
+	auth := authList.Items[0]
 
+	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
 	for {
 		select {
 		case <-ticker.C:
-			dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+			dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
 		case <-doneProcessing:
 			ticker.Stop()
 			return
@@ -106,9 +125,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
 
 // Container data
 type Container struct {
-	UUID     string `json:"uuid"`
-	State    string `json:"state"`
-	Priority int    `json:"priority"`
+	UUID         string `json:"uuid"`
+	State        string `json:"state"`
+	Priority     int    `json:"priority"`
+	LockedByUUID string `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -116,10 +136,11 @@ type ContainerList struct {
 	Items []Container `json:"items"`
 }
 
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get Queued and Locked containers from the API server and invoke run for each,
+// skipping Locked ones held by another dispatcher or already tracked here.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
 	params := arvadosclient.Dict{
-		"filters": [][]string{[]string{"state", "=", "Queued"}},
+		"filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
 	}
 
 	var containers ContainerList
@@ -129,10 +150,25 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
 		return
 	}
 
-	for i := 0; i < len(containers.Items); i++ {
-		log.Printf("About to submit queued container %v", containers.Items[i].UUID)
-		// Run the container
-		go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+	for _, container := range containers.Items {
+		if container.State == "Locked" {
+			if container.LockedByUUID != auth.UUID {
+				// Locked by a different dispatcher
+				continue
+			} else if checkMine(container.UUID) {
+				// I already have a goroutine running
+				// for this container: it just hasn't
+				// gotten past Locked state yet.
+				continue
+			}
+			log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. Assuming it was left behind by a previous dispatch process, and waiting for it to finish.", container.UUID, auth.UUID)
+			setMine(container.UUID, true)
+			go func() {
+				waitContainer(container, pollInterval)
+				setMine(container.UUID, false)
+			}()
+		}
+		go run(container, crunchRunCommand, finishCommand, pollInterval)
 	}
 }
 
@@ -155,17 +191,19 @@ var striggerCmd = striggerFunc
 func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
 	submitErr = nil
 
-	// Mark record as complete if anything errors out.
 	defer func() {
-		if submitErr != nil {
-			// This really should be an "Error" state, see #8018
-			updateErr := arv.Update("containers", container.UUID,
-				arvadosclient.Dict{
-					"container": arvadosclient.Dict{"state": "Complete"}},
-				nil)
-			if updateErr != nil {
-				log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
-			}
+		// If we didn't get as far as submitting a slurm job,
+		// unlock the container and return it to the queue.
+		if submitErr == nil {
+			// OK, no cleanup needed
+			return
+		}
+		err := arv.Update("containers", container.UUID,
+			arvadosclient.Dict{
+				"container": arvadosclient.Dict{"state": "Queued"}},
+			nil)
+		if err != nil {
+			log.Printf("Error unlocking container %s: %v", container.UUID, err)
 		}
 	}()
 
@@ -198,6 +236,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 	stdoutChan := make(chan []byte)
 	go func() {
 		b, _ := ioutil.ReadAll(stdoutReader)
+		stdoutReader.Close()
 		stdoutChan <- b
 		close(stdoutChan)
 	}()
@@ -205,6 +244,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
 	stderrChan := make(chan []byte)
 	go func() {
 		b, _ := ioutil.ReadAll(stderrReader)
+		stderrReader.Close()
 		stderrChan <- b
 		close(stderrChan)
 	}()
@@ -239,18 +279,23 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
 	err := cmd.Run()
 	if err != nil {
 		log.Printf("While setting up strigger: %v", err)
+		// BUG: we drop the error here and forget about it. A
+		// human has to notice the container is stuck in
+		// Running state, and fix it manually.
 	}
 }
 
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+	setMine(container.UUID, true)
+	defer setMine(container.UUID, false)
+
+	log.Printf("About to submit queued container %v", container.UUID)
 
 	jobid, err := submit(container, crunchRunCommand)
 	if err != nil {
-		log.Printf("Error queuing container run: %v", err)
+		log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
 		return
 	}
 
@@ -260,41 +305,111 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
 	}
 	finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-	// Update container status to Running, this is a temporary workaround
-	// to avoid resubmitting queued containers because record locking isn't
-	// implemented yet.
+	// Update container status to Locked. This will fail if
+	// another dispatcher (token) has already locked it. It will
+	// succeed if *this* dispatcher has already locked it.
 	err = arv.Update("containers", container.UUID,
 		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
+			"container": arvadosclient.Dict{"state": "Locked"}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
+		log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+		return
 	}
+	log.Printf("Submitted container %v to slurm", container.UUID)
+	waitContainer(container, pollInterval)
+}
 
-	log.Printf("Submitted container run for %v", container.UUID)
-
-	containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+	log.Printf("Monitoring container %v started", container.UUID)
+	defer log.Printf("Monitoring container %v finished", container.UUID)
+
+	pollTicker := time.NewTicker(pollInterval)
+	defer pollTicker.Stop()
+	for _ = range pollTicker.C {
+		var updated Container
+		err := arv.Get("containers", container.UUID, nil, &updated)
+		if err != nil {
+			log.Printf("Error getting container %s: %q", container.UUID, err)
+			continue
+		}
+		if updated.State == "Complete" || updated.State == "Cancelled" {
+			return
+		}
+		if updated.Priority != 0 {
+			continue
+		}
 
-	// A goroutine to terminate the runner if container priority becomes zero
-	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
-	go func() {
-		for _ = range priorityTicker.C {
-			var container Container
-			err := arv.Get("containers", containerUUID, nil, &container)
-			if err != nil {
-				log.Printf("Error getting container info for %v: %q", container.UUID, err)
-			} else {
-				if container.Priority == 0 {
-					log.Printf("Canceling container %v", container.UUID)
-					priorityTicker.Stop()
-					cancelcmd := exec.Command("scancel", "--name="+container.UUID)
-					cancelcmd.Run()
-				}
-				if container.State == "Complete" {
-					priorityTicker.Stop()
-				}
+		// Priority is zero, but state is Running or Locked
+		log.Printf("Canceling container %s", container.UUID)
+
+		err = exec.Command("scancel", "--name="+container.UUID).Run()
+		if err != nil {
+			log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+			if inQ, err := checkSqueue(container.UUID); err != nil {
+				log.Printf("Error running squeue: %v", err)
+				continue
+			} else if inQ {
+				log.Printf("Container %s is still in squeue; will retry", container.UUID)
+				continue
 			}
 		}
-	}()
 
+		err = arv.Update("containers", container.UUID,
+			arvadosclient.Dict{
+				"container": arvadosclient.Dict{"state": "Cancelled"}},
+			nil)
+		if err != nil {
+			log.Printf("Error updating state for container %s: %s", container.UUID, err)
+			continue
+		}
+
+		return
+	}
+}
+
+func checkSqueue(uuid string) (bool, error) {
+	cmd := exec.Command("squeue", "--format=%j")
+	sq, err := cmd.StdoutPipe()
+	if err != nil {
+		return false, err
+	}
+	cmd.Start()
+	defer cmd.Wait()
+	scanner := bufio.NewScanner(sq)
+	found := false
+	for scanner.Scan() {
+		if scanner.Text() == uuid {
+			found = true
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		return false, err
+	}
+	return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Goroutine-safely add/remove uuid to the set of "my" containers,
+// i.e., ones for which this process has a goroutine running.
+func setMine(uuid string, t bool) {
+	mineMutex.Lock()
+	if t {
+		mineMap[uuid] = true
+	} else {
+		delete(mineMap, uuid)
+	}
+	mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+	mineMutex.RLocker().Lock()
+	defer mineMutex.RLocker().Unlock()
+	return mineMap[uuid]
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 7355cff..1ce1b8b 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -4,7 +4,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
-	"io/ioutil"
+	"bytes"
 	"log"
 	"net/http"
 	"net/http/httptest"
@@ -49,6 +49,7 @@ func (s *TestSuite) SetUpTest(c *C) {
 	if err != nil {
 		c.Fatalf("Error making arvados client: %s", err)
 	}
+	os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -86,10 +87,12 @@ func (s *TestSuite) Test_doMain(c *C) {
 			apiHost, apiToken, apiInsecure).Args
 		go func() {
 			time.Sleep(5 * time.Second)
-			arv.Update("containers", containerUUID,
-				arvadosclient.Dict{
-					"container": arvadosclient.Dict{"state": "Complete"}},
-				nil)
+			for _, state := range []string{"Running", "Complete"} {
+				arv.Update("containers", containerUUID,
+					arvadosclient.Dict{
+						"container": arvadosclient.Dict{"state": state}},
+					nil)
+			}
 		}()
 		return exec.Command("echo", "strigger")
 	}
@@ -113,7 +116,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 
 	c.Check(sbatchCmdLine, DeepEquals, []string{"sbatch", "--job-name=zzzzz-dz642-queuedcontainer", "--share", "--parsable"})
 	c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
-		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+		"--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
 	// There should be no queued containers now
 	err = arv.List("containers", params, &containers)
@@ -129,6 +132,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
 	apiStubResponses := make(map[string]arvadostest.StubResponse)
+	apiStubResponses["/arvados/v1/api_client_authorizations"] = arvadostest.StubResponse{200, string(`{"items":[{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}]}`)}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
 	testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
@@ -148,18 +152,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		Retries:   0,
 	}
 
-	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-	c.Check(err, IsNil)
-	defer os.Remove(tempfile.Name())
-	log.SetOutput(tempfile)
+	buf := bytes.NewBuffer(nil)
+	log.SetOutput(buf)
+	defer log.SetOutput(os.Stderr)
 
 	go func() {
-		time.Sleep(2 * time.Second)
+		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+			time.Sleep(100 * time.Millisecond)
+		}
 		sigChan <- syscall.SIGTERM
 	}()
 
 	runQueuedContainers(2, 1, crunchCmd, crunchCmd)
 
-	buf, _ := ioutil.ReadFile(tempfile.Name())
-	c.Check(strings.Contains(string(buf), expected), Equals, true)
+	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }

commit 0f8d4bdadd37522d0eb80f071bba8311c76fddf7
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 5 15:46:20 2016 -0400

    8128: Add Locked state to Container model.

diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 787047d..5856edd 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -42,6 +42,7 @@ class Container < ArvadosModel
   States =
     [
      (Queued = 'Queued'),
+     (Locked = 'Locked'),
      (Running = 'Running'),
      (Complete = 'Complete'),
      (Cancelled = 'Cancelled')
@@ -49,7 +50,8 @@ class Container < ArvadosModel
 
   State_transitions = {
     nil => [Queued],
-    Queued => [Running, Cancelled],
+    Queued => [Locked, Cancelled],
+    Locked => [Queued, Running, Cancelled],
     Running => [Complete, Cancelled]
   }
 
@@ -102,47 +104,40 @@ class Container < ArvadosModel
   end
 
   def validate_change
-    permitted = []
+    permitted = [:state]
 
     if self.new_record?
-      permitted.push :owner_uuid, :command, :container_image, :cwd, :environment,
-                     :mounts, :output_path, :priority, :runtime_constraints, :state
+      permitted.push(:owner_uuid, :command, :container_image, :cwd,
+                     :environment, :mounts, :output_path, :priority,
+                     :runtime_constraints)
     end
 
     case self.state
-    when Queued
-      # permit priority change only.
+    when Queued, Locked
       permitted.push :priority
 
     when Running
+      permitted.push :priority, :progress
       if self.state_changed?
-        # At point of state change, can set state and started_at
-        permitted.push :state, :started_at
-      else
-        # While running, can update priority and progress.
-        permitted.push :priority, :progress
+        permitted.push :started_at
       end
 
     when Complete
-      if self.state_changed?
-        permitted.push :state, :finished_at, :output, :log, :exit_code
-      else
-        errors.add :state, "cannot update record"
+      if self.state_was == Running
+        permitted.push :finished_at, :output, :log, :exit_code
       end
 
     when Cancelled
-      if self.state_changed?
-        if self.state_was == Running
-          permitted.push :state, :finished_at, :output, :log
-        elsif self.state_was == Queued
-          permitted.push :state, :finished_at
-        end
-      else
-        errors.add :state, "cannot update record"
+      case self.state_was
+      when Running
+        permitted.push :finished_at, :output, :log
+      when Queued, Locked
+        permitted.push :finished_at
       end
 
     else
-      errors.add :state, "invalid state"
+      # The state_transitions check will add an error message for this
+      return false
     end
 
     check_update_whitelist permitted
diff --git a/services/api/lib/whitelist_update.rb b/services/api/lib/whitelist_update.rb
index a81f992..8fccd0f 100644
--- a/services/api/lib/whitelist_update.rb
+++ b/services/api/lib/whitelist_update.rb
@@ -2,7 +2,7 @@ module WhitelistUpdate
   def check_update_whitelist permitted_fields
     attribute_names.each do |field|
       if not permitted_fields.include? field.to_sym and self.send((field.to_s + "_changed?").to_sym)
-        errors.add field, "illegal update of field"
+        errors.add field, "cannot be modified in this state"
       end
     end
   end
@@ -10,7 +10,7 @@ module WhitelistUpdate
   def validate_state_change
     if self.state_changed?
       unless state_transitions[self.state_was].andand.include? self.state
-        errors.add :state, "invalid state change from #{self.state_was} to #{self.state}"
+        errors.add :state, "cannot change from #{self.state_was} to #{self.state}"
         return false
       end
     end
diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb
index d0def57..701147c 100644
--- a/services/api/test/unit/container_request_test.rb
+++ b/services/api/test/unit/container_request_test.rb
@@ -306,18 +306,18 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal "Committed", cr.state
 
     c = Container.find_by_uuid cr.container_uuid
-    assert_equal "Queued", c.state
+    assert_equal Container::Queued, c.state
 
     act_as_system_user do
-      c.state = "Running"
-      c.save!
+      c.update_attributes! state: Container::Locked
+      c.update_attributes! state: Container::Running
     end
 
     cr.reload
     assert_equal "Committed", cr.state
 
     act_as_system_user do
-      c.state = "Complete"
+      c.update_attributes! state: Container::Complete
       c.save!
     end
 
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index a25f2af..84713c2 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -9,91 +9,42 @@ class ContainerTest < ActiveSupport::TestCase
     c
   end
 
-  def check_illegal_modify c
-    c.reload
-    c.command = ["echo", "bar"]
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.container_image = "img2"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.cwd = "/tmp2"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.environment = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.mounts = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+  def show_errors c
+    return lambda { c.errors.full_messages.inspect }
+  end
 
-    c.reload
-    c.output_path = "/tmp3"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
+  def check_illegal_updates c, bad_updates
+    bad_updates.each do |u|
+      refute c.update_attributes(u), u.inspect
+      refute c.valid?
+      c.reload
     end
+  end
 
-    c.reload
-    c.runtime_constraints = {"FOO" => "BAR"}
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+  def check_illegal_modify c
+    check_illegal_updates c, [{command: ["echo", "bar"]},
+                              {container_image: "img2"},
+                              {cwd: "/tmp2"},
+                              {environment: {"FOO" => "BAR"}},
+                              {mounts: {"FOO" => "BAR"}},
+                              {output_path: "/tmp3"},
+                              {runtime_constraints: {"FOO" => "BAR"}}]
   end
 
   def check_bogus_states c
-    c.reload
-    c.state = nil
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Flubber"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+    check_illegal_updates c, [{state: nil},
+                              {state: "Flubber"}]
   end
 
-  def check_no_change_from_complete c
+  def check_no_change_from_cancelled c
     check_illegal_modify c
     check_bogus_states c
 
-    c.reload
-    c.priority = 3
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Queued"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Running"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
-
-    c.reload
-    c.state = "Complete"
-    assert_raises(ActiveRecord::RecordInvalid) do
-      c.save!
-    end
+    check_illegal_updates c, [{ priority: 3 },
+                              { state: Container::Queued },
+                              { state: Container::Locked },
+                              { state: Container::Running },
+                              { state: Container::Complete }]
   end
 
   test "Container create" do
@@ -120,58 +71,79 @@ class ContainerTest < ActiveSupport::TestCase
       c = minimal_new
       c.save!
 
-      c.reload
-      c.state = "Complete"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
+      check_illegal_updates c, [{state: Container::Running},
+                                {state: Container::Complete}]
 
-      c.reload
-      c.state = "Running"
-      c.save!
+      c.update_attributes! state: Container::Locked
+      c.update_attributes! state: Container::Running
 
       check_illegal_modify c
       check_bogus_states c
 
+      check_illegal_updates c, [{state: Container::Queued}]
       c.reload
-      c.state = "Queued"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
 
-      c.reload
-      c.priority = 3
-      c.save!
+      c.update_attributes! priority: 3
     end
   end
 
-  test "Container queued cancel" do
+  test "Lock and unlock" do
     act_as_system_user do
       c = minimal_new
       c.save!
+      assert_equal Container::Queued, c.state
 
+      refute c.update_attributes(state: Container::Running), "not locked"
+      c.reload
+      refute c.update_attributes(state: Container::Complete), "not locked"
       c.reload
-      c.state = "Cancelled"
-      c.save!
 
-      check_no_change_from_complete c
+      assert c.update_attributes(state: Container::Locked), show_errors(c)
+      assert c.update_attributes(state: Container::Queued), show_errors(c)
+
+      refute c.update_attributes(state: Container::Running), "not locked"
+      c.reload
+
+      assert c.update_attributes(state: Container::Locked), show_errors(c)
+      assert c.update_attributes(state: Container::Running), show_errors(c)
+
+      refute c.update_attributes(state: Container::Locked), "already running"
+      c.reload
+      refute c.update_attributes(state: Container::Queued), "already running"
+      c.reload
+
+      assert c.update_attributes(state: Container::Complete), show_errors(c)
     end
   end
 
-  test "Container running cancel" do
+  test "Container queued cancel" do
     act_as_system_user do
       c = minimal_new
       c.save!
+      assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+      check_no_change_from_cancelled c
+    end
+  end
 
-      c.reload
-      c.state = "Running"
+  test "Container locked cancel" do
+    act_as_system_user do
+      c = minimal_new
       c.save!
+      assert c.update_attributes(state: Container::Locked), show_errors(c)
+      assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+      check_no_change_from_cancelled c
+    end
+  end
 
-      c.reload
-      c.state = "Cancelled"
+  test "Container running cancel" do
+    act_as_system_user do
+      c = minimal_new
       c.save!
-
-      check_no_change_from_complete c
+      c.update_attributes! state: Container::Queued
+      c.update_attributes! state: Container::Locked
+      c.update_attributes! state: Container::Running
+      c.update_attributes! state: Container::Cancelled
+      check_no_change_from_cancelled c
     end
   end
 
@@ -192,28 +164,13 @@ class ContainerTest < ActiveSupport::TestCase
     act_as_system_user do
       c = minimal_new
       c.save!
+      c.update_attributes! state: Container::Locked
+      c.update_attributes! state: Container::Running
 
-      c.reload
-      c.state = "Running"
-      c.save!
-
-      c.reload
-      c.exit_code = 1
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
+      check_illegal_updates c, [{exit_code: 1},
+                                {exit_code: 1, state: Container::Cancelled}]
 
-      c.reload
-      c.exit_code = 1
-      c.state = "Cancelled"
-      assert_raises(ActiveRecord::RecordInvalid) do
-        c.save!
-      end
-
-      c.reload
-      c.exit_code = 1
-      c.state = "Complete"
-      c.save!
+      assert c.update_attributes(exit_code: 1, state: Container::Complete)
     end
   end
 end

commit aa76afd84456af352ba78f6e2b2d9e315bb60687
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Apr 28 11:16:50 2016 -0400

    8128: De-dup container unit tests

diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index 0cac6ac..a25f2af 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -1,6 +1,14 @@
 require 'test_helper'
 
 class ContainerTest < ActiveSupport::TestCase
+  def minimal_new
+    c = Container.new
+    c.command = ["echo", "foo"]
+    c.container_image = "img"
+    c.output_path = "/tmp"
+    c
+  end
+
   def check_illegal_modify c
     c.reload
     c.command = ["echo", "bar"]
@@ -90,10 +98,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container create" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.cwd = "/tmp"
+      c = minimal_new
       c.environment = {}
       c.mounts = {"BAR" => "FOO"}
       c.output_path = "/tmp"
@@ -112,10 +117,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container running" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
+      c = minimal_new
       c.save!
 
       c.reload
@@ -145,10 +147,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container queued cancel" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
+      c = minimal_new
       c.save!
 
       c.reload
@@ -161,10 +160,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container running cancel" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
+      c = minimal_new
       c.save!
 
       c.reload
@@ -181,10 +177,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container create forbidden for non-admin" do
     set_user_from_auth :active_trustedclient
-    c = Container.new
-    c.command = ["echo", "foo"]
-    c.container_image = "img"
-    c.cwd = "/tmp"
+    c = minimal_new
     c.environment = {}
     c.mounts = {"BAR" => "FOO"}
     c.output_path = "/tmp"
@@ -197,10 +190,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container only set exit code on complete" do
     act_as_system_user do
-      c = Container.new
-      c.command = ["echo", "foo"]
-      c.container_image = "img"
-      c.output_path = "/tmp"
+      c = minimal_new
       c.save!
 
       c.reload

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list