[ARVADOS] created: 7600537f3f34ee88a76688dbb0e1d73723905fa7
git at public.curoverse.com
git at public.curoverse.com
Wed Oct 21 16:41:40 EDT 2015
at 7600537f3f34ee88a76688dbb0e1d73723905fa7 (commit)
commit 7600537f3f34ee88a76688dbb0e1d73723905fa7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 21 16:41:35 2015 -0400
7582: Uploader mostly done, writing tests
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
new file mode 100644
index 0000000..2196a9d
--- /dev/null
+++ b/sdk/go/crunchrunner/upload.go
@@ -0,0 +1,176 @@
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+)
+
+type Block struct {
+ data []byte
+ offset int64
+}
+
+type ManifestStreamWriter struct {
+ *ManifestWriter
+ *manifest.ManifestStream
+ offset int64
+ *Block
+ uploader chan *Block
+}
+
+type IKeepClient interface {
+ PutHB(hash string, buf []byte) (string, int, error)
+}
+
+func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
+ // Needed to conform to Writer interface, but not implemented
+ // because io.Copy will actually use ReadFrom instead.
+ return 0, nil
+}
+
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+ var total int64
+ var count int
+
+ for err == nil {
+ if m.Block == nil {
+ m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+ }
+ count, err = r.Read(m.Block.data[m.Block.offset:])
+ total += int64(count)
+ m.Block.offset += int64(count)
+ if count > 0 {
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
+ }
+ }
+ }
+
+ return total, err
+}
+
+func (m *ManifestStreamWriter) goUpload() {
+ select {
+ case block, valid := <-m.uploader:
+ if !valid {
+ return
+ }
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
+
+}
+
+type ManifestWriter struct {
+ IKeepClient
+ stripPrefix string
+ Streams map[string]*ManifestStreamWriter
+}
+
+type walker struct {
+ currentDir string
+ m *ManifestWriter
+}
+
+func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
+ log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
+
+ if info.IsDir() {
+ if path == w.currentDir {
+ return nil
+ }
+ return filepath.Walk(path, walker{path, w.m}.WalkFunc)
+ }
+ m := w.m
+
+ dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
+ fn := path[(len(path) - len(info.Name())):]
+
+ if m.Streams[dir] == nil {
+ m.Streams[dir] = &ManifestStreamWriter{
+ m,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ nil,
+ make(chan *Block)}
+ go m.Streams[dir].goUpload()
+ }
+
+ stream := m.Streams[dir]
+
+ fileStart := stream.offset
+
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+
+ var count int64
+ count, err = io.Copy(stream, file)
+ if err != nil && err != io.EOF {
+ return err
+ }
+
+ stream.offset += count
+
+ stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+ fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+
+ return nil
+}
+
+func (m *ManifestWriter) Finish() {
+ for _, v := range m.Streams {
+ if v.uploader != nil {
+ if v.Block != nil {
+ v.uploader <- v.Block
+ }
+ close(v.uploader)
+ v.uploader = nil
+ }
+ }
+}
+
+func (m *ManifestWriter) ManifestText() string {
+ m.Finish()
+ var buf bytes.Buffer
+ for k, v := range m.Streams {
+ if k == "" {
+ buf.WriteString(".")
+ } else {
+ buf.WriteString("./" + k)
+ }
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ for _, f := range v.Files {
+ buf.WriteString(" ")
+ buf.WriteString(f)
+ }
+ buf.WriteString("\n")
+ }
+ return buf.String()
+}
+
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+ mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+ err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
+ mw.Finish()
+
+ if err != nil {
+ return "", err
+ } else {
+ return mw.ManifestText(), nil
+ }
+
+}
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
new file mode 100644
index 0000000..6e0e103
--- /dev/null
+++ b/sdk/go/crunchrunner/upload_test.go
@@ -0,0 +1,66 @@
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ . "gopkg.in/check.v1"
+ "io/ioutil"
+ "os"
+)
+
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
+type KeepTestClient struct {
+}
+
+func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return fmt.Sprintf("%x+%v", md5.Sum(buf), len(buf)), len(buf), nil
+}
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/"+"subdir", 0600)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file1.txt
+./subdir acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file2.txt
+`)
+}
commit 5a4cb9d3957905a02716761edb1be662edad0312
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 21 13:38:57 2015 -0400
7582: More tests, add vwd support
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index b8b6234..0ca7ce9 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -12,13 +12,13 @@ import (
type TaskDef struct {
commands []string `json:"commands"`
- env map[string]string `json:"task.env"`
- stdin string `json:"task.stdin"`
- stdout string `json:"task.stdout"`
- vwd map[string]string `json:"task.vwd"`
- successCodes []int `json:"task.successCodes"`
- permanentFailCodes []int `json:"task.permanentFailCodes"`
- temporaryFailCodes []int `json:"task.temporaryFailCodes"`
+ env map[string]string `json:"env"`
+ stdin string `json:"stdin"`
+ stdout string `json:"stdout"`
+ vwd map[string]string `json:"vwd"`
+ successCodes []int `json:"successCodes"`
+ permanentFailCodes []int `json:"permanentFailCodes"`
+ temporaryFailCodes []int `json:"temporaryFailCodes"`
}
type Tasks struct {
@@ -39,7 +39,7 @@ type Task struct {
progress float32 `json:"sequence"`
}
-func setupDirectories(tmpdir string) (outdir string, err error) {
+func setupDirectories(tmpdir, taskUuid string) (outdir string, err error) {
err = os.Chdir(tmpdir)
if err != nil {
return "", err
@@ -50,12 +50,12 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
return "", err
}
- err = os.Mkdir("outdir", 0700)
+ err = os.Mkdir(taskUuid, 0700)
if err != nil {
return "", err
}
- os.Chdir("outdir")
+ os.Chdir(taskUuid)
if err != nil {
return "", err
}
@@ -71,16 +71,17 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
var err error
- //if taskp.vwd != nil {
- // Set up VWD symlinks in outdir
- // TODO
- //}
+ if taskp.vwd != nil {
+ for k, v := range taskp.vwd {
+ os.Symlink(keepmount+"/"+v, outdir+"/"+k)
+ }
+ }
if taskp.stdin != "" {
// Set up stdin redirection
cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
if err != nil {
- log.Fatal(err)
+ return err
}
}
@@ -88,7 +89,7 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error
// Set up stdout redirection
cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
if err != nil {
- log.Fatal(err)
+ return err
}
} else {
cmd.Stdout = os.Stdout
@@ -183,7 +184,7 @@ func runner(api arvadosclient.IArvadosClient,
cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
var outdir string
- outdir, err = setupDirectories(tmpdir)
+ outdir, err = setupDirectories(tmpdir, taskUuid)
if err != nil {
return TempFail{err}
}
@@ -203,7 +204,11 @@ func runner(api arvadosclient.IArvadosClient,
err = cmd.Run()
if err != nil {
- return TempFail{err}
+ // Run() returns ExitError on non-zero exit code, but we handle
+ // that down below. So only return if it's not ExitError.
+ if _, ok := err.(*exec.ExitError); !ok {
+ return TempFail{err}
+ }
}
const success = 1
@@ -218,7 +223,7 @@ func runner(api arvadosclient.IArvadosClient,
} else if inCodes(exitCode, taskp.permanentFailCodes) {
status = permfail
} else if inCodes(exitCode, taskp.temporaryFailCodes) {
- os.Exit(TASK_TEMPFAIL)
+ return TempFail{nil}
} else if cmd.ProcessState.Success() {
status = success
} else {
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 127c434..29b092b 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -63,7 +63,7 @@ func (s *TestSuite) TestSimpleRun(c *C) {
}
func checkOutput(c *C, tmpdir string) {
- file, err := os.Open(tmpdir + "/" + "outdir/output.txt")
+ file, err := os.Open(tmpdir + "/zzzz-ot0gb-111111111111111/output.txt")
c.Assert(err, IsNil)
data := make([]byte, 100)
@@ -105,8 +105,8 @@ func (s *TestSuite) TestRedirect(c *C) {
tmpfile, _ := ioutil.TempFile("", "")
tmpfile.Write([]byte("foo\n"))
- tmpfile.Sync()
- defer tmpfile.Close()
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
@@ -213,3 +213,111 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
c.Check(err, IsNil)
}
+
+func (s *TestSuite) TestRunFail(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"/bin/sh", "-c", "exit 1"}}}}},
+ Task{sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"/bin/sh", "-c", "exit 1"},
+ successCodes: []int{0, 1}}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestRunFailCode(c *C) {
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"/bin/sh", "-c", "exit 0"},
+ permanentFailCodes: []int{0, 1}}}}},
+ Task{sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"/bin/sh", "-c", "exit 1"},
+ temporaryFailCodes: []int{1}}}}},
+ Task{sequence: 0})
+ c.Check(err, FitsTypeOf, TempFail{})
+}
+
+func (s *TestSuite) TestVwd(c *C) {
+ api := ArvTestClient{}
+
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"ls", "output.txt"},
+ vwd: map[string]string{
+ "output.txt": tmpfile.Name()}}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
commit f15a9a1ad40f589d3c40f856a95e1d5556ee7ca1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 21 13:04:07 2015 -0400
7582: Working on tests.
diff --git a/sdk/go/arvadosclient/arvadosclient.go b/sdk/go/arvadosclient/arvadosclient.go
index 1cce0a7..09170ba 100644
--- a/sdk/go/arvadosclient/arvadosclient.go
+++ b/sdk/go/arvadosclient/arvadosclient.go
@@ -78,6 +78,14 @@ type ArvadosClient struct {
DiscoveryDoc Dict
}
+type IArvadosClient interface {
+ Create(resourceType string, parameters Dict, output interface{}) error
+ Delete(resource string, uuid string, parameters Dict, output interface{}) (err error)
+ Update(resourceType string, uuid string, parameters Dict, output interface{}) (err error)
+ Get(resourceType string, uuid string, parameters Dict, output interface{}) (err error)
+ List(resource string, parameters Dict, output interface{}) (err error)
+}
+
// Create a new ArvadosClient, initialized with standard Arvados environment
// variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
// ARVADOS_API_HOST_INSECURE.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 9f2ae25..b8b6234 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -3,7 +3,6 @@ package main
import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
//"git.curoverse.com/arvados.git/sdk/go/keepclient"
- "errors"
"log"
"os"
"os/exec"
@@ -11,13 +10,33 @@ import (
"syscall"
)
-func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
- r = make(arvadosclient.Dict)
- err := api.Get(rsc, uuid, nil, &r)
- if err != nil {
- log.Fatal(err)
- }
- return r
+type TaskDef struct {
+ commands []string `json:"commands"`
+ env map[string]string `json:"task.env"`
+ stdin string `json:"task.stdin"`
+ stdout string `json:"task.stdout"`
+ vwd map[string]string `json:"task.vwd"`
+ successCodes []int `json:"task.successCodes"`
+ permanentFailCodes []int `json:"task.permanentFailCodes"`
+ temporaryFailCodes []int `json:"task.temporaryFailCodes"`
+}
+
+type Tasks struct {
+ tasks []TaskDef `json:"script_parameters"`
+}
+
+type Job struct {
+ script_parameters Tasks `json:"script_parameters"`
+}
+
+type Task struct {
+ job_uuid string `json:"job_uuid"`
+ created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
+ parameters TaskDef `json:"parameters"`
+ sequence int `json:"sequence"`
+ output string `json:"output"`
+ success bool `json:"success"`
+ progress float32 `json:"sequence"`
}
func setupDirectories(tmpdir string) (outdir string, err error) {
@@ -49,34 +68,25 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
return outdir, nil
}
-func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
var err error
- if taskp["task.vwd"] != nil {
- // Set up VWD symlinks in outdir
- // TODO
- }
+ //if taskp.vwd != nil {
+ // Set up VWD symlinks in outdir
+ // TODO
+ //}
- if taskp["task.stdin"] != nil {
- stdin, ok := taskp["task.stdin"].(string)
- if !ok {
- return errors.New("Could not cast task.stdin to string")
- }
+ if taskp.stdin != "" {
// Set up stdin redirection
- cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+ cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
if err != nil {
log.Fatal(err)
}
}
- if taskp["task.stdout"] != nil {
- stdout, ok := taskp["task.stdout"].(string)
- if !ok {
- return errors.New("Could not cast task.stdout to string")
- }
-
+ if taskp.stdout != "" {
// Set up stdout redirection
- cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+ cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
if err != nil {
log.Fatal(err)
}
@@ -84,21 +94,11 @@ func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir
cmd.Stdout = os.Stdout
}
- if taskp["task.env"] != nil {
- taskenv, ok := taskp["task.env"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast task.env to map")
- }
-
+ if taskp.env != nil {
// Set up subprocess environment
cmd.Env = os.Environ()
- for k, v := range taskenv {
- var vstr string
- vstr, ok = v.(string)
- if !ok {
- return errors.New("Could not cast environment value to string")
- }
- cmd.Env = append(cmd.Env, k+"="+vstr)
+ for k, v := range taskp.env {
+ cmd.Env = append(cmd.Env, k+"="+v)
}
}
return nil
@@ -119,16 +119,10 @@ func setupSignals(cmd *exec.Cmd) {
signal.Notify(sigChan, syscall.SIGQUIT)
}
-func inCodes(code int, codes interface{}) bool {
+func inCodes(code int, codes []int) bool {
if codes != nil {
- codesArray, ok := codes.([]interface{})
- if !ok {
- return false
- }
- for _, c := range codesArray {
- var num float64
- num, ok = c.(float64)
- if ok && code == int(num) {
+ for _, c := range codes {
+ if code == c {
return true
}
}
@@ -149,76 +143,44 @@ func (s PermFail) Error() string {
return "PermFail"
}
-func runner(api arvadosclient.ArvadosClient,
- jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
- taskStruct arvadosclient.Dict) error {
+func runner(api arvadosclient.IArvadosClient,
+ jobUuid, taskUuid, tmpdir, keepmount string,
+ jobStruct Job, taskStruct Task) error {
var err error
- var ok bool
- var jobp, taskp map[string]interface{}
- jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast job script_parameters to map")
- }
-
- taskp, ok = taskStruct["parameters"].(map[string]interface{})
- if !ok {
- return errors.New("Could not cast task parameters to map")
- }
+ taskp := taskStruct.parameters
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
- if taskStruct["sequence"] == 0.0 {
- var tasks []interface{}
- tasks, ok = jobp["tasks"].([]interface{})
- if !ok {
- return errors.New("Could not cast tasks to array")
- }
-
- if len(tasks) == 1 {
- taskp = tasks[0].(map[string]interface{})
+ if taskStruct.sequence == 0 {
+ if len(jobStruct.script_parameters.tasks) == 1 {
+ taskp = jobStruct.script_parameters.tasks[0]
} else {
- for task := range tasks {
- err := api.Call("POST", "job_tasks", "", "",
- arvadosclient.Dict{
- "job_uuid": jobUuid,
- "created_by_job_task_uuid": "",
- "sequence": 1,
- "parameters": task},
+ for _, task := range jobStruct.script_parameters.tasks {
+ err := api.Create("job_tasks",
+ map[string]interface{}{
+ "job_task": Task{job_uuid: jobUuid,
+ created_by_job_task_uuid: taskUuid,
+ sequence: 1,
+ parameters: task}},
nil)
if err != nil {
return TempFail{err}
}
}
- err = api.Call("PUT", "job_tasks", taskUuid, "",
- arvadosclient.Dict{
- "job_task": arvadosclient.Dict{
- "output": "",
- "success": true,
- "progress": 1.0}},
+ err = api.Update("job_tasks", taskUuid,
+ map[string]interface{}{
+ "job_task": Task{
+ output: "",
+ success: true,
+ progress: 1.0}},
nil)
return nil
}
}
// Set up subprocess
- var commandline []string
- var commandsarray []interface{}
-
- commandsarray, ok = taskp["command"].([]interface{})
- if !ok {
- return errors.New("Could not cast commands to array")
- }
-
- for _, c := range commandsarray {
- var cstr string
- cstr, ok = c.(string)
- if !ok {
- return errors.New("Could not cast command argument to string")
- }
- commandline = append(commandline, cstr)
- }
- cmd := exec.Command(commandline[0], commandline[1:]...)
+ cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
var outdir string
outdir, err = setupDirectories(tmpdir)
@@ -251,11 +213,11 @@ func runner(api arvadosclient.ArvadosClient,
exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
- if inCodes(exitCode, taskp["task.successCodes"]) {
+ if inCodes(exitCode, taskp.successCodes) {
status = success
- } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
+ } else if inCodes(exitCode, taskp.permanentFailCodes) {
status = permfail
- } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
+ } else if inCodes(exitCode, taskp.temporaryFailCodes) {
os.Exit(TASK_TEMPFAIL)
} else if cmd.ProcessState.Success() {
status = success
@@ -267,9 +229,9 @@ func runner(api arvadosclient.ArvadosClient,
// TODO
// Set status
- err = api.Call("PUT", "job_tasks", taskUuid, "",
- arvadosclient.Dict{
- "job_task": arvadosclient.Dict{
+ err = api.Update("job_tasks", taskUuid,
+ map[string]interface{}{
+ "job_task": map[string]interface{}{
"output": "",
"success": status == success,
"progress": 1.0}},
@@ -298,8 +260,17 @@ func main() {
tmpdir := os.Getenv("TASK_WORK")
keepmount := os.Getenv("TASK_KEEPMOUNT")
- jobStruct := getRecord(api, "jobs", jobUuid)
- taskStruct := getRecord(api, "job_tasks", taskUuid)
+ var jobStruct Job
+ var taskStruct Task
+
+ err = api.Get("jobs", jobUuid, nil, &jobStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
+ err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+ if err != nil {
+ log.Fatal(err)
+ }
err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 7d4c62a..127c434 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -1,16 +1,9 @@
package main
import (
- "crypto/md5"
- "flag"
- "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
- "io"
"io/ioutil"
- "log"
- "net"
- "net/http"
"os"
"testing"
)
@@ -20,6 +13,203 @@ func Test(t *testing.T) {
TestingT(t)
}
+type TestSuite struct{}
+
// Gocheck boilerplate
-var _ = Suite(&ServerRequiredSuite{})
-var _ = Suite(&StandaloneSuite{})
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+}
+
+func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ return nil
+}
+
+func (t ArvTestClient) Delete(resource string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t ArvTestClient) List(resource string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (s *TestSuite) TestSimpleRun(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"echo", "foo"}}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+
+}
+
+func checkOutput(c *C, tmpdir string) {
+ file, err := os.Open(tmpdir + "/" + "outdir/output.txt")
+ c.Assert(err, IsNil)
+
+ data := make([]byte, 100)
+ var count int
+ count, err = file.Read(data)
+ c.Assert(err, IsNil)
+ c.Check(string(data[0:count]), Equals, "foo\n")
+}
+
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{
+ TaskDef{commands: []string{"echo", "bar"}},
+ TaskDef{commands: []string{"echo", "foo"}}}}},
+ Task{parameters: TaskDef{
+ commands: []string{"echo", "foo"},
+ stdout: "output.txt"},
+ sequence: 1})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestRedirect(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Sync()
+ defer tmpfile.Close()
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"cat"},
+ stdout: "output.txt",
+ stdin: tmpfile.Name()}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnv(c *C) {
+
+ api := ArvTestClient{}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+ commands: []string{"/bin/sh", "-c", "echo $BAR"},
+ stdout: "output.txt",
+ env: map[string]string{"BAR": "foo"}}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+type SubtaskTestClient struct {
+ c *C
+ parms []Task
+ i int
+}
+
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+ t.i += 1
+ return nil
+}
+
+func (t SubtaskTestClient) Delete(resource string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t SubtaskTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (t SubtaskTestClient) List(resource string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+
+ api := SubtaskTestClient{c, []Task{
+ Task{job_uuid: "zzzz-8i9sb-111111111111111",
+ created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ sequence: 1,
+ parameters: TaskDef{
+ commands: []string{"echo", "bar"}}},
+ Task{job_uuid: "zzzz-8i9sb-111111111111111",
+ created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ sequence: 1,
+ parameters: TaskDef{
+ commands: []string{"echo", "foo"}}}},
+ 0}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(&api,
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{script_parameters: Tasks{[]TaskDef{
+ TaskDef{commands: []string{"echo", "bar"}},
+ TaskDef{commands: []string{"echo", "foo"}}}}},
+ Task{sequence: 0})
+ c.Check(err, IsNil)
+
+}
commit 4c2855c80c11b60be9e5c6af384a0a90fb66ae16
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 21 09:03:27 2015 -0400
7582: Crunchrunner work in progress.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
new file mode 100644
index 0000000..9f2ae25
--- /dev/null
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -0,0 +1,316 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ //"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "errors"
+ "log"
+ "os"
+ "os/exec"
+ "os/signal"
+ "syscall"
+)
+
+func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
+ r = make(arvadosclient.Dict)
+ err := api.Get(rsc, uuid, nil, &r)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return r
+}
+
+func setupDirectories(tmpdir string) (outdir string, err error) {
+ err = os.Chdir(tmpdir)
+ if err != nil {
+ return "", err
+ }
+
+ err = os.Mkdir("tmpdir", 0700)
+ if err != nil {
+ return "", err
+ }
+
+ err = os.Mkdir("outdir", 0700)
+ if err != nil {
+ return "", err
+ }
+
+ os.Chdir("outdir")
+ if err != nil {
+ return "", err
+ }
+
+ outdir, err = os.Getwd()
+ if err != nil {
+ return "", err
+ }
+
+ return outdir, nil
+}
+
+func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
+ var err error
+
+ if taskp["task.vwd"] != nil {
+ // Set up VWD symlinks in outdir
+ // TODO
+ }
+
+ if taskp["task.stdin"] != nil {
+ stdin, ok := taskp["task.stdin"].(string)
+ if !ok {
+ return errors.New("Could not cast task.stdin to string")
+ }
+ // Set up stdin redirection
+ cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+ if err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ if taskp["task.stdout"] != nil {
+ stdout, ok := taskp["task.stdout"].(string)
+ if !ok {
+ return errors.New("Could not cast task.stdout to string")
+ }
+
+ // Set up stdout redirection
+ cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+ if err != nil {
+ log.Fatal(err)
+ }
+ } else {
+ cmd.Stdout = os.Stdout
+ }
+
+ if taskp["task.env"] != nil {
+ taskenv, ok := taskp["task.env"].(map[string]interface{})
+ if !ok {
+ return errors.New("Could not cast task.env to map")
+ }
+
+ // Set up subprocess environment
+ cmd.Env = os.Environ()
+ for k, v := range taskenv {
+ var vstr string
+ vstr, ok = v.(string)
+ if !ok {
+ return errors.New("Could not cast environment value to string")
+ }
+ cmd.Env = append(cmd.Env, k+"="+vstr)
+ }
+ }
+ return nil
+}
+
+func setupSignals(cmd *exec.Cmd) {
+ // Set up signal handlers
+ // Forward SIGINT, SIGTERM and SIGQUIT to inner process
+ sigChan := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ catch := <-sig
+ if cmd.Process != nil {
+ cmd.Process.Signal(catch)
+ }
+ }(sigChan)
+ signal.Notify(sigChan, syscall.SIGTERM)
+ signal.Notify(sigChan, syscall.SIGINT)
+ signal.Notify(sigChan, syscall.SIGQUIT)
+}
+
+func inCodes(code int, codes interface{}) bool {
+ if codes != nil {
+ codesArray, ok := codes.([]interface{})
+ if !ok {
+ return false
+ }
+ for _, c := range codesArray {
+ var num float64
+ num, ok = c.(float64)
+ if ok && code == int(num) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+const TASK_TEMPFAIL = 111
+
+type TempFail struct{ InnerError error }
+type PermFail struct{}
+
+func (s TempFail) Error() string {
+ return s.InnerError.Error()
+}
+
+func (s PermFail) Error() string {
+ return "PermFail"
+}
+
+func runner(api arvadosclient.ArvadosClient,
+ jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
+ taskStruct arvadosclient.Dict) error {
+
+ var err error
+ var ok bool
+ var jobp, taskp map[string]interface{}
+ jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
+ if !ok {
+ return errors.New("Could not cast job script_parameters to map")
+ }
+
+ taskp, ok = taskStruct["parameters"].(map[string]interface{})
+ if !ok {
+ return errors.New("Could not cast task parameters to map")
+ }
+
+ // If this is task 0 and there are multiple tasks, dispatch subtasks
+ // and exit.
+ if taskStruct["sequence"] == 0.0 {
+ var tasks []interface{}
+ tasks, ok = jobp["tasks"].([]interface{})
+ if !ok {
+ return errors.New("Could not cast tasks to array")
+ }
+
+ if len(tasks) == 1 {
+ taskp = tasks[0].(map[string]interface{})
+ } else {
+ for task := range tasks {
+ err := api.Call("POST", "job_tasks", "", "",
+ arvadosclient.Dict{
+ "job_uuid": jobUuid,
+ "created_by_job_task_uuid": "",
+ "sequence": 1,
+ "parameters": task},
+ nil)
+ if err != nil {
+ return TempFail{err}
+ }
+ }
+ err = api.Call("PUT", "job_tasks", taskUuid, "",
+ arvadosclient.Dict{
+ "job_task": arvadosclient.Dict{
+ "output": "",
+ "success": true,
+ "progress": 1.0}},
+ nil)
+ return nil
+ }
+ }
+
+ // Set up subprocess
+ var commandline []string
+ var commandsarray []interface{}
+
+ commandsarray, ok = taskp["command"].([]interface{})
+ if !ok {
+ return errors.New("Could not cast commands to array")
+ }
+
+ for _, c := range commandsarray {
+ var cstr string
+ cstr, ok = c.(string)
+ if !ok {
+ return errors.New("Could not cast command argument to string")
+ }
+ commandline = append(commandline, cstr)
+ }
+ cmd := exec.Command(commandline[0], commandline[1:]...)
+
+ var outdir string
+ outdir, err = setupDirectories(tmpdir)
+ if err != nil {
+ return TempFail{err}
+ }
+
+ cmd.Dir = outdir
+
+ err = setupCommand(cmd, taskp, keepmount, outdir)
+ if err != nil {
+ return err
+ }
+
+ setupSignals(cmd)
+
+ // Run subprocess and wait for it to complete
+ log.Printf("Running %v", cmd.Args)
+
+ err = cmd.Run()
+
+ if err != nil {
+ return TempFail{err}
+ }
+
+ const success = 1
+ const permfail = 2
+ const tempfail = 2
+ var status int
+
+ exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+ if inCodes(exitCode, taskp["task.successCodes"]) {
+ status = success
+ } else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
+ status = permfail
+ } else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
+ os.Exit(TASK_TEMPFAIL)
+ } else if cmd.ProcessState.Success() {
+ status = success
+ } else {
+ status = permfail
+ }
+
+ // Upload output directory
+ // TODO
+
+ // Set status
+ err = api.Call("PUT", "job_tasks", taskUuid, "",
+ arvadosclient.Dict{
+ "job_task": arvadosclient.Dict{
+ "output": "",
+ "success": status == success,
+ "progress": 1.0}},
+ nil)
+ if err != nil {
+ return TempFail{err}
+ }
+
+ if status == success {
+ return nil
+ } else {
+ return PermFail{}
+ }
+}
+
+func main() {
+ syscall.Umask(0077)
+
+ api, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ jobUuid := os.Getenv("JOB_UUID")
+ taskUuid := os.Getenv("TASK_UUID")
+ tmpdir := os.Getenv("TASK_WORK")
+ keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+ jobStruct := getRecord(api, "jobs", jobUuid)
+ taskStruct := getRecord(api, "job_tasks", taskUuid)
+
+ err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
+ if err == nil {
+ os.Exit(0)
+ } else if _, ok := err.(TempFail); ok {
+ log.Print(err)
+ os.Exit(TASK_TEMPFAIL)
+ } else if _, ok := err.(PermFail); ok {
+ os.Exit(1)
+ } else {
+ log.Fatal(err)
+ }
+}
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
new file mode 100644
index 0000000..7d4c62a
--- /dev/null
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -0,0 +1,25 @@
+package main
+
+import (
+ "crypto/md5"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ . "gopkg.in/check.v1"
+ "io"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&StandaloneSuite{})
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list