[ARVADOS] updated: ff30b2754f2517fff513f766398ec04eac14c11c
Git user
git at public.curoverse.com
Wed Jun 15 09:53:42 EDT 2016
Summary of changes:
sdk/go/arvados/api_client_authorization.go | 12 ++
sdk/go/arvados/client.go | 3 +-
sdk/go/arvados/container.go | 54 +++++++
sdk/go/arvados/error.go | 43 ++++++
sdk/go/dispatch/dispatch.go | 62 +++-----
.../crunch-dispatch-local/crunch-dispatch-local.go | 7 +-
.../crunch-dispatch-local_test.go | 21 +--
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 21 +--
.../crunch-dispatch-slurm_test.go | 35 ++---
services/crunch-run/crunchrun.go | 112 +++++----------
services/crunch-run/crunchrun_test.go | 158 +++++++++++----------
11 files changed, 290 insertions(+), 238 deletions(-)
create mode 100644 sdk/go/arvados/api_client_authorization.go
create mode 100644 sdk/go/arvados/container.go
create mode 100644 sdk/go/arvados/error.go
via ff30b2754f2517fff513f766398ec04eac14c11c (commit)
via 36288f952d89249e7b52c714b0df0e4d0a4b0305 (commit)
via 83fed933f8d4d6000613024caec7d62dd7651209 (commit)
via e1db90809fe1eb94b0021e741af428fde3321c73 (commit)
via 0040468b872f7a888ed08be15538e314ee10a1c9 (commit)
via 19ad5dbdf1dc18d46f7fad9ca30b69126b224c96 (commit)
via 424181391748ec018b6157574dd65e5306d19f5d (commit)
from 2cd1c3ed705e639fb9e4ef067a32b278a6d3d4ee (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ff30b2754f2517fff513f766398ec04eac14c11c
Merge: 2cd1c3e 36288f9
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 15 09:51:55 2016 -0400
Merge branch '9374-go-sdk'
refs #9374
commit 36288f952d89249e7b52c714b0df0e4d0a4b0305
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 15 09:47:59 2016 -0400
9374: Remove unused receivers.
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 0185374..9880230 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -288,36 +288,38 @@ func (s *TestSuite) TestLoadImage(c *C) {
}
type ArvErrorTestClient struct{}
-type KeepErrorTestClient struct{}
-type KeepReadErrorTestClient struct{}
-func (client ArvErrorTestClient) Create(resourceType string,
+func (ArvErrorTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
return nil
}
-func (client ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (client ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (client ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+func (ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
return nil
}
-func (client KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+type KeepErrorTestClient struct{}
+
+func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, errors.New("KeepError")
}
-func (client KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
return nil, errors.New("KeepError")
}
-func (client KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+type KeepReadErrorTestClient struct{}
+
+func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, nil
}
commit 83fed933f8d4d6000613024caec7d62dd7651209
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jun 14 10:30:06 2016 -0400
9374: Fix uses of "this" as receiver name.
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index dd9d0a1..0185374 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -132,25 +132,25 @@ func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.I
return nil, nil
}
-func (this *ArvTestClient) Create(resourceType string,
+func (client *ArvTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
- this.Mutex.Lock()
- defer this.Mutex.Unlock()
+ client.Mutex.Lock()
+ defer client.Mutex.Unlock()
- this.Calls += 1
- this.Content = append(this.Content, parameters)
+ client.Calls += 1
+ client.Content = append(client.Content, parameters)
if resourceType == "logs" {
et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
- if this.Logs == nil {
- this.Logs = make(map[string]*bytes.Buffer)
+ if client.Logs == nil {
+ client.Logs = make(map[string]*bytes.Buffer)
}
- if this.Logs[et] == nil {
- this.Logs[et] = &bytes.Buffer{}
+ if client.Logs[et] == nil {
+ client.Logs[et] = &bytes.Buffer{}
}
- this.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
+ client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
}
if resourceType == "collections" && output != nil {
@@ -162,7 +162,7 @@ func (this *ArvTestClient) Create(resourceType string,
return nil
}
-func (this *ArvTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+func (client *ArvTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
switch {
case method == "GET" && resourceType == "containers" && action == "auth":
return json.Unmarshal([]byte(`{
@@ -175,7 +175,7 @@ func (this *ArvTestClient) Call(method, resourceType, uuid, action string, param
}
}
-func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (client *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
if resourceType == "collections" {
if uuid == hwPDH {
output.(*arvados.Collection).ManifestText = hwManifest
@@ -184,19 +184,19 @@ func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arva
}
}
if resourceType == "containers" {
- (*output.(*arvados.Container)) = this.Container
+ (*output.(*arvados.Container)) = client.Container
}
return nil
}
-func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
- this.Mutex.Lock()
- defer this.Mutex.Unlock()
- this.Calls += 1
- this.Content = append(this.Content, parameters)
+func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ client.Mutex.Lock()
+ defer client.Mutex.Unlock()
+ client.Calls += 1
+ client.Content = append(client.Content, parameters)
if resourceType == "containers" {
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
- this.WasSetRunning = true
+ client.WasSetRunning = true
}
}
return nil
@@ -206,9 +206,9 @@ func (this *ArvTestClient) Update(resourceType string, uuid string, parameters a
// parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
// "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
// no call matches, it returns nil.
-func (this *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
+func (client *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
call:
- for _, content := range this.Content {
+ for _, content := range client.Content {
var v interface{} = content
for _, k := range strings.Split(jpath, ".") {
if dict, ok := v.(arvadosclient.Dict); !ok {
@@ -224,8 +224,8 @@ call:
return nil
}
-func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- this.Content = buf
+func (client *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ client.Content = buf
return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
}
@@ -234,14 +234,14 @@ type FileWrapper struct {
len uint64
}
-func (this FileWrapper) Len() uint64 {
- return this.len
+func (fw FileWrapper) Len() uint64 {
+ return fw.len
}
-func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
if filename == hwImageId+".tar" {
rdr := ioutil.NopCloser(&bytes.Buffer{})
- this.Called = true
+ client.Called = true
return FileWrapper{rdr, 1321984}, nil
}
return nil, nil
@@ -291,51 +291,51 @@ type ArvErrorTestClient struct{}
type KeepErrorTestClient struct{}
type KeepReadErrorTestClient struct{}
-func (this ArvErrorTestClient) Create(resourceType string,
+func (client ArvErrorTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
return nil
}
-func (this ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
+func (client ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (this ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (client ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
return errors.New("ArvError")
}
-func (this ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+func (client ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
return nil
}
-func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (client KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, errors.New("KeepError")
}
-func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (client KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
return nil, errors.New("KeepError")
}
-func (this KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (client KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
return "", 0, nil
}
type ErrorReader struct{}
-func (this ErrorReader) Read(p []byte) (n int, err error) {
+func (ErrorReader) Read(p []byte) (n int, err error) {
return 0, errors.New("ErrorReader")
}
-func (this ErrorReader) Close() error {
+func (ErrorReader) Close() error {
return nil
}
-func (this ErrorReader) Len() uint64 {
+func (ErrorReader) Len() uint64 {
return 0
}
-func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
return ErrorReader{}, nil
}
@@ -381,21 +381,21 @@ type ClosableBuffer struct {
bytes.Buffer
}
+func (*ClosableBuffer) Close() error {
+ return nil
+}
+
type TestLogs struct {
Stdout ClosableBuffer
Stderr ClosableBuffer
}
-func (this *ClosableBuffer) Close() error {
- return nil
-}
-
-func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+func (tl *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
if logstr == "stdout" {
- return &this.Stdout
+ return &tl.Stdout
}
if logstr == "stderr" {
- return &this.Stderr
+ return &tl.Stderr
}
return nil
}
commit e1db90809fe1eb94b0021e741af428fde3321c73
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jun 14 10:24:21 2016 -0400
9374: Use arvados.Collection instead of own CollectionRecord.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index c4ea929..7da1beb 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -40,12 +40,6 @@ type IKeepClient interface {
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
-// Collection record returned by the API server.
-type CollectionRecord struct {
- ManifestText string `json:"manifest_text"`
- PortableDataHash string `json:"portable_data_hash"`
-}
-
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser
@@ -128,7 +122,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
- var collection CollectionRecord
+ var collection arvados.Collection
err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
@@ -527,7 +521,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
}
defer file.Close()
- rec := CollectionRecord{}
+ var rec arvados.Collection
err = json.NewDecoder(file).Decode(&rec)
if err != nil {
return fmt.Errorf("While reading FUSE metafile: %v", err)
@@ -535,7 +529,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
manifestText = rec.ManifestText
}
- var response CollectionRecord
+ var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
@@ -599,7 +593,7 @@ func (runner *ContainerRunner) CommitLogs() error {
return fmt.Errorf("While creating log manifest: %v", err)
}
- var response CollectionRecord
+ var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 242e207..dd9d0a1 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -155,7 +155,7 @@ func (this *ArvTestClient) Create(resourceType string,
if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
- outmap := output.(*CollectionRecord)
+ outmap := output.(*arvados.Collection)
outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
}
@@ -178,9 +178,9 @@ func (this *ArvTestClient) Call(method, resourceType, uuid, action string, param
func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
if resourceType == "collections" {
if uuid == hwPDH {
- output.(*CollectionRecord).ManifestText = hwManifest
+ output.(*arvados.Collection).ManifestText = hwManifest
} else if uuid == otherPDH {
- output.(*CollectionRecord).ManifestText = otherManifest
+ output.(*arvados.Collection).ManifestText = otherManifest
}
}
if resourceType == "containers" {
commit 0040468b872f7a888ed08be15538e314ee10a1c9
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jun 13 15:03:50 2016 -0400
9374: Propagate API transaction error details from response body to caller.
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index ee830c8..ee7db23 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -3,7 +3,6 @@ package arvados
import (
"crypto/tls"
"encoding/json"
- "fmt"
"io"
"io/ioutil"
"net/http"
@@ -73,7 +72,7 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
return err
}
if resp.StatusCode != 200 {
- return fmt.Errorf("request failed (%s): %s", req.URL, resp.Status)
+ return newTransactionError(req, resp, buf)
}
if dst == nil {
return nil
diff --git a/sdk/go/arvados/error.go b/sdk/go/arvados/error.go
new file mode 100644
index 0000000..4b24e6f
--- /dev/null
+++ b/sdk/go/arvados/error.go
@@ -0,0 +1,43 @@
+package arvados
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+)
+
+type TransactionError struct {
+ Method string
+ URL url.URL
+ StatusCode int
+ Status string
+ errors []string
+}
+
+func (e TransactionError) Error() (s string) {
+ s = fmt.Sprintf("request failed: %s", e.URL)
+ if e.Status != "" {
+ s = s + ": " + e.Status
+ }
+ if len(e.errors) > 0 {
+ s = s + ": " + strings.Join(e.errors, "; ")
+ }
+ return
+}
+
+func newTransactionError(req *http.Request, resp *http.Response, buf []byte) *TransactionError {
+ var e TransactionError
+ if json.Unmarshal(buf, &e) != nil {
+ // No JSON-formatted error response
+ e.errors = nil
+ }
+ e.Method = req.Method
+ e.URL = *req.URL
+ if resp != nil {
+ e.Status = resp.Status
+ e.StatusCode = resp.StatusCode
+ }
+ return &e
+}
commit 19ad5dbdf1dc18d46f7fad9ca30b69126b224c96
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 9 23:18:24 2016 -0400
9374: Add arvados.APIClientAuthorization.
diff --git a/sdk/go/arvados/api_client_authorization.go b/sdk/go/arvados/api_client_authorization.go
new file mode 100644
index 0000000..b7f9db6
--- /dev/null
+++ b/sdk/go/arvados/api_client_authorization.go
@@ -0,0 +1,12 @@
+package arvados
+
+// APIClientAuthorization is an arvados#apiClientAuthorization resource.
+type APIClientAuthorization struct {
+ UUID string `json:"uuid"`
+ APIToken string `json:"api_token"`
+}
+
+// APIClientAuthorizationList is an arvados#apiClientAuthorizationList resource.
+type APIClientAuthorizationList struct {
+ Items []APIClientAuthorization `json:"items"`
+}
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 4b66c23..ce536de 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -22,15 +22,6 @@ const (
Cancelled = arvados.ContainerStateCancelled
)
-type apiClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
- Items []apiClientAuthorization `json:"items"`
-}
-
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
@@ -58,7 +49,7 @@ type Dispatcher struct {
mineMutex sync.Mutex
mineMap map[string]chan arvados.Container
- Auth apiClientAuthorization
+ Auth arvados.APIClientAuthorization
containers chan arvados.Container
}
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 9eee309..c4ea929 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -46,12 +46,6 @@ type CollectionRecord struct {
PortableDataHash string `json:"portable_data_hash"`
}
-// APIClientAuthorization is an arvados#api_client_authorization resource.
-type APIClientAuthorization struct {
- UUID string `json:"uuid"`
- APIToken string `json:"api_token"`
-}
-
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser
@@ -639,7 +633,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
return runner.token, nil
}
- var auth APIClientAuthorization
+ var auth arvados.APIClientAuthorization
err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
if err != nil {
return "", err
commit 424181391748ec018b6157574dd65e5306d19f5d
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 9 23:04:50 2016 -0400
9374: Consolidate various Container structs as arvados.Container.
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
new file mode 100644
index 0000000..ac12952
--- /dev/null
+++ b/sdk/go/arvados/container.go
@@ -0,0 +1,54 @@
+package arvados
+
+// Container is an arvados#container resource.
+type Container struct {
+ UUID string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ LockedByUUID string `json:"locked_by_uuid"`
+ Mounts map[string]Mount `json:"mounts"`
+ Output string `json:"output"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ State ContainerState `json:"state"`
+}
+
+// Mount is special behavior to attach to a filesystem path or device.
+type Mount struct {
+ Kind string `json:"kind"`
+ Writable bool `json:"writable"`
+ PortableDataHash string `json:"portable_data_hash"`
+ UUID string `json:"uuid"`
+ DeviceType string `json:"device_type"`
+ Path string `json:"path"`
+}
+
+// RuntimeConstraints specify a container's compute resources (RAM,
+// CPU) and network connectivity.
+type RuntimeConstraints struct {
+ API *bool
+ RAM int `json:"ram"`
+ VCPUs int `json:"vcpus"`
+}
+
+// ContainerList is an arvados#containerList resource.
+type ContainerList struct {
+ Items []Container `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
+
+// ContainerState is a string corresponding to a valid Container state.
+type ContainerState string
+
+const (
+ ContainerStateQueued = ContainerState("Queued")
+ ContainerStateLocked = ContainerState("Locked")
+ ContainerStateRunning = ContainerState("Running")
+ ContainerStateComplete = ContainerState("Complete")
+ ContainerStateCancelled = ContainerState("Cancelled")
+)
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 54d596f..4b66c23 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -4,6 +4,7 @@
package dispatch
import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"log"
"os"
@@ -13,13 +14,12 @@ import (
"time"
)
-// Constants for container states
const (
- Queued = "Queued"
- Locked = "Locked"
- Running = "Running"
- Complete = "Complete"
- Cancelled = "Cancelled"
+ Queued = arvados.ContainerStateQueued
+ Locked = arvados.ContainerStateLocked
+ Running = arvados.ContainerStateRunning
+ Complete = arvados.ContainerStateComplete
+ Cancelled = arvados.ContainerStateCancelled
)
type apiClientAuthorization struct {
@@ -31,21 +31,6 @@ type apiClientAuthorizationList struct {
Items []apiClientAuthorization `json:"items"`
}
-// Represents an Arvados container record
-type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
- Items []Container `json:"items"`
- ItemsAvailable int `json:"items_available"`
-}
-
// Dispatcher holds the state of the dispatcher
type Dispatcher struct {
// The Arvados client
@@ -63,7 +48,7 @@ type Dispatcher struct {
// handled by this dispatcher and the goroutine should terminate. The
// goroutine is responsible for draining the 'status' channel, failure
// to do so may deadlock the dispatcher.
- RunContainer func(*Dispatcher, Container, chan Container)
+ RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
// Amount of time to wait between polling for updates.
PollInterval time.Duration
@@ -72,22 +57,22 @@ type Dispatcher struct {
DoneProcessing chan struct{}
mineMutex sync.Mutex
- mineMap map[string]chan Container
+ mineMap map[string]chan arvados.Container
Auth apiClientAuthorization
- containers chan Container
+ containers chan arvados.Container
}
// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
// for which this process is actively starting/monitoring. Returns channel to
// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
if ch, ok := dispatcher.mineMap[uuid]; ok {
return ch
}
- ch := make(chan Container)
+ ch := make(chan arvados.Container)
dispatcher.mineMap[uuid] = ch
return ch
}
@@ -102,10 +87,10 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
}
}
-// checkMine returns true/false if there is a channel for updates associated
+// checkMine returns true if there is a channel for updates associated
// with container c. If update is true, also send the container record on
// the channel.
-func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
dispatcher.mineMutex.Lock()
defer dispatcher.mineMutex.Unlock()
ch, ok := dispatcher.mineMap[c.UUID]
@@ -119,7 +104,7 @@ func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
}
func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
- var containers ContainerList
+ var containers arvados.ContainerList
err := dispatcher.Arv.List("containers", params, &containers)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
@@ -175,7 +160,7 @@ func (dispatcher *Dispatcher) pollContainers() {
}
}
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
if container.State == Queued && dispatcher.checkMine(container, false) {
// If we previously started the job, something failed, and it
// was re-queued, this dispatcher might still be monitoring it.
@@ -216,7 +201,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
}
// UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
err := dispatcher.Arv.Update("containers", uuid,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": newState}},
@@ -237,8 +222,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
return
}
- dispatcher.mineMap = make(map[string]chan Container)
- dispatcher.containers = make(chan Container)
+ dispatcher.mineMap = make(map[string]chan arvados.Container)
+ dispatcher.containers = make(chan arvados.Container)
// Graceful shutdown on signal
sigChan := make(chan os.Signal)
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 73a3895..936a908 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -4,6 +4,7 @@ package main
import (
"flag"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"log"
@@ -76,7 +77,7 @@ func doMain() error {
return nil
}
-func startFunc(container dispatch.Container, cmd *exec.Cmd) error {
+func startFunc(container arvados.Container, cmd *exec.Cmd) error {
return cmd.Start()
}
@@ -91,8 +92,8 @@ var startCmd = startFunc
// If the container is in any other state, or is not Complete/Cancelled after
// crunch-run terminates, mark the container as Cancelled.
func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
uuid := container.UUID
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index d4a2708..9628bf2 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
@@ -64,16 +65,16 @@ func (s *TestSuite) TestIntegration(c *C) {
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
Arv: arv,
- PollInterval: time.Duration(1) * time.Second,
+ PollInterval: time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
DoneProcessing: doneProcessing}
- startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
@@ -89,16 +90,16 @@ func (s *TestSuite) TestIntegration(c *C) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers dispatch.ContainerList
+ var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container dispatch.Container
+ var container arvados.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- c.Check(container.State, Equals, "Complete")
+ c.Check(string(container.State), Equals, "Complete")
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -168,14 +169,14 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
run(dispatcher, container, status)
doneProcessing <- struct{}{}
},
DoneProcessing: doneProcessing}
- startCmd = func(container dispatch.Container, cmd *exec.Cmd) error {
+ startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
dispatcher.UpdateState(container.UUID, "Running")
dispatcher.UpdateState(container.UUID, "Complete")
return cmd.Start()
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f718fbc..4bfff6a 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -5,6 +5,7 @@ package main
import (
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io/ioutil"
@@ -69,17 +70,17 @@ func doMain() error {
}
// sbatchCmd
-func sbatchFunc(container dispatch.Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+func sbatchFunc(container arvados.Container) *exec.Cmd {
+ memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
return exec.Command("sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
fmt.Sprintf("--priority=%d", container.Priority))
}
// scancelCmd
-func scancelFunc(container dispatch.Container) *exec.Cmd {
+func scancelFunc(container arvados.Container) *exec.Cmd {
return exec.Command("scancel", "--name="+container.UUID)
}
@@ -89,7 +90,7 @@ var scancelCmd = scancelFunc
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
- container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
+ container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
submitErr = nil
defer func() {
@@ -181,7 +182,7 @@ func submit(dispatcher *dispatch.Dispatcher,
//
// If the container is marked as Running, check if it is in the slurm queue.
// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.Container, monitorDone *bool) {
+func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
if squeueUpdater.CheckSqueue(container.UUID) {
@@ -207,13 +208,13 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
// release it back to the Queue, if it is Running then
// clean up the record.
- var con dispatch.Container
+ var con arvados.Container
err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
if err != nil {
log.Printf("Error getting final container state: %v", err)
}
- var st string
+ var st arvados.ContainerState
switch con.State {
case dispatch.Locked:
st = dispatch.Queued
@@ -236,8 +237,8 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container dispatch.C
// Monitor status updates. If the priority changes to zero, cancel the
// container using scancel.
func run(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
log.Printf("Monitoring container %v started", container.UUID)
defer log.Printf("Monitoring container %v finished", container.UUID)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index cddbe8c..b72ad9f 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -3,6 +3,7 @@ package main
import (
"bytes"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
@@ -59,29 +60,29 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
func (s *TestSuite) TestIntegrationNormal(c *C) {
container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
- c.Check(container.State, Equals, "Complete")
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
func (s *TestSuite) TestIntegrationCancel(c *C) {
// Override sbatchCmd
var scancelCmdLine []string
- defer func(orig func(dispatch.Container) *exec.Cmd) {
+ defer func(orig func(arvados.Container) *exec.Cmd) {
scancelCmd = orig
}(scancelCmd)
- scancelCmd = func(container dispatch.Container) *exec.Cmd {
+ scancelCmd = func(container arvados.Container) *exec.Cmd {
scancelCmdLine = scancelFunc(container).Args
return exec.Command("echo")
}
container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(1 * time.Second)
dispatcher.Arv.Update("containers", container.UUID,
@@ -89,7 +90,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
"container": arvadosclient.Dict{"priority": 0}},
nil)
})
- c.Check(container.State, Equals, "Cancelled")
+ c.Check(container.State, Equals, arvados.ContainerStateCancelled)
c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
}
@@ -99,18 +100,18 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
fmt.Sprintf("--mem-per-cpu=%d", 2862),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--priority=%d", 1)},
- func(dispatcher *dispatch.Dispatcher, container dispatch.Container) {
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
- c.Check(container.State, Equals, "Cancelled")
+ c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
func (s *TestSuite) integrationTest(c *C,
newSqueueCmd func() *exec.Cmd,
sbatchCmdComps []string,
- runContainer func(*dispatch.Dispatcher, dispatch.Container)) dispatch.Container {
+ runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
@@ -119,10 +120,10 @@ func (s *TestSuite) integrationTest(c *C,
var sbatchCmdLine []string
// Override sbatchCmd
- defer func(orig func(dispatch.Container) *exec.Cmd) {
+ defer func(orig func(arvados.Container) *exec.Cmd) {
sbatchCmd = orig
}(sbatchCmd)
- sbatchCmd = func(container dispatch.Container) *exec.Cmd {
+ sbatchCmd = func(container arvados.Container) *exec.Cmd {
sbatchCmdLine = sbatchFunc(container).Args
return exec.Command("sh")
}
@@ -137,7 +138,7 @@ func (s *TestSuite) integrationTest(c *C,
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
- var containers dispatch.ContainerList
+ var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
@@ -150,8 +151,8 @@ func (s *TestSuite) integrationTest(c *C,
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
go runContainer(dispatcher, container)
run(dispatcher, container, status)
doneProcessing <- struct{}{}
@@ -173,7 +174,7 @@ func (s *TestSuite) integrationTest(c *C,
c.Check(len(containers.Items), Equals, 0)
// Previously "Queued" container should now be in "Complete" state
- var container dispatch.Container
+ var container arvados.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
return container
@@ -212,8 +213,8 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
Arv: arv,
PollInterval: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
- container dispatch.Container,
- status chan dispatch.Container) {
+ container arvados.Container,
+ status chan arvados.Container) {
go func() {
time.Sleep(1 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Running)
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index d508ffb..9eee309 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -39,41 +40,12 @@ type IKeepClient interface {
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
-// Mount describes the mount points to create inside the container.
-type Mount struct {
- Kind string `json:"kind"`
- Writable bool `json:"writable"`
- PortableDataHash string `json:"portable_data_hash"`
- UUID string `json:"uuid"`
- DeviceType string `json:"device_type"`
- Path string `json:"path"`
-}
-
// Collection record returned by the API server.
type CollectionRecord struct {
ManifestText string `json:"manifest_text"`
PortableDataHash string `json:"portable_data_hash"`
}
-type RuntimeConstraints struct {
- API *bool
-}
-
-// ContainerRecord is the container record returned by the API server.
-type ContainerRecord struct {
- UUID string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- Mounts map[string]Mount `json:"mounts"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State string `json:"state"`
- Output string `json:"output"`
-}
-
// APIClientAuthorization is an arvados#api_client_authorization resource.
type APIClientAuthorization struct {
UUID string `json:"uuid"`
@@ -105,7 +77,7 @@ type ContainerRunner struct {
Docker ThinDockerClient
ArvClient IArvadosClient
Kc IKeepClient
- ContainerRecord
+ arvados.Container
dockerclient.ContainerConfig
dockerclient.HostConfig
token string
@@ -160,10 +132,10 @@ func (runner *ContainerRunner) SetupSignals() {
// the image from Keep.
func (runner *ContainerRunner) LoadImage() (err error) {
- runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
+ runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.Container.ContainerImage)
var collection CollectionRecord
- err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
+ err = runner.ArvClient.Get("collections", runner.Container.ContainerImage, nil, &collection)
if err != nil {
return fmt.Errorf("While getting container image collection: %v", err)
}
@@ -271,7 +243,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
collectionPaths := []string{}
runner.Binds = nil
- for bind, mnt := range runner.ContainerRecord.Mounts {
+ for bind, mnt := range runner.Container.Mounts {
if bind == "stdout" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
@@ -279,7 +251,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
}
// Does path start with OutputPath?
- prefix := runner.ContainerRecord.OutputPath
+ prefix := runner.Container.OutputPath
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
@@ -311,7 +283,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
tmpcount += 1
}
if mnt.Writable {
- if bind == runner.ContainerRecord.OutputPath {
+ if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
}
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
@@ -320,7 +292,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
}
collectionPaths = append(collectionPaths, src)
} else if mnt.Kind == "tmp" {
- if bind == runner.ContainerRecord.OutputPath {
+ if bind == runner.Container.OutputPath {
runner.HostOutputDir, err = runner.MkTempDir("", "")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
@@ -428,8 +400,8 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
runner.loggingDone = make(chan bool)
- if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+ if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
+ stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
index := strings.LastIndex(stdoutPath, "/")
if index > 0 {
subdirs := stdoutPath[:index]
@@ -464,15 +436,15 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
- runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
- if runner.ContainerRecord.Cwd != "." {
- runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
+ runner.ContainerConfig.Cmd = runner.Container.Command
+ if runner.Container.Cwd != "." {
+ runner.ContainerConfig.WorkingDir = runner.Container.Cwd
}
- for k, v := range runner.ContainerRecord.Environment {
+ for k, v := range runner.Container.Environment {
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
- if wantAPI := runner.ContainerRecord.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
tok, err := runner.ContainerToken()
if err != nil {
return err
@@ -616,7 +588,7 @@ func (runner *ContainerRunner) CommitLogs() error {
// point, but re-open crunch log with ArvClient in case there are any
// other further (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
"crunch-run", nil})
if runner.LogsPDH != nil {
@@ -637,7 +609,7 @@ func (runner *ContainerRunner) CommitLogs() error {
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
- "name": "logs for " + runner.ContainerRecord.UUID,
+ "name": "logs for " + runner.Container.UUID,
"manifest_text": mt}},
&response)
if err != nil {
@@ -649,14 +621,14 @@ func (runner *ContainerRunner) CommitLogs() error {
return nil
}
-// UpdateContainerRecordRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
+// UpdateContainerRunning updates the container state to "Running"
+func (runner *ContainerRunner) UpdateContainerRunning() error {
runner.CancelLock.Lock()
defer runner.CancelLock.Unlock()
if runner.Cancelled {
return ErrCancelled
}
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
+ return runner.ArvClient.Update("containers", runner.Container.UUID,
arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
@@ -668,7 +640,7 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
}
var auth APIClientAuthorization
- err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth)
+ err := runner.ArvClient.Call("GET", "containers", runner.Container.UUID, "auth", nil, &auth)
if err != nil {
return "", err
}
@@ -676,9 +648,9 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
return runner.token, nil
}
-// UpdateContainerRecordComplete updates the container record state on API
+// UpdateContainerComplete updates the container record state on API
// server to "Complete" or "Cancelled"
-func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
+func (runner *ContainerRunner) UpdateContainerFinal() error {
update := arvadosclient.Dict{}
update["state"] = runner.finalState
if runner.finalState == "Complete" {
@@ -692,7 +664,7 @@ func (runner *ContainerRunner) UpdateContainerRecordFinal() error {
update["output"] = *runner.OutputPDH
}
}
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
+ return runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
// IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -704,12 +676,12 @@ func (runner *ContainerRunner) IsCancelled() bool {
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
}
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
- runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+ runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
if hosterr != nil {
@@ -744,7 +716,7 @@ func (runner *ContainerRunner) Run() (err error) {
checkErr(err)
if runner.finalState == "Queued" {
- runner.UpdateContainerRecordFinal()
+ runner.UpdateContainerFinal()
return
}
@@ -756,7 +728,7 @@ func (runner *ContainerRunner) Run() (err error) {
checkErr(runner.CaptureOutput())
checkErr(runner.CommitLogs())
- checkErr(runner.UpdateContainerRecordFinal())
+ checkErr(runner.UpdateContainerFinal())
// The real log is already closed, but then we opened
// a new one in case we needed to log anything while
@@ -764,7 +736,7 @@ func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Close()
}()
- err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
+ err = runner.ArvClient.Get("containers", runner.Container.UUID, nil, &runner.Container)
if err != nil {
err = fmt.Errorf("While getting container record: %v", err)
return
@@ -796,7 +768,7 @@ func (runner *ContainerRunner) Run() (err error) {
return
}
- err = runner.UpdateContainerRecordRunning()
+ err = runner.UpdateContainerRunning()
if err != nil {
return
}
@@ -825,7 +797,7 @@ func NewContainerRunner(api IArvadosClient,
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
- cr.ContainerRecord.UUID = containerUUID
+ cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
return cr
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 998c4bc..242e207 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -38,7 +39,7 @@ type ArvTestClient struct {
Total int64
Calls int
Content []arvadosclient.Dict
- ContainerRecord
+ arvados.Container
Logs map[string]*bytes.Buffer
WasSetRunning bool
sync.Mutex
@@ -183,7 +184,7 @@ func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arva
}
}
if resourceType == "containers" {
- (*output.(*ContainerRecord)) = this.ContainerRecord
+ (*output.(*arvados.Container)) = this.Container
}
return nil
}
@@ -206,7 +207,8 @@ func (this *ArvTestClient) Update(resourceType string, uuid string, parameters a
// "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
// no call matches, it returns nil.
func (this *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
- call: for _, content := range this.Content {
+call:
+ for _, content := range this.Content {
var v interface{} = content
for _, k := range strings.Split(jpath, ".") {
if dict, ok := v.(arvadosclient.Dict); !ok {
@@ -255,7 +257,7 @@ func (s *TestSuite) TestLoadImage(c *C) {
_, err = cr.Docker.InspectImage(hwImageId)
c.Check(err, NotNil)
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
// (1) Test loading image from keep
c.Check(kc.Called, Equals, false)
@@ -340,7 +342,7 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
@@ -350,7 +352,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
@@ -359,7 +361,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = otherPDH
+ cr.Container.ContainerImage = otherPDH
err := cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
@@ -369,7 +371,7 @@ func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- cr.ContainerRecord.ContainerImage = hwPDH
+ cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
c.Check(err, NotNil)
@@ -418,8 +420,8 @@ func (s *TestSuite) TestRunContainer(c *C) {
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
- cr.ContainerRecord.ContainerImage = hwPDH
- cr.ContainerRecord.Command = []string{"./hw"}
+ cr.Container.ContainerImage = hwPDH
+ cr.Container.Command = []string{"./hw"}
err := cr.LoadImage()
c.Check(err, IsNil)
@@ -455,18 +457,18 @@ func (s *TestSuite) TestCommitLogs(c *C) {
c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
}
-func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
+func (s *TestSuite) TestUpdateContainerRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- err := cr.UpdateContainerRecordRunning()
+ err := cr.UpdateContainerRunning()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
}
-func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
+func (s *TestSuite) TestUpdateContainerComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -478,7 +480,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
*cr.ExitCode = 42
cr.finalState = "Complete"
- err := cr.UpdateContainerRecordFinal()
+ err := cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
@@ -486,14 +488,14 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
}
-func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
+func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Cancelled = true
cr.finalState = "Cancelled"
- err := cr.UpdateContainerRecordFinal()
+ err := cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
@@ -504,7 +506,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
@@ -512,7 +514,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
docker.fn = fn
docker.RemoveImage(hwImageId, true)
- api = &ArvTestClient{ContainerRecord: rec}
+ api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
@@ -643,7 +645,7 @@ func (s *TestSuite) TestCancel(c *C) {
"runtime_constraints": {}
}`
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
@@ -656,7 +658,7 @@ func (s *TestSuite) TestCancel(c *C) {
}
docker.RemoveImage(hwImageId, true)
- api := &ArvTestClient{ContainerRecord: rec}
+ api := &ArvTestClient{Container: rec}
cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
@@ -735,8 +737,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
}
{
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
cr.OutputPath = "/tmp"
err := cr.SetupMounts()
@@ -748,8 +750,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/keeptmp"] = arvados.Mount{Kind: "collection", Writable: true}
cr.OutputPath = "/keeptmp"
os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
@@ -763,9 +765,9 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
- cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts["/keepinp"] = arvados.Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+ cr.Container.Mounts["/keepout"] = arvados.Mount{Kind: "collection", Writable: true}
cr.OutputPath = "/keepout"
os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
@@ -808,7 +810,7 @@ func (s *TestSuite) TestStdout(c *C) {
// Used by the TestStdoutWithWrongPath*()
func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
- rec := ContainerRecord{}
+ rec := arvados.Container{}
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
@@ -816,7 +818,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
docker.fn = fn
docker.RemoveImage(hwImageId, true)
- api = &ArvTestClient{ContainerRecord: rec}
+ api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list