[ARVADOS] created: 7600537f3f34ee88a76688dbb0e1d73723905fa7

git at public.curoverse.com git at public.curoverse.com
Wed Oct 21 16:41:40 EDT 2015


        at  7600537f3f34ee88a76688dbb0e1d73723905fa7 (commit)


commit 7600537f3f34ee88a76688dbb0e1d73723905fa7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 21 16:41:35 2015 -0400

    7582: Uploader mostly done, writing tests

diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
new file mode 100644
index 0000000..2196a9d
--- /dev/null
+++ b/sdk/go/crunchrunner/upload.go
@@ -0,0 +1,176 @@
+package main
+
+import (
+	"bytes"
+	"crypto/md5"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"sort"
+
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// Block is a buffer of file data destined for a single Keep block. The
+// buffer is keepclient.BLOCKSIZE bytes long; only data[0:offset] holds data.
+type Block struct {
+	data   []byte
+	offset int64
+}
+
+// ManifestStreamWriter collects the contents of every file in one manifest
+// stream (one directory of the tree being uploaded). File data is packed
+// into Blocks. Each Block that fills up, and the final partial Block, is
+// handed over the uploader channel to the goUpload goroutine. goUpload
+// stores it in Keep and appends its locator to ManifestStream.Blocks.
+//
+// NOTE(review): every stream keeps its own partially-filled Block until
+// Finish, so a tree with many directories holds up to
+// keepclient.BLOCKSIZE bytes of buffer per directory.
+type ManifestStreamWriter struct {
+	*ManifestWriter
+	*manifest.ManifestStream
+	// offset is the number of file bytes already written to this stream,
+	// i.e. the stream position at which the next file starts.
+	offset int64
+	// Block is the partially-filled block being written, or nil.
+	*Block
+	// uploader carries blocks to goUpload. Finish closes it and sets it to
+	// nil.
+	uploader chan *Block
+	// finished is closed by goUpload once uploader has been closed and
+	// every block sent on it has been processed.
+	finished chan struct{}
+	// sent counts the blocks handed to uploader.
+	sent int
+	// uploadErr is the first error returned by PutHB. Only goUpload writes
+	// it, and it may only be read after finished is closed.
+	uploadErr error
+}
+
+// IKeepClient is the subset of the Keep client API used to store blocks.
+// The first result of PutHB is used as the block locator in the manifest.
+type IKeepClient interface {
+	PutHB(hash string, buf []byte) (string, int, error)
+}
+
+// Write appends p to the stream and hands each block that fills up to the
+// uploader. It always consumes all of p and never returns an error.
+func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
+	for len(p) > 0 {
+		if m.Block == nil {
+			m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+		}
+		count := copy(m.Block.data[m.Block.offset:], p)
+		m.Block.offset += int64(count)
+		n += count
+		p = p[count:]
+		m.flushIfFull()
+	}
+	return n, nil
+}
+
+// ReadFrom reads r until EOF and appends everything read to the stream.
+// io.Copy may call this instead of Write. As io.ReaderFrom requires,
+// reaching EOF is not reported as an error.
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	for {
+		if m.Block == nil {
+			m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+		}
+		count, rerr := r.Read(m.Block.data[m.Block.offset:])
+		n += int64(count)
+		m.Block.offset += int64(count)
+		m.flushIfFull()
+		if rerr == io.EOF {
+			return n, nil
+		}
+		if rerr != nil {
+			return n, rerr
+		}
+	}
+}
+
+// flushIfFull hands the current (non-nil) block to the uploader once it is
+// full, so the next write starts a fresh block.
+func (m *ManifestStreamWriter) flushIfFull() {
+	if m.Block.offset == keepclient.BLOCKSIZE {
+		m.uploader <- m.Block
+		m.sent++
+		m.Block = nil
+	}
+}
+
+// goUpload runs in its own goroutine, one per stream. It stores each block
+// received on uploader in Keep, in the order sent, and appends the returned
+// locator to the stream's block list. After the first PutHB error it keeps
+// draining uploader, so writers never block, but uploads nothing more. It
+// closes finished when uploader is closed.
+func (m *ManifestStreamWriter) goUpload() {
+	defer close(m.finished)
+	for block := range m.uploader {
+		if m.uploadErr != nil {
+			continue
+		}
+		buf := block.data[0:block.offset]
+		hash := fmt.Sprintf("%x", md5.Sum(buf))
+		signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, buf)
+		if err != nil {
+			m.uploadErr = err
+			continue
+		}
+		m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+	}
+}
+
+// ManifestWriter builds a manifest for a directory tree, with one
+// ManifestStreamWriter per directory.
+type ManifestWriter struct {
+	IKeepClient
+	// stripPrefix is the root of the tree. Stream names are paths
+	// relative to it.
+	stripPrefix string
+	// Streams maps a directory path relative to stripPrefix ("" for the
+	// root itself) to the writer for that stream.
+	Streams map[string]*ManifestStreamWriter
+}
+
+// walker adapts a ManifestWriter to filepath.Walk.
+type walker struct {
+	// currentDir is the directory the walk started from. WalkFunc no
+	// longer consults it; the field is kept for existing callers.
+	currentDir string
+	m          *ManifestWriter
+}
+
+// WalkFunc is a filepath.WalkFunc. It appends each non-directory entry
+// under the tree to the manifest stream for the entry's directory.
+// filepath.Walk visits entries in lexical order, so files within a stream
+// are recorded in that order.
+func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
+	if err != nil {
+		// info may be nil here, so report filepath.Walk's error rather
+		// than inspecting info.
+		return err
+	}
+	log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
+
+	if info.IsDir() {
+		// filepath.Walk descends into subdirectories by itself; walking
+		// them again here would upload their files twice.
+		return nil
+	}
+	m := w.m
+
+	rel, err := filepath.Rel(m.stripPrefix, path)
+	if err != nil {
+		return err
+	}
+	// dir is the file's directory relative to the tree root, with "/"
+	// separators. "" means the root itself, which also covers a root that
+	// is a plain file (rel is then ".").
+	dir := filepath.ToSlash(filepath.Dir(rel))
+	if dir == "." {
+		dir = ""
+	}
+	// NOTE(review): names containing spaces or other special characters
+	// are written to the manifest unescaped.
+	fn := info.Name()
+
+	file, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	stream := m.Streams[dir]
+	if stream == nil {
+		stream = &ManifestStreamWriter{
+			ManifestWriter: m,
+			ManifestStream: &manifest.ManifestStream{StreamName: dir},
+			uploader:       make(chan *Block),
+			finished:       make(chan struct{}),
+		}
+		m.Streams[dir] = stream
+		go stream.goUpload()
+	}
+
+	fileStart := stream.offset
+	count, err := io.Copy(stream, file)
+	if err != nil {
+		return err
+	}
+	stream.offset += count
+	stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+		fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+	return nil
+}
+
+// Finish does three things for every stream:
+//   - hands the stream's final partial block to its uploader;
+//   - closes the uploader;
+//   - waits until all of that stream's blocks have been processed.
+//
+// Calling it again later does nothing.
+func (m *ManifestWriter) Finish() {
+	for _, v := range m.Streams {
+		if v.uploader == nil {
+			continue // already finished
+		}
+		// An empty trailing block is only uploaded when it is the
+		// stream's only block, so a stream whose files are all empty
+		// still gets one (zero-length) block locator.
+		if v.Block != nil && (v.Block.offset > 0 || v.sent == 0) {
+			v.uploader <- v.Block
+			v.sent++
+		}
+		v.Block = nil
+		close(v.uploader)
+		// Once finished is closed, goUpload is done with Blocks and
+		// uploadErr, and reads of them below and elsewhere are race-free.
+		<-v.finished
+		v.uploader = nil
+	}
+}
+
+// ManifestText finishes all uploads and returns the manifest text. There
+// is one line per stream, sorted by stream name. Each line lists the
+// stream name, its block locators, and its file segments.
+func (m *ManifestWriter) ManifestText() string {
+	m.Finish()
+
+	// Map iteration order is random; sort for deterministic output.
+	names := make([]string, 0, len(m.Streams))
+	for name := range m.Streams {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	var buf bytes.Buffer
+	for _, name := range names {
+		v := m.Streams[name]
+		if name == "" {
+			buf.WriteString(".")
+		} else {
+			buf.WriteString("./" + name)
+		}
+		for _, b := range v.Blocks {
+			buf.WriteString(" ")
+			buf.WriteString(b)
+		}
+		for _, f := range v.Files {
+			buf.WriteString(" ")
+			buf.WriteString(f)
+		}
+		buf.WriteString("\n")
+	}
+	return buf.String()
+}
+
+// WriteTree uploads every file under root to Keep through kc and returns
+// the resulting manifest text, with stream names relative to root. The
+// first walk error or upload error is returned instead of a manifest.
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+	mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+	err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
+
+	// Always flush and stop the uploader goroutines, even if the walk
+	// failed, so that none of them is left running.
+	mw.Finish()
+	if err != nil {
+		return "", err
+	}
+	for _, stream := range mw.Streams {
+		if stream.uploadErr != nil {
+			return "", stream.uploadErr
+		}
+	}
+	return mw.ManifestText(), nil
+}
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
new file mode 100644
index 0000000..6e0e103
--- /dev/null
+++ b/sdk/go/crunchrunner/upload_test.go
@@ -0,0 +1,66 @@
+package main
+
+import (
+	"crypto/md5"
+	"fmt"
+	. "gopkg.in/check.v1"
+	"io/ioutil"
+	"os"
+)
+
+// UploadTestSuite holds the gocheck tests for WriteTree.
+type UploadTestSuite struct{}
+
+// Register the suite with gocheck.
+var _ = Suite(new(UploadTestSuite))
+
+// KeepTestClient stands in for a Keep client in tests. PutHB stores
+// nothing: it ignores the hash it is given and returns an unsigned
+// "<md5 of buf>+<len(buf)>" locator.
+type KeepTestClient struct{}
+
+func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	size := len(buf)
+	locator := fmt.Sprintf("%x+%v", md5.Sum(buf), size)
+	return locator, size, nil
+}
+
+// TestSimpleUpload checks the manifest for a tree holding a single file.
+// It belongs to UploadTestSuite, the suite registered for these tests.
+func (s *UploadTestSuite) TestSimpleUpload(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	c.Assert(ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600), IsNil)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+// TestSimpleUploadTwofiles checks that two files in the same directory are
+// packed into one block ("foobar") with consecutive file segments.
+func (s *UploadTestSuite) TestSimpleUploadTwofiles(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	c.Assert(ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600), IsNil)
+	c.Assert(ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600), IsNil)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+// TestSimpleUploadSubdir checks that a subdirectory becomes its own
+// "./subdir" stream, holding only its own file's data.
+func (s *UploadTestSuite) TestSimpleUploadSubdir(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	// A directory needs the execute bit, or no file can be created in it.
+	c.Assert(os.Mkdir(tmpdir+"/"+"subdir", 0700), IsNil)
+
+	c.Assert(ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600), IsNil)
+	c.Assert(ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600), IsNil)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}

commit 5a4cb9d3957905a02716761edb1be662edad0312
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 21 13:38:57 2015 -0400

    7582: More tests, add vwd support

diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index b8b6234..0ca7ce9 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -12,13 +12,13 @@ import (
 
 type TaskDef struct {
 	commands           []string          `json:"commands"`
-	env                map[string]string `json:"task.env"`
-	stdin              string            `json:"task.stdin"`
-	stdout             string            `json:"task.stdout"`
-	vwd                map[string]string `json:"task.vwd"`
-	successCodes       []int             `json:"task.successCodes"`
-	permanentFailCodes []int             `json:"task.permanentFailCodes"`
-	temporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+	env                map[string]string `json:"env"`
+	stdin              string            `json:"stdin"`
+	stdout             string            `json:"stdout"`
+	vwd                map[string]string `json:"vwd"`
+	successCodes       []int             `json:"successCodes"`
+	permanentFailCodes []int             `json:"permanentFailCodes"`
+	temporaryFailCodes []int             `json:"temporaryFailCodes"`
 }
 
 type Tasks struct {
@@ -39,7 +39,7 @@ type Task struct {
 	progress                 float32 `json:"sequence"`
 }
 
-func setupDirectories(tmpdir string) (outdir string, err error) {
+func setupDirectories(tmpdir, taskUuid string) (outdir string, err error) {
 	err = os.Chdir(tmpdir)
 	if err != nil {
 		return "", err
@@ -50,12 +50,12 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
 		return "", err
 	}
 
-	err = os.Mkdir("outdir", 0700)
+	err = os.Mkdir(taskUuid, 0700)
 	if err != nil {
 		return "", err
 	}
 
-	os.Chdir("outdir")
+	os.Chdir(taskUuid)
 	if err != nil {
 		return "", err
 	}
@@ -71,16 +71,17 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
 func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
 	var err error
 
-	//if taskp.vwd != nil {
-	// Set up VWD symlinks in outdir
-	// TODO
-	//}
+	if taskp.vwd != nil {
+		for k, v := range taskp.vwd {
+			os.Symlink(keepmount+"/"+v, outdir+"/"+k)
+		}
+	}
 
 	if taskp.stdin != "" {
 		// Set up stdin redirection
 		cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
 		if err != nil {
-			log.Fatal(err)
+			return err
 		}
 	}
 
@@ -88,7 +89,7 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error
 		// Set up stdout redirection
 		cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
 		if err != nil {
-			log.Fatal(err)
+			return err
 		}
 	} else {
 		cmd.Stdout = os.Stdout
@@ -183,7 +184,7 @@ func runner(api arvadosclient.IArvadosClient,
 	cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
 
 	var outdir string
-	outdir, err = setupDirectories(tmpdir)
+	outdir, err = setupDirectories(tmpdir, taskUuid)
 	if err != nil {
 		return TempFail{err}
 	}
@@ -203,7 +204,11 @@ func runner(api arvadosclient.IArvadosClient,
 	err = cmd.Run()
 
 	if err != nil {
-		return TempFail{err}
+		// Run() returns ExitError on non-zero exit code, but we handle
+		// that down below.  So only return if it's not ExitError.
+		if _, ok := err.(*exec.ExitError); !ok {
+			return TempFail{err}
+		}
 	}
 
 	const success = 1
@@ -218,7 +223,7 @@ func runner(api arvadosclient.IArvadosClient,
 	} else if inCodes(exitCode, taskp.permanentFailCodes) {
 		status = permfail
 	} else if inCodes(exitCode, taskp.temporaryFailCodes) {
-		os.Exit(TASK_TEMPFAIL)
+		return TempFail{nil}
 	} else if cmd.ProcessState.Success() {
 		status = success
 	} else {
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 127c434..29b092b 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -63,7 +63,7 @@ func (s *TestSuite) TestSimpleRun(c *C) {
 }
 
 func checkOutput(c *C, tmpdir string) {
-	file, err := os.Open(tmpdir + "/" + "outdir/output.txt")
+	file, err := os.Open(tmpdir + "/zzzz-ot0gb-111111111111111/output.txt")
 	c.Assert(err, IsNil)
 
 	data := make([]byte, 100)
@@ -105,8 +105,8 @@ func (s *TestSuite) TestRedirect(c *C) {
 
 	tmpfile, _ := ioutil.TempFile("", "")
 	tmpfile.Write([]byte("foo\n"))
-	tmpfile.Sync()
-	defer tmpfile.Close()
+	tmpfile.Close()
+	defer os.Remove(tmpfile.Name())
 
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
@@ -213,3 +213,111 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
 	c.Check(err, IsNil)
 
 }
+
+// runTestTask runs taskDef as task 0 of a one-task job in a fresh
+// temporary directory. It returns that directory and runner's result; the
+// caller must remove the directory.
+func runTestTask(c *C, taskDef TaskDef) (string, error) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	err = runner(ArvTestClient{},
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{taskDef}}},
+		Task{sequence: 0})
+	return tmpdir, err
+}
+
+// TestRunFail: a non-zero exit with no code lists is a permanent failure.
+func (s *TestSuite) TestRunFail(c *C) {
+	tmpdir, err := runTestTask(c, TaskDef{
+		commands: []string{"/bin/sh", "-c", "exit 1"}})
+	defer os.RemoveAll(tmpdir)
+	c.Check(err, FitsTypeOf, PermFail{})
+}
+
+// TestRunSuccessCode: an exit code listed in successCodes counts as success.
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+	tmpdir, err := runTestTask(c, TaskDef{
+		commands:     []string{"/bin/sh", "-c", "exit 1"},
+		successCodes: []int{0, 1}})
+	defer os.RemoveAll(tmpdir)
+	c.Check(err, IsNil)
+}
+
+// TestRunFailCode: permanentFailCodes overrides a zero exit status.
+func (s *TestSuite) TestRunFailCode(c *C) {
+	tmpdir, err := runTestTask(c, TaskDef{
+		commands:           []string{"/bin/sh", "-c", "exit 0"},
+		permanentFailCodes: []int{0, 1}})
+	defer os.RemoveAll(tmpdir)
+	c.Check(err, FitsTypeOf, PermFail{})
+}
+
+// TestRunTempFailCode: an exit code in temporaryFailCodes yields TempFail.
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+	tmpdir, err := runTestTask(c, TaskDef{
+		commands:           []string{"/bin/sh", "-c", "exit 1"},
+		temporaryFailCodes: []int{1}})
+	defer os.RemoveAll(tmpdir)
+	c.Check(err, FitsTypeOf, TempFail{})
+}
+
+// TestVwd checks that a vwd entry exposes a file in the task's output
+// directory under the given name ("output.txt").
+func (s *TestSuite) TestVwd(c *C) {
+	tmpfile, err := ioutil.TempFile("", "")
+	c.Assert(err, IsNil)
+	defer os.Remove(tmpfile.Name())
+	_, err = tmpfile.Write([]byte("foo\n"))
+	c.Assert(err, IsNil)
+	c.Assert(tmpfile.Close(), IsNil)
+
+	tmpdir, err := runTestTask(c, TaskDef{
+		commands: []string{"ls", "output.txt"},
+		vwd: map[string]string{
+			"output.txt": tmpfile.Name()}})
+	defer os.RemoveAll(tmpdir)
+	c.Check(err, IsNil)
+	checkOutput(c, tmpdir)
+}

commit f15a9a1ad40f589d3c40f856a95e1d5556ee7ca1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 21 13:04:07 2015 -0400

    7582: Working on tests.

diff --git a/sdk/go/arvadosclient/arvadosclient.go b/sdk/go/arvadosclient/arvadosclient.go
index 1cce0a7..09170ba 100644
--- a/sdk/go/arvadosclient/arvadosclient.go
+++ b/sdk/go/arvadosclient/arvadosclient.go
@@ -78,6 +78,14 @@ type ArvadosClient struct {
 	DiscoveryDoc Dict
 }
 
+// IArvadosClient lists the ArvadosClient methods callers rely on, so a
+// caller can accept any implementation (for example a test fake) instead
+// of the concrete ArvadosClient.
+type IArvadosClient interface {
+	Get(resourceType, uuid string, parameters Dict, output interface{}) error
+	List(resource string, parameters Dict, output interface{}) error
+	Create(resourceType string, parameters Dict, output interface{}) error
+	Update(resourceType, uuid string, parameters Dict, output interface{}) error
+	Delete(resource, uuid string, parameters Dict, output interface{}) error
+}
+
 // Create a new ArvadosClient, initialized with standard Arvados environment
 // variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
 // ARVADOS_API_HOST_INSECURE.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 9f2ae25..b8b6234 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -3,7 +3,6 @@ package main
 import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"errors"
 	"log"
 	"os"
 	"os/exec"
@@ -11,13 +10,33 @@ import (
 	"syscall"
 )
 
-func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) (r arvadosclient.Dict) {
-	r = make(arvadosclient.Dict)
-	err := api.Get(rsc, uuid, nil, &r)
-	if err != nil {
-		log.Fatal(err)
-	}
-	return r
+// TaskDef describes one command for crunchrunner to run:
+//   - commands: the command line;
+//   - stdin: a file under keepmount to use as standard input;
+//   - stdout: a file to create in the output directory for standard output;
+//   - env: extra environment variables appended to os.Environ();
+//   - successCodes, permanentFailCodes, temporaryFailCodes: exit codes
+//     that decide the task's status;
+//   - vwd: not used yet (setupCommand leaves it as a TODO).
+//
+// NOTE(review): encoding/json ignores unexported struct fields. If api.Get
+// decodes records with encoding/json (not visible here), none of these
+// fields is populated and the json tags have no effect. Confirm, and
+// export the fields if so.
+type TaskDef struct {
+	commands           []string          `json:"commands"`
+	env                map[string]string `json:"task.env"`
+	stdin              string            `json:"task.stdin"`
+	stdout             string            `json:"task.stdout"`
+	vwd                map[string]string `json:"task.vwd"`
+	successCodes       []int             `json:"task.successCodes"`
+	permanentFailCodes []int             `json:"task.permanentFailCodes"`
+	temporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+}
+
+// Tasks is a job's script_parameters: the list of task definitions to run.
+//
+// NOTE(review): the tag on tasks is "script_parameters", but the code this
+// commit replaces read the list from script_parameters["tasks"], so "tasks"
+// looks like the intended key. Confirm. The field is also unexported,
+// which encoding/json ignores.
+type Tasks struct {
+	tasks []TaskDef `json:"script_parameters"`
+}
+
+// Job holds the part of a job record that crunchrunner reads: its
+// script_parameters.
+//
+// NOTE(review): script_parameters is unexported, so encoding/json would
+// skip it when decoding. Confirm how api.Get fills this struct.
+type Job struct {
+	script_parameters Tasks `json:"script_parameters"`
+}
+
+// Task holds the job_task record fields that crunchrunner reads (sequence,
+// parameters) or sends when creating and updating tasks.
+//
+// NOTE(review): progress is tagged "sequence", the same JSON name as the
+// sequence field; "progress" looks intended. Confirm. All fields are
+// unexported, which encoding/json skips when encoding or decoding.
+type Task struct {
+	job_uuid                 string  `json:"job_uuid"`
+	created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
+	parameters               TaskDef `json:"parameters"`
+	sequence                 int     `json:"sequence"`
+	output                   string  `json:"output"`
+	success                  bool    `json:"success"`
+	progress                 float32 `json:"sequence"`
 }
 
 func setupDirectories(tmpdir string) (outdir string, err error) {
@@ -49,34 +68,25 @@ func setupDirectories(tmpdir string) (outdir string, err error) {
 	return outdir, nil
 }
 
-func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, keepmount, outdir string) error {
 	var err error
 
-	if taskp["task.vwd"] != nil {
-		// Set up VWD symlinks in outdir
-		// TODO
-	}
+	//if taskp.vwd != nil {
+	// Set up VWD symlinks in outdir
+	// TODO
+	//}
 
-	if taskp["task.stdin"] != nil {
-		stdin, ok := taskp["task.stdin"].(string)
-		if !ok {
-			return errors.New("Could not cast task.stdin to string")
-		}
+	if taskp.stdin != "" {
 		// Set up stdin redirection
-		cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+		cmd.Stdin, err = os.Open(keepmount + "/" + taskp.stdin)
 		if err != nil {
 			log.Fatal(err)
 		}
 	}
 
-	if taskp["task.stdout"] != nil {
-		stdout, ok := taskp["task.stdout"].(string)
-		if !ok {
-			return errors.New("Could not cast task.stdout to string")
-		}
-
+	if taskp.stdout != "" {
 		// Set up stdout redirection
-		cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+		cmd.Stdout, err = os.Create(outdir + "/" + taskp.stdout)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -84,21 +94,11 @@ func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir
 		cmd.Stdout = os.Stdout
 	}
 
-	if taskp["task.env"] != nil {
-		taskenv, ok := taskp["task.env"].(map[string]interface{})
-		if !ok {
-			return errors.New("Could not cast task.env to map")
-		}
-
+	if taskp.env != nil {
 		// Set up subprocess environment
 		cmd.Env = os.Environ()
-		for k, v := range taskenv {
-			var vstr string
-			vstr, ok = v.(string)
-			if !ok {
-				return errors.New("Could not cast environment value to string")
-			}
-			cmd.Env = append(cmd.Env, k+"="+vstr)
+		for k, v := range taskp.env {
+			cmd.Env = append(cmd.Env, k+"="+v)
 		}
 	}
 	return nil
@@ -119,16 +119,10 @@ func setupSignals(cmd *exec.Cmd) {
 	signal.Notify(sigChan, syscall.SIGQUIT)
 }
 
-func inCodes(code int, codes interface{}) bool {
+func inCodes(code int, codes []int) bool {
 	if codes != nil {
-		codesArray, ok := codes.([]interface{})
-		if !ok {
-			return false
-		}
-		for _, c := range codesArray {
-			var num float64
-			num, ok = c.(float64)
-			if ok && code == int(num) {
+		for _, c := range codes {
+			if code == c {
 				return true
 			}
 		}
@@ -149,76 +143,44 @@ func (s PermFail) Error() string {
 	return "PermFail"
 }
 
-func runner(api arvadosclient.ArvadosClient,
-	jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
-	taskStruct arvadosclient.Dict) error {
+func runner(api arvadosclient.IArvadosClient,
+	jobUuid, taskUuid, tmpdir, keepmount string,
+	jobStruct Job, taskStruct Task) error {
 
 	var err error
-	var ok bool
-	var jobp, taskp map[string]interface{}
-	jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
-	if !ok {
-		return errors.New("Could not cast job script_parameters to map")
-	}
-
-	taskp, ok = taskStruct["parameters"].(map[string]interface{})
-	if !ok {
-		return errors.New("Could not cast task parameters to map")
-	}
+	taskp := taskStruct.parameters
 
 	// If this is task 0 and there are multiple tasks, dispatch subtasks
 	// and exit.
-	if taskStruct["sequence"] == 0.0 {
-		var tasks []interface{}
-		tasks, ok = jobp["tasks"].([]interface{})
-		if !ok {
-			return errors.New("Could not cast tasks to array")
-		}
-
-		if len(tasks) == 1 {
-			taskp = tasks[0].(map[string]interface{})
+	if taskStruct.sequence == 0 {
+		if len(jobStruct.script_parameters.tasks) == 1 {
+			taskp = jobStruct.script_parameters.tasks[0]
 		} else {
-			for task := range tasks {
-				err := api.Call("POST", "job_tasks", "", "",
-					arvadosclient.Dict{
-						"job_uuid":                 jobUuid,
-						"created_by_job_task_uuid": "",
-						"sequence":                 1,
-						"parameters":               task},
+			for _, task := range jobStruct.script_parameters.tasks {
+				err := api.Create("job_tasks",
+					map[string]interface{}{
+						"job_task": Task{job_uuid: jobUuid,
+							created_by_job_task_uuid: taskUuid,
+							sequence:                 1,
+							parameters:               task}},
 					nil)
 				if err != nil {
 					return TempFail{err}
 				}
 			}
-			err = api.Call("PUT", "job_tasks", taskUuid, "",
-				arvadosclient.Dict{
-					"job_task": arvadosclient.Dict{
-						"output":   "",
-						"success":  true,
-						"progress": 1.0}},
+			err = api.Update("job_tasks", taskUuid,
+				map[string]interface{}{
+					"job_task": Task{
+						output:   "",
+						success:  true,
+						progress: 1.0}},
 				nil)
 			return nil
 		}
 	}
 
 	// Set up subprocess
-	var commandline []string
-	var commandsarray []interface{}
-
-	commandsarray, ok = taskp["command"].([]interface{})
-	if !ok {
-		return errors.New("Could not cast commands to array")
-	}
-
-	for _, c := range commandsarray {
-		var cstr string
-		cstr, ok = c.(string)
-		if !ok {
-			return errors.New("Could not cast command argument to string")
-		}
-		commandline = append(commandline, cstr)
-	}
-	cmd := exec.Command(commandline[0], commandline[1:]...)
+	cmd := exec.Command(taskp.commands[0], taskp.commands[1:]...)
 
 	var outdir string
 	outdir, err = setupDirectories(tmpdir)
@@ -251,11 +213,11 @@ func runner(api arvadosclient.ArvadosClient,
 
 	exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
 
-	if inCodes(exitCode, taskp["task.successCodes"]) {
+	if inCodes(exitCode, taskp.successCodes) {
 		status = success
-	} else if inCodes(exitCode, taskp["task.permanentFailCodes"]) {
+	} else if inCodes(exitCode, taskp.permanentFailCodes) {
 		status = permfail
-	} else if inCodes(exitCode, taskp["task.temporaryFailCodes"]) {
+	} else if inCodes(exitCode, taskp.temporaryFailCodes) {
 		os.Exit(TASK_TEMPFAIL)
 	} else if cmd.ProcessState.Success() {
 		status = success
@@ -267,9 +229,9 @@ func runner(api arvadosclient.ArvadosClient,
 	// TODO
 
 	// Set status
-	err = api.Call("PUT", "job_tasks", taskUuid, "",
-		arvadosclient.Dict{
-			"job_task": arvadosclient.Dict{
+	err = api.Update("job_tasks", taskUuid,
+		map[string]interface{}{
+			"job_task": map[string]interface{}{
 				"output":   "",
 				"success":  status == success,
 				"progress": 1.0}},
@@ -298,8 +260,17 @@ func main() {
 	tmpdir := os.Getenv("TASK_WORK")
 	keepmount := os.Getenv("TASK_KEEPMOUNT")
 
-	jobStruct := getRecord(api, "jobs", jobUuid)
-	taskStruct := getRecord(api, "job_tasks", taskUuid)
+	var jobStruct Job
+	var taskStruct Task
+
+	err = api.Get("jobs", jobUuid, nil, &jobStruct)
+	if err != nil {
+		log.Fatal(err)
+	}
+	err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+	if err != nil {
+		log.Fatal(err)
+	}
 
 	err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
 
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 7d4c62a..127c434 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -1,16 +1,9 @@
 package main
 
 import (
-	"crypto/md5"
-	"flag"
-	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	. "gopkg.in/check.v1"
-	"io"
 	"io/ioutil"
-	"log"
-	"net"
-	"net/http"
 	"os"
 	"testing"
 )
@@ -20,6 +13,203 @@ func Test(t *testing.T) {
 	TestingT(t)
 }
 
+// TestSuite is the gocheck suite that holds the crunchrunner tests.
+type TestSuite struct{}
+
 // Gocheck boilerplate
-var _ = Suite(&ServerRequiredSuite{})
-var _ = Suite(&StandaloneSuite{})
+var _ = Suite(&TestSuite{})
+
+// ArvTestClient is a do-nothing IArvadosClient for tests: every method
+// succeeds immediately and leaves output untouched.
+type ArvTestClient struct{}
+
+var _ arvadosclient.IArvadosClient = ArvTestClient{}
+
+func (ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+func (ArvTestClient) Delete(resource string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+func (ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+func (ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+func (ArvTestClient) List(resource string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+// TestSimpleRun runs a one-task job whose command (echo foo) writes to the
+// runner's own stdout, and checks that the task succeeds.
+func (s *TestSuite) TestSimpleRun(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	err = runner(ArvTestClient{},
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+			commands: []string{"echo", "foo"}}}}},
+		Task{sequence: 0})
+	c.Check(err, IsNil)
+}
+
+// checkOutput asserts that outdir/output.txt under tmpdir contains exactly
+// "foo\n".
+//
+// NOTE(review): file is never closed, and only one Read of at most 100
+// bytes is checked.
+func checkOutput(c *C, tmpdir string) {
+	file, err := os.Open(tmpdir + "/" + "outdir/output.txt")
+	c.Assert(err, IsNil)
+
+	data := make([]byte, 100)
+	var count int
+	count, err = file.Read(data)
+	c.Assert(err, IsNil)
+	c.Check(string(data[0:count]), Equals, "foo\n")
+}
+
+// TestSimpleRunSubtask runs a task with sequence 1. That task executes its
+// own parameters (echo foo, stdout to output.txt) rather than the job's
+// task list. The test checks that output.txt holds "foo\n".
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	err = runner(ArvTestClient{},
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{
+			TaskDef{commands: []string{"echo", "bar"}},
+			TaskDef{commands: []string{"echo", "foo"}}}}},
+		Task{parameters: TaskDef{
+			commands: []string{"echo", "foo"},
+			stdout:   "output.txt"},
+			sequence: 1})
+	c.Check(err, IsNil)
+
+	checkOutput(c, tmpdir)
+}
+
+// TestRedirect runs cat with stdin redirected from a temporary file and
+// stdout redirected to output.txt, then checks output.txt holds "foo\n".
+func (s *TestSuite) TestRedirect(c *C) {
+
+	api := ArvTestClient{}
+
+	tmpfile, _ := ioutil.TempFile("", "")
+	tmpfile.Write([]byte("foo\n"))
+	tmpfile.Sync()
+	// NOTE(review): the file is closed but never removed, so it is left
+	// behind in the system temp directory.
+	defer tmpfile.Close()
+
+	tmpdir, _ := ioutil.TempDir("", "")
+	defer func() {
+		os.RemoveAll(tmpdir)
+	}()
+
+	// keepmount is "", so setupCommand opens "/" + tmpfile.Name() as stdin.
+	err := runner(api,
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+			commands: []string{"cat"},
+			stdout:   "output.txt",
+			stdin:    tmpfile.Name()}}}},
+		Task{sequence: 0})
+	c.Check(err, IsNil)
+
+	checkOutput(c, tmpdir)
+}
+
+// TestEnv checks that task env entries reach the command's environment:
+// $BAR is set to "foo" and echoed into output.txt.
+func (s *TestSuite) TestEnv(c *C) {
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	err = runner(ArvTestClient{},
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{TaskDef{
+			commands: []string{"/bin/sh", "-c", "echo $BAR"},
+			stdout:   "output.txt",
+			env:      map[string]string{"BAR": "foo"}}}}},
+		Task{sequence: 0})
+	c.Check(err, IsNil)
+
+	checkOutput(c, tmpdir)
+}
+
+// SubtaskTestClient is an IArvadosClient that checks each Create call
+// against the next expected job_task in parms. i counts the Create calls
+// seen so far. The other methods do nothing.
+type SubtaskTestClient struct {
+	c     *C
+	parms []Task
+	i     int
+}
+
+var _ arvadosclient.IArvadosClient = (*SubtaskTestClient)(nil)
+
+// Create checks that a job_tasks record matching parms[i] is being created.
+// A call beyond the end of parms is reported as a test failure rather than
+// panicking with an index out of range. i is still incremented, so callers
+// can detect the extra call.
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+	t.c.Check(resourceType, Equals, "job_tasks")
+	if t.i < len(t.parms) {
+		t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+	} else {
+		t.c.Errorf("unexpected Create call #%d: %v", t.i+1, parameters)
+	}
+	t.i++
+	return nil
+}
+
+func (t *SubtaskTestClient) Delete(resource string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+func (t *SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+func (t *SubtaskTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+func (t *SubtaskTestClient) List(resource string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+// TestScheduleSubtask runs task 0 of a two-task job. It checks that runner
+// creates exactly one job_task per task definition, each with sequence 1
+// and created_by_job_task_uuid set to the running task's uuid.
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+	api := SubtaskTestClient{c, []Task{
+		Task{job_uuid: "zzzz-8i9sb-111111111111111",
+			created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+			sequence:                 1,
+			parameters: TaskDef{
+				commands: []string{"echo", "bar"}}},
+		Task{job_uuid: "zzzz-8i9sb-111111111111111",
+			created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+			sequence:                 1,
+			parameters: TaskDef{
+				commands: []string{"echo", "foo"}}}},
+		0}
+
+	tmpdir, err := ioutil.TempDir("", "")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	err = runner(&api,
+		"zzzz-8i9sb-111111111111111",
+		"zzzz-ot0gb-111111111111111",
+		tmpdir,
+		"",
+		Job{script_parameters: Tasks{[]TaskDef{
+			TaskDef{commands: []string{"echo", "bar"}},
+			TaskDef{commands: []string{"echo", "foo"}}}}},
+		Task{sequence: 0})
+	// Every expected subtask must have been created, not just a prefix.
+	c.Check(api.i, Equals, len(api.parms))
+	c.Check(err, IsNil)
+
+}

commit 4c2855c80c11b60be9e5c6af384a0a90fb66ae16
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 21 09:03:27 2015 -0400

    7582: Crunchrunner work in progress.

diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
new file mode 100644
index 0000000..9f2ae25
--- /dev/null
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -0,0 +1,316 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"errors"
+	"log"
+	"os"
+	"os/exec"
+	"os/signal"
+	"syscall"
+)
+
+// getRecord fetches the record identified by uuid from resource rsc and
+// returns it as a generic map. Any API error is fatal: it is logged and
+// the process exits.
+func getRecord(api arvadosclient.ArvadosClient, rsc, uuid string) arvadosclient.Dict {
+	record := arvadosclient.Dict{}
+	if err := api.Get(rsc, uuid, nil, &record); err != nil {
+		log.Fatal(err)
+	}
+	return record
+}
+
+// setupDirectories prepares the task working area under tmpdir.
+//
+// It changes the process working directory to tmpdir, creates the
+// "tmpdir" and "outdir" subdirectories (mode 0700), then changes into
+// "outdir" and returns its absolute path. The working directory of the
+// whole process is changed as a side effect. Any filesystem error is
+// returned unchanged, with an empty outdir.
+func setupDirectories(tmpdir string) (outdir string, err error) {
+	if err = os.Chdir(tmpdir); err != nil {
+		return "", err
+	}
+
+	if err = os.Mkdir("tmpdir", 0700); err != nil {
+		return "", err
+	}
+
+	if err = os.Mkdir("outdir", 0700); err != nil {
+		return "", err
+	}
+
+	// This Chdir must be checked too: if it failed, Getwd below would
+	// report tmpdir instead of outdir.
+	if err = os.Chdir("outdir"); err != nil {
+		return "", err
+	}
+
+	if outdir, err = os.Getwd(); err != nil {
+		return "", err
+	}
+	return outdir, nil
+}
+
+func setupCommand(cmd *exec.Cmd, taskp map[string]interface{}, keepmount, outdir string) error {
+	var err error
+
+	if taskp["task.vwd"] != nil {
+		// Set up VWD symlinks in outdir
+		// TODO
+	}
+
+	if taskp["task.stdin"] != nil {
+		stdin, ok := taskp["task.stdin"].(string)
+		if !ok {
+			return errors.New("Could not cast task.stdin to string")
+		}
+		// Set up stdin redirection
+		cmd.Stdin, err = os.Open(keepmount + "/" + stdin)
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	if taskp["task.stdout"] != nil {
+		stdout, ok := taskp["task.stdout"].(string)
+		if !ok {
+			return errors.New("Could not cast task.stdout to string")
+		}
+
+		// Set up stdout redirection
+		cmd.Stdout, err = os.Open(outdir + "/" + stdout)
+		if err != nil {
+			log.Fatal(err)
+		}
+	} else {
+		cmd.Stdout = os.Stdout
+	}
+
+	if taskp["task.env"] != nil {
+		taskenv, ok := taskp["task.env"].(map[string]interface{})
+		if !ok {
+			return errors.New("Could not cast task.env to map")
+		}
+
+		// Set up subprocess environment
+		cmd.Env = os.Environ()
+		for k, v := range taskenv {
+			var vstr string
+			vstr, ok = v.(string)
+			if !ok {
+				return errors.New("Could not cast environment value to string")
+			}
+			cmd.Env = append(cmd.Env, k+"="+vstr)
+		}
+	}
+	return nil
+}
+
+// setupSignals forwards SIGTERM, SIGINT and SIGQUIT received by this
+// process to cmd's child process.
+//
+// Every caught signal is forwarded, not just the first one. Once Notify is
+// registered these signals no longer terminate this process, so a signal
+// that was not forwarded would simply be lost. Signals arriving while
+// cmd.Process is still nil are dropped. The forwarding goroutine runs for
+// the rest of the process lifetime.
+// NOTE(review): cmd.Process is read here while the caller's cmd.Run may
+// be setting it concurrently, which is a data race; starting the command
+// before installing forwarding would avoid it.
+func setupSignals(cmd *exec.Cmd) {
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
+	go func(sig <-chan os.Signal) {
+		for catch := range sig {
+			if cmd.Process != nil {
+				// Best effort: the child may already have exited.
+				cmd.Process.Signal(catch)
+			}
+		}
+	}(sigChan)
+}
+
+func inCodes(code int, codes interface{}) bool {
+	if codes != nil {
+		codesArray, ok := codes.([]interface{})
+		if !ok {
+			return false
+		}
+		for _, c := range codesArray {
+			var num float64
+			num, ok = c.(float64)
+			if ok && code == int(num) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// TASK_TEMPFAIL is the exit status main uses when runner reports a
+// temporary failure.
+const TASK_TEMPFAIL = 111
+
+// TempFail marks a failure that should be treated as temporary; main
+// exits with TASK_TEMPFAIL when runner returns one. InnerError is the
+// underlying cause.
+type TempFail struct{ InnerError error }
+
+// PermFail marks a permanent task failure; main exits with status 1.
+type PermFail struct{}
+
+// Error returns the message of the wrapped error, or "TempFail" when no
+// cause was recorded, so a zero TempFail never panics.
+func (s TempFail) Error() string {
+	if s.InnerError == nil {
+		return "TempFail"
+	}
+	return s.InnerError.Error()
+}
+
+// Error implements the error interface.
+func (s PermFail) Error() string {
+	return "PermFail"
+}
+
+// runner executes one crunch task.
+//
+// jobStruct and taskStruct are the job and job_task records as generic
+// maps. If the task has sequence 0 and the job's script_parameters
+// "tasks" array has other than exactly one entry, runner creates one
+// job_task per entry (sequence 1, created by taskUuid), marks the current
+// task successful and returns. Otherwise it runs the command from the
+// task parameters' "command" array inside a fresh output directory under
+// tmpdir. It classifies the exit code using task.successCodes,
+// task.permanentFailCodes and task.temporaryFailCodes, and records the
+// result on the task.
+//
+// Returns nil on success and PermFail{} on permanent failure. It returns
+// TempFail for API errors, for failure to set up or start the command,
+// and for exit codes listed in task.temporaryFailCodes. Malformed
+// parameters yield a plain error.
+func runner(api arvadosclient.ArvadosClient,
+	jobUuid, taskUuid, tmpdir, keepmount string, jobStruct,
+	taskStruct arvadosclient.Dict) error {
+
+	var err error
+	var ok bool
+	var jobp, taskp map[string]interface{}
+	jobp, ok = jobStruct["script_parameters"].(map[string]interface{})
+	if !ok {
+		return errors.New("Could not cast job script_parameters to map")
+	}
+
+	taskp, ok = taskStruct["parameters"].(map[string]interface{})
+	if !ok {
+		return errors.New("Could not cast task parameters to map")
+	}
+
+	// If this is task 0 and there are multiple tasks, dispatch subtasks
+	// and exit. The comparison is against float64 0.
+	if taskStruct["sequence"] == 0.0 {
+		var tasks []interface{}
+		tasks, ok = jobp["tasks"].([]interface{})
+		if !ok {
+			return errors.New("Could not cast tasks to array")
+		}
+
+		if len(tasks) == 1 {
+			taskp, ok = tasks[0].(map[string]interface{})
+			if !ok {
+				return errors.New("Could not cast task to map")
+			}
+		} else {
+			// Range over values: a single range variable would yield
+			// indices and submit those as the task parameters.
+			for _, task := range tasks {
+				// Wrap the record in "job_task", as the PUT calls below do.
+				err = api.Call("POST", "job_tasks", "", "",
+					arvadosclient.Dict{
+						"job_task": arvadosclient.Dict{
+							"job_uuid":                 jobUuid,
+							"created_by_job_task_uuid": taskUuid,
+							"sequence":                 1,
+							"parameters":               task}},
+					nil)
+				if err != nil {
+					return TempFail{err}
+				}
+			}
+			err = api.Call("PUT", "job_tasks", taskUuid, "",
+				arvadosclient.Dict{
+					"job_task": arvadosclient.Dict{
+						"output":   "",
+						"success":  true,
+						"progress": 1.0}},
+				nil)
+			if err != nil {
+				return TempFail{err}
+			}
+			return nil
+		}
+	}
+
+	// Set up subprocess
+	var commandline []string
+	var commandsarray []interface{}
+
+	commandsarray, ok = taskp["command"].([]interface{})
+	if !ok {
+		return errors.New("Could not cast commands to array")
+	}
+
+	for _, c := range commandsarray {
+		var cstr string
+		cstr, ok = c.(string)
+		if !ok {
+			return errors.New("Could not cast command argument to string")
+		}
+		commandline = append(commandline, cstr)
+	}
+	if len(commandline) == 0 {
+		return errors.New("Command is empty")
+	}
+	cmd := exec.Command(commandline[0], commandline[1:]...)
+
+	var outdir string
+	outdir, err = setupDirectories(tmpdir)
+	if err != nil {
+		return TempFail{err}
+	}
+
+	cmd.Dir = outdir
+
+	err = setupCommand(cmd, taskp, keepmount, outdir)
+	if err != nil {
+		return err
+	}
+
+	setupSignals(cmd)
+
+	// Run subprocess and wait for it to complete
+	log.Printf("Running %v", cmd.Args)
+
+	err = cmd.Run()
+
+	// A non-zero exit is reported as *exec.ExitError. That is a normal
+	// outcome, classified below. Any other error means the command could
+	// not be run at all.
+	if err != nil {
+		if _, exited := err.(*exec.ExitError); !exited {
+			return TempFail{err}
+		}
+	}
+
+	exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+	var succeeded bool
+	switch {
+	case inCodes(exitCode, taskp["task.successCodes"]):
+		succeeded = true
+	case inCodes(exitCode, taskp["task.permanentFailCodes"]):
+		succeeded = false
+	case inCodes(exitCode, taskp["task.temporaryFailCodes"]):
+		// Leave the task record untouched and report a temporary failure;
+		// main maps TempFail to exit status TASK_TEMPFAIL.
+		return TempFail{errors.New("Exit status listed in task.temporaryFailCodes: " + cmd.ProcessState.String())}
+	default:
+		succeeded = cmd.ProcessState.Success()
+	}
+
+	// Upload output directory
+	// TODO
+
+	// Set status
+	err = api.Call("PUT", "job_tasks", taskUuid, "",
+		arvadosclient.Dict{
+			"job_task": arvadosclient.Dict{
+				"output":   "",
+				"success":  succeeded,
+				"progress": 1.0}},
+		nil)
+	if err != nil {
+		return TempFail{err}
+	}
+
+	if !succeeded {
+		return PermFail{}
+	}
+	return nil
+}
+
+func main() {
+	syscall.Umask(0077)
+
+	api, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	jobUuid := os.Getenv("JOB_UUID")
+	taskUuid := os.Getenv("TASK_UUID")
+	tmpdir := os.Getenv("TASK_WORK")
+	keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+	jobStruct := getRecord(api, "jobs", jobUuid)
+	taskStruct := getRecord(api, "job_tasks", taskUuid)
+
+	err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
+	if err == nil {
+		os.Exit(0)
+	} else if _, ok := err.(TempFail); ok {
+		log.Print(err)
+		os.Exit(TASK_TEMPFAIL)
+	} else if _, ok := err.(PermFail); ok {
+		os.Exit(1)
+	} else {
+		log.Fatal(err)
+	}
+}
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
new file mode 100644
index 0000000..7d4c62a
--- /dev/null
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -0,0 +1,25 @@
+package main
+
+import (
+	"crypto/md5"
+	"flag"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	. "gopkg.in/check.v1"
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"testing"
+)
+
+// Test hooks the gocheck suites into the standard "go test" runner.
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+// Register the gocheck suites.
+// NOTE(review): ServerRequiredSuite and StandaloneSuite are not declared
+// in this file; confirm they are defined elsewhere in the package.
+var (
+	_ = Suite(&ServerRequiredSuite{})
+	_ = Suite(&StandaloneSuite{})
+)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list