[ARVADOS] created: 2757c83be8fefbd5ac6a70970ab03d7803569b58
Git user
git at public.curoverse.com
Thu May 5 17:51:10 EDT 2016
at 2757c83be8fefbd5ac6a70970ab03d7803569b58 (commit)
commit 2757c83be8fefbd5ac6a70970ab03d7803569b58
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 5 17:50:44 2016 -0400
8128: Update crunch-dispatch-local to use new Locked state.
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index e05c0c5..70fe640 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -72,7 +72,7 @@ func doMain() error {
}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
// we would need to wait for them to complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+ ticker := time.NewTicker(pollInterval)
for {
select {
@@ -118,7 +118,7 @@ type ContainerList struct {
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
@@ -133,7 +133,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to run queued container %v", containers.Items[i].UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
}
}
@@ -142,7 +142,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
cmd := exec.Command(crunchRunCommand, uuid)
cmd.Stdin = nil
@@ -158,7 +158,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
- log.Printf("Started container run for %v", uuid)
+ log.Printf("Starting container %v", uuid)
// Add this crunch job to waitGroup
waitGroup.Add(1)
@@ -167,41 +167,48 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
// Update container status to Running
err := arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
+ "container": arvadosclient.Dict{"state": "Locked"}},
nil)
if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ log.Printf("Error updating container %v to 'Locked' state: %v", uuid, err)
}
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ cmdExited := make(chan struct{})
+
+ // Kill the child process if container priority changes to zero
go func() {
- for _ = range priorityTicker.C {
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-cmdExited:
+ return
+ case <-ticker.C:
+ }
var container Container
err := arv.Get("containers", uuid, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
- } else {
- if container.Priority == 0 {
- priorityTicker.Stop()
- cmd.Process.Signal(os.Interrupt)
- }
+ log.Printf("Error getting container %v: %q", uuid, err)
+ continue
+ }
+ if container.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
}
}
}()
- // Wait for the crunch job to exit
+ // Wait for crunch-run to exit
if _, err := cmd.Process.Wait(); err != nil {
log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
+ close(cmdExited)
// Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
delete(runningCmds, uuid)
runningCmdsMutex.Unlock()
- priorityTicker.Stop()
-
// The container state should be 'Complete'
var container Container
err = arv.Get("containers", uuid, nil, &container)
commit 8285405a41257da35a482f87add07c7692548703
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 5 17:15:51 2016 -0400
8128: Update crunch-dispatch-slurm to use new Locked state.
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index bebef79..84a3bff 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -13,6 +13,9 @@ const (
FooBarDirCollection = "zzzzz-4zz18-foonbarfilesdir"
FooPdh = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
+
+ Dispatch1Token = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
+ Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
)
// A valid manifest designed to test various edge cases and parsing
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 5856edd..a014552 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -27,6 +27,7 @@ class Container < ArvadosModel
t.add :environment
t.add :exit_code
t.add :finished_at
+ t.add :locked_by_uuid
t.add :log
t.add :mounts
t.add :output
@@ -74,6 +75,13 @@ class Container < ArvadosModel
end
end
+ def locked_by_uuid
+ # Stub to permit a single dispatch to recognize its own containers
+ if current_user.is_admin
+ Thread.current[:api_client_authorization].andand.uuid
+ end
+ end
+
protected
def fill_field_defaults
diff --git a/services/api/test/fixtures/api_client_authorizations.yml b/services/api/test/fixtures/api_client_authorizations.yml
index f99a9fb..485b6d1 100644
--- a/services/api/test/fixtures/api_client_authorizations.yml
+++ b/services/api/test/fixtures/api_client_authorizations.yml
@@ -271,3 +271,9 @@ fuse:
api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
expires_at: 2038-01-01 00:00:00
+dispatch1:
+ uuid: zzzzz-gj3su-k9dvestay1plssr
+ api_client: trusted
+ user: system_user
+ api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
+ expires_at: 2038-01-01 00:00:00
\ No newline at end of file
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 8fbc0fa..a76b4e9 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -1,6 +1,7 @@
package main
import (
+ "bufio"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -84,6 +85,15 @@ func doMain() error {
return nil
}
+type apiClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+ Items []apiClientAuthorization `json:"items"`
+}
+
// Poll for queued containers using pollInterval.
// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
@@ -91,12 +101,21 @@ func doMain() error {
// This is because, once one or more crunch jobs are running,
// we would need to wait for them to complete.
func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+ var authList apiClientAuthorizationList
+ err := arv.List("api_client_authorizations", map[string]interface{}{
+ "filters": [][]interface{}{{"api_token", "=", arv.ApiToken}},
+ }, &authList)
+ if err != nil || len(authList.Items) != 1 {
+ log.Printf("Error getting my token UUID: %v (%d)", err, len(authList.Items))
+ return
+ }
+ auth := authList.Items[0]
+ ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
for {
select {
case <-ticker.C:
- dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+ dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
case <-doneProcessing:
ticker.Stop()
return
@@ -106,9 +125,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
// Container data
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
@@ -116,10 +136,11 @@ type ContainerList struct {
Items []Container `json:"items"`
}
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get the list of queued containers from API server and invoke run
+// for each container.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
+ "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
}
var containers ContainerList
@@ -129,10 +150,25 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
return
}
- for i := 0; i < len(containers.Items); i++ {
- log.Printf("About to submit queued container %v", containers.Items[i].UUID)
- // Run the container
- go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+ for _, container := range containers.Items {
+ if container.State == "Locked" {
+ if container.LockedByUUID != auth.UUID {
+ // Locked by a different dispatcher
+ continue
+ } else if checkMine(container.UUID) {
+ // I already have a goroutine running
+ // for this container: it just hasn't
+ // gotten past Locked state yet.
+ continue
+ }
+ log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. Assuming it was left behind by a previous dispatch process, and waiting for it to finish.", container.UUID, auth.UUID)
+ setMine(container.UUID, true)
+ go func() {
+ waitContainer(container, pollInterval)
+ setMine(container.UUID, false)
+ }()
+ }
+ go run(container, crunchRunCommand, finishCommand, pollInterval)
}
}
@@ -155,17 +191,19 @@ var striggerCmd = striggerFunc
func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
- // Mark record as complete if anything errors out.
defer func() {
- if submitErr != nil {
- // This really should be an "Error" state, see #8018
- updateErr := arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
- if updateErr != nil {
- log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
- }
+ // If we didn't get as far as submitting a slurm job,
+ // unlock the container and return it to the queue.
+ if submitErr == nil {
+ // OK, no cleanup needed
+ return
+ }
+ err := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Queued"}},
+ nil)
+ if err != nil {
+ log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
}()
@@ -198,6 +236,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
stdoutChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stdoutReader)
+ stdoutReader.Close()
stdoutChan <- b
close(stdoutChan)
}()
@@ -205,6 +244,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
stderrChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stderrReader)
+ stderrReader.Close()
stderrChan <- b
close(stderrChan)
}()
@@ -239,18 +279,23 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
err := cmd.Run()
if err != nil {
log.Printf("While setting up strigger: %v", err)
+ // BUG: we drop the error here and forget about it. A
+ // human has to notice the container is stuck in
+ // Running state, and fix it manually.
}
}
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+ setMine(container.UUID, true)
+ defer setMine(container.UUID, false)
+
+ log.Printf("About to submit queued container %v", container.UUID)
jobid, err := submit(container, crunchRunCommand)
if err != nil {
- log.Printf("Error queuing container run: %v", err)
+ log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
return
}
@@ -260,41 +305,111 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
}
finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- // Update container status to Running, this is a temporary workaround
- // to avoid resubmitting queued containers because record locking isn't
- // implemented yet.
+ // Update container status to Locked. This will fail if
+ // another dispatcher (token) has already locked it. It will
+ // succeed if *this* dispatcher has already locked it.
err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
+ "container": arvadosclient.Dict{"state": "Locked"}},
nil)
if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
+ log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+ return
}
+ log.Printf("Submitted container %v to slurm", container.UUID)
+ waitContainer(container, pollInterval)
+}
- log.Printf("Submitted container run for %v", container.UUID)
-
- containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+ log.Printf("Monitoring container %v started", container.UUID)
+ defer log.Printf("Monitoring container %v finished", container.UUID)
+
+ pollTicker := time.NewTicker(pollInterval)
+ defer pollTicker.Stop()
+ for _ = range pollTicker.C {
+ var updated Container
+ err := arv.Get("containers", container.UUID, nil, &updated)
+ if err != nil {
+ log.Printf("Error getting container %s: %q", container.UUID, err)
+ continue
+ }
+ if updated.State == "Complete" || updated.State == "Cancelled" {
+ return
+ }
+ if updated.Priority != 0 {
+ continue
+ }
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
- go func() {
- for _ = range priorityTicker.C {
- var container Container
- err := arv.Get("containers", containerUUID, nil, &container)
- if err != nil {
- log.Printf("Error getting container info for %v: %q", container.UUID, err)
- } else {
- if container.Priority == 0 {
- log.Printf("Canceling container %v", container.UUID)
- priorityTicker.Stop()
- cancelcmd := exec.Command("scancel", "--name="+container.UUID)
- cancelcmd.Run()
- }
- if container.State == "Complete" {
- priorityTicker.Stop()
- }
+ // Priority is zero, but state is Running or Locked
+ log.Printf("Canceling container %s", container.UUID)
+
+ err = exec.Command("scancel", "--name="+container.UUID).Run()
+ if err != nil {
+ log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+ if inQ, err := checkSqueue(container.UUID); err != nil {
+ log.Printf("Error running squeue: %v", err)
+ continue
+ } else if inQ {
+ log.Printf("Container %s is still in squeue; will retry", container.UUID)
+ continue
}
}
- }()
+ err = arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Cancelled"}},
+ nil)
+ if err != nil {
+ log.Printf("Error updating state for container %s: %s", container.UUID, err)
+ continue
+ }
+
+ return
+ }
+}
+
+func checkSqueue(uuid string) (bool, error) {
+ cmd := exec.Command("squeue", "--format=%j")
+ sq, err := cmd.StdoutPipe()
+ if err != nil {
+ return false, err
+ }
+ cmd.Start()
+ defer cmd.Wait()
+ scanner := bufio.NewScanner(sq)
+ found := false
+ for scanner.Scan() {
+ if scanner.Text() == uuid {
+ found = true
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ return false, err
+ }
+ return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Goroutine-safely add/remove uuid to the set of "my" containers,
+// i.e., ones for which this process has a goroutine running.
+func setMine(uuid string, t bool) {
+ mineMutex.Lock()
+ if t {
+ mineMap[uuid] = true
+ } else {
+ delete(mineMap, uuid)
+ }
+ mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+ mineMutex.RLocker().Lock()
+ defer mineMutex.RLocker().Unlock()
+ return mineMap[uuid]
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 7355cff..1ce1b8b 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -4,7 +4,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "io/ioutil"
+ "bytes"
"log"
"net/http"
"net/http/httptest"
@@ -49,6 +49,7 @@ func (s *TestSuite) SetUpTest(c *C) {
if err != nil {
c.Fatalf("Error making arvados client: %s", err)
}
+ os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
func (s *TestSuite) TearDownTest(c *C) {
@@ -86,10 +87,12 @@ func (s *TestSuite) Test_doMain(c *C) {
apiHost, apiToken, apiInsecure).Args
go func() {
time.Sleep(5 * time.Second)
- arv.Update("containers", containerUUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
- nil)
+ for _, state := range []string{"Running", "Complete"} {
+ arv.Update("containers", containerUUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": state}},
+ nil)
+ }
}()
return exec.Command("echo", "strigger")
}
@@ -113,7 +116,7 @@ func (s *TestSuite) Test_doMain(c *C) {
c.Check(sbatchCmdLine, DeepEquals, []string{"sbatch", "--job-name=zzzzz-dz642-queuedcontainer", "--share", "--parsable"})
c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
- "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+ "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -129,6 +132,7 @@ func (s *TestSuite) Test_doMain(c *C) {
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
+ apiStubResponses["/arvados/v1/api_client_authorizations"] = arvadostest.StubResponse{200, string(`{"items":[{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}]}`)}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
@@ -148,18 +152,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
go func() {
- time.Sleep(2 * time.Second)
+ for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+ time.Sleep(100 * time.Millisecond)
+ }
sigChan <- syscall.SIGTERM
}()
runQueuedContainers(2, 1, crunchCmd, crunchCmd)
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
commit 0f8d4bdadd37522d0eb80f071bba8311c76fddf7
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 5 15:46:20 2016 -0400
8128: Add Locked state to Container model.
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 787047d..5856edd 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -42,6 +42,7 @@ class Container < ArvadosModel
States =
[
(Queued = 'Queued'),
+ (Locked = 'Locked'),
(Running = 'Running'),
(Complete = 'Complete'),
(Cancelled = 'Cancelled')
@@ -49,7 +50,8 @@ class Container < ArvadosModel
State_transitions = {
nil => [Queued],
- Queued => [Running, Cancelled],
+ Queued => [Locked, Cancelled],
+ Locked => [Queued, Running, Cancelled],
Running => [Complete, Cancelled]
}
@@ -102,47 +104,40 @@ class Container < ArvadosModel
end
def validate_change
- permitted = []
+ permitted = [:state]
if self.new_record?
- permitted.push :owner_uuid, :command, :container_image, :cwd, :environment,
- :mounts, :output_path, :priority, :runtime_constraints, :state
+ permitted.push(:owner_uuid, :command, :container_image, :cwd,
+ :environment, :mounts, :output_path, :priority,
+ :runtime_constraints)
end
case self.state
- when Queued
- # permit priority change only.
+ when Queued, Locked
permitted.push :priority
when Running
+ permitted.push :priority, :progress
if self.state_changed?
- # At point of state change, can set state and started_at
- permitted.push :state, :started_at
- else
- # While running, can update priority and progress.
- permitted.push :priority, :progress
+ permitted.push :started_at
end
when Complete
- if self.state_changed?
- permitted.push :state, :finished_at, :output, :log, :exit_code
- else
- errors.add :state, "cannot update record"
+ if self.state_was == Running
+ permitted.push :finished_at, :output, :log, :exit_code
end
when Cancelled
- if self.state_changed?
- if self.state_was == Running
- permitted.push :state, :finished_at, :output, :log
- elsif self.state_was == Queued
- permitted.push :state, :finished_at
- end
- else
- errors.add :state, "cannot update record"
+ case self.state_was
+ when Running
+ permitted.push :finished_at, :output, :log
+ when Queued, Locked
+ permitted.push :finished_at
end
else
- errors.add :state, "invalid state"
+ # The state_transitions check will add an error message for this
+ return false
end
check_update_whitelist permitted
diff --git a/services/api/lib/whitelist_update.rb b/services/api/lib/whitelist_update.rb
index a81f992..8fccd0f 100644
--- a/services/api/lib/whitelist_update.rb
+++ b/services/api/lib/whitelist_update.rb
@@ -2,7 +2,7 @@ module WhitelistUpdate
def check_update_whitelist permitted_fields
attribute_names.each do |field|
if not permitted_fields.include? field.to_sym and self.send((field.to_s + "_changed?").to_sym)
- errors.add field, "illegal update of field"
+ errors.add field, "cannot be modified in this state"
end
end
end
@@ -10,7 +10,7 @@ module WhitelistUpdate
def validate_state_change
if self.state_changed?
unless state_transitions[self.state_was].andand.include? self.state
- errors.add :state, "invalid state change from #{self.state_was} to #{self.state}"
+ errors.add :state, "cannot change from #{self.state_was} to #{self.state}"
return false
end
end
diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb
index d0def57..701147c 100644
--- a/services/api/test/unit/container_request_test.rb
+++ b/services/api/test/unit/container_request_test.rb
@@ -306,18 +306,18 @@ class ContainerRequestTest < ActiveSupport::TestCase
assert_equal "Committed", cr.state
c = Container.find_by_uuid cr.container_uuid
- assert_equal "Queued", c.state
+ assert_equal Container::Queued, c.state
act_as_system_user do
- c.state = "Running"
- c.save!
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
end
cr.reload
assert_equal "Committed", cr.state
act_as_system_user do
- c.state = "Complete"
+ c.update_attributes! state: Container::Complete
c.save!
end
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index a25f2af..84713c2 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -9,91 +9,42 @@ class ContainerTest < ActiveSupport::TestCase
c
end
- def check_illegal_modify c
- c.reload
- c.command = ["echo", "bar"]
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.container_image = "img2"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.cwd = "/tmp2"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.environment = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.mounts = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ def show_errors c
+ return lambda { c.errors.full_messages.inspect }
+ end
- c.reload
- c.output_path = "/tmp3"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
+ def check_illegal_updates c, bad_updates
+ bad_updates.each do |u|
+ refute c.update_attributes(u), u.inspect
+ refute c.valid?
+ c.reload
end
+ end
- c.reload
- c.runtime_constraints = {"FOO" => "BAR"}
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ def check_illegal_modify c
+ check_illegal_updates c, [{command: ["echo", "bar"]},
+ {container_image: "img2"},
+ {cwd: "/tmp2"},
+ {environment: {"FOO" => "BAR"}},
+ {mounts: {"FOO" => "BAR"}},
+ {output_path: "/tmp3"},
+ {runtime_constraints: {"FOO" => "BAR"}}]
end
def check_bogus_states c
- c.reload
- c.state = nil
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Flubber"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{state: nil},
+ {state: "Flubber"}]
end
- def check_no_change_from_complete c
+ def check_no_change_from_cancelled c
check_illegal_modify c
check_bogus_states c
- c.reload
- c.priority = 3
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Queued"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Running"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.state = "Complete"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{ priority: 3 },
+ { state: Container::Queued },
+ { state: Container::Locked },
+ { state: Container::Running },
+ { state: Container::Complete }]
end
test "Container create" do
@@ -120,58 +71,79 @@ class ContainerTest < ActiveSupport::TestCase
c = minimal_new
c.save!
- c.reload
- c.state = "Complete"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{state: Container::Running},
+ {state: Container::Complete}]
- c.reload
- c.state = "Running"
- c.save!
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
check_illegal_modify c
check_bogus_states c
+ check_illegal_updates c, [{state: Container::Queued}]
c.reload
- c.state = "Queued"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
- c.reload
- c.priority = 3
- c.save!
+ c.update_attributes! priority: 3
end
end
- test "Container queued cancel" do
+ test "Lock and unlock" do
act_as_system_user do
c = minimal_new
c.save!
+ assert_equal Container::Queued, c.state
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+ refute c.update_attributes(state: Container::Complete), "not locked"
c.reload
- c.state = "Cancelled"
- c.save!
- check_no_change_from_complete c
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Queued), show_errors(c)
+
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Running), show_errors(c)
+
+ refute c.update_attributes(state: Container::Locked), "already running"
+ c.reload
+ refute c.update_attributes(state: Container::Queued), "already running"
+ c.reload
+
+ assert c.update_attributes(state: Container::Complete), show_errors(c)
end
end
- test "Container running cancel" do
+ test "Container queued cancel" do
act_as_system_user do
c = minimal_new
c.save!
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
+ end
+ end
- c.reload
- c.state = "Running"
+ test "Container locked cancel" do
+ act_as_system_user do
+ c = minimal_new
c.save!
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
+ end
+ end
- c.reload
- c.state = "Cancelled"
+ test "Container running cancel" do
+ act_as_system_user do
+ c = minimal_new
c.save!
-
- check_no_change_from_complete c
+ c.update_attributes! state: Container::Queued
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
+ c.update_attributes! state: Container::Cancelled
+ check_no_change_from_cancelled c
end
end
@@ -192,28 +164,13 @@ class ContainerTest < ActiveSupport::TestCase
act_as_system_user do
c = minimal_new
c.save!
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
- c.reload
- c.state = "Running"
- c.save!
-
- c.reload
- c.exit_code = 1
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
+ check_illegal_updates c, [{exit_code: 1},
+ {exit_code: 1, state: Container::Cancelled}]
- c.reload
- c.exit_code = 1
- c.state = "Cancelled"
- assert_raises(ActiveRecord::RecordInvalid) do
- c.save!
- end
-
- c.reload
- c.exit_code = 1
- c.state = "Complete"
- c.save!
+ assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
end
end
commit aa76afd84456af352ba78f6e2b2d9e315bb60687
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Apr 28 11:16:50 2016 -0400
8128: De-dup container unit tests
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index 0cac6ac..a25f2af 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -1,6 +1,14 @@
require 'test_helper'
class ContainerTest < ActiveSupport::TestCase
+ def minimal_new
+ c = Container.new
+ c.command = ["echo", "foo"]
+ c.container_image = "img"
+ c.output_path = "/tmp"
+ c
+ end
+
def check_illegal_modify c
c.reload
c.command = ["echo", "bar"]
@@ -90,10 +98,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container create" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.cwd = "/tmp"
+ c = minimal_new
c.environment = {}
c.mounts = {"BAR" => "FOO"}
c.output_path = "/tmp"
@@ -112,10 +117,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container running" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
+ c = minimal_new
c.save!
c.reload
@@ -145,10 +147,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container queued cancel" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
+ c = minimal_new
c.save!
c.reload
@@ -161,10 +160,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container running cancel" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
+ c = minimal_new
c.save!
c.reload
@@ -181,10 +177,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container create forbidden for non-admin" do
set_user_from_auth :active_trustedclient
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.cwd = "/tmp"
+ c = minimal_new
c.environment = {}
c.mounts = {"BAR" => "FOO"}
c.output_path = "/tmp"
@@ -197,10 +190,7 @@ class ContainerTest < ActiveSupport::TestCase
test "Container only set exit code on complete" do
act_as_system_user do
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
+ c = minimal_new
c.save!
c.reload
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list