[ARVADOS] updated: 5fb633ee021c613d99280e8958a6598602041011

git at public.curoverse.com git at public.curoverse.com
Thu Oct 22 09:51:45 EDT 2015


Summary of changes:
 sdk/go/crunchrunner/crunchrunner.go      | 22 +++++---
 sdk/go/crunchrunner/crunchrunner_test.go | 59 ++++++++-----------
 sdk/go/crunchrunner/upload.go            | 87 ++++++++++++++++++----------
 sdk/go/crunchrunner/upload_test.go       | 97 ++++++++++++++++++++++++++++++--
 4 files changed, 191 insertions(+), 74 deletions(-)

       via  5fb633ee021c613d99280e8958a6598602041011 (commit)
       via  30bfda443a98efc1a717f35258e3c3ffd7369d7c (commit)
      from  7600537f3f34ee88a76688dbb0e1d73723905fa7 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 5fb633ee021c613d99280e8958a6598602041011
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 22 09:51:37 2015 -0400

    7582: Runner uploads results.  Feature complete.

diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 0ca7ce9..0aacfeb 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -2,7 +2,7 @@ package main
 
 import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	//"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"log"
 	"os"
 	"os/exec"
@@ -145,6 +145,7 @@ func (s PermFail) Error() string {
 }
 
 func runner(api arvadosclient.IArvadosClient,
+	kc IKeepClient,
 	jobUuid, taskUuid, tmpdir, keepmount string,
 	jobStruct Job, taskStruct Task) error {
 
@@ -218,6 +219,8 @@ func runner(api arvadosclient.IArvadosClient,
 
 	exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
 
+	log.Printf("Completed with exit code %v", exitCode)
+
 	if inCodes(exitCode, taskp.successCodes) {
 		status = success
 	} else if inCodes(exitCode, taskp.permanentFailCodes) {
@@ -231,15 +234,18 @@ func runner(api arvadosclient.IArvadosClient,
 	}
 
 	// Upload output directory
-	// TODO
+	manifest, err := WriteTree(kc, outdir)
+	if err != nil {
+		return TempFail{err}
+	}
 
 	// Set status
 	err = api.Update("job_tasks", taskUuid,
 		map[string]interface{}{
-			"job_task": map[string]interface{}{
-				"output":   "",
-				"success":  status == success,
-				"progress": 1.0}},
+			"job_task": Task{
+				output:   manifest,
+				success:  status == success,
+				progress: 1}},
 		nil)
 	if err != nil {
 		return TempFail{err}
@@ -277,7 +283,9 @@ func main() {
 		log.Fatal(err)
 	}
 
-	err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+	var kc IKeepClient
+	kc, err = keepclient.MakeKeepClient(&api)
+	err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
 
 	if err == nil {
 		os.Exit(0)
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 29b092b..e67c9ee 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -19,6 +19,9 @@ type TestSuite struct{}
 var _ = Suite(&TestSuite{})
 
 type ArvTestClient struct {
+	c        *C
+	manifest string
+	success  bool
 }
 
 func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
@@ -30,6 +33,11 @@ func (t ArvTestClient) Delete(resource string, uuid string, parameters arvadoscl
 }
 
 func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	t.c.Check(resourceType, Equals, "job_tasks")
+	t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+		output:   t.manifest,
+		success:  t.success,
+		progress: 1}})
 	return nil
 }
 
@@ -42,15 +50,13 @@ func (t ArvTestClient) List(resource string, parameters arvadosclient.Dict, outp
 }
 
 func (s *TestSuite) TestSimpleRun(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, "", true},
+		KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -74,15 +80,14 @@ func checkOutput(c *C, tmpdir string) {
 }
 
 func (s *TestSuite) TestSimpleRunSubtask(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c,
+		". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+		KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -100,9 +105,6 @@ func (s *TestSuite) TestSimpleRunSubtask(c *C) {
 }
 
 func (s *TestSuite) TestRedirect(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpfile, _ := ioutil.TempFile("", "")
 	tmpfile.Write([]byte("foo\n"))
 	tmpfile.Close()
@@ -113,7 +115,9 @@ func (s *TestSuite) TestRedirect(c *C) {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c,
+		". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+		KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -129,15 +133,13 @@ func (s *TestSuite) TestRedirect(c *C) {
 }
 
 func (s *TestSuite) TestEnv(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+		KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -201,7 +203,7 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(&api,
+	err := runner(&api, KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -215,15 +217,12 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
 }
 
 func (s *TestSuite) TestRunFail(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -235,15 +234,12 @@ func (s *TestSuite) TestRunFail(c *C) {
 }
 
 func (s *TestSuite) TestRunSuccessCode(c *C) {
-
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -256,14 +252,12 @@ func (s *TestSuite) TestRunSuccessCode(c *C) {
 }
 
 func (s *TestSuite) TestRunFailCode(c *C) {
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -276,14 +270,12 @@ func (s *TestSuite) TestRunFailCode(c *C) {
 }
 
 func (s *TestSuite) TestRunTempFailCode(c *C) {
-	api := ArvTestClient{}
-
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
@@ -296,8 +288,6 @@ func (s *TestSuite) TestRunTempFailCode(c *C) {
 }
 
 func (s *TestSuite) TestVwd(c *C) {
-	api := ArvTestClient{}
-
 	tmpfile, _ := ioutil.TempFile("", "")
 	tmpfile.Write([]byte("foo\n"))
 	tmpfile.Close()
@@ -308,7 +298,8 @@ func (s *TestSuite) TestVwd(c *C) {
 		os.RemoveAll(tmpdir)
 	}()
 
-	err := runner(api,
+	err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+		KeepTestClient{},
 		"zzzz-8i9sb-111111111111111",
 		"zzzz-ot0gb-111111111111111",
 		tmpdir,
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 7f1fd8a..4feb142 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -8,6 +8,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
+	"log"
 	"os"
 	"path/filepath"
 )
@@ -95,6 +96,9 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
 	if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
 		dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
 	}
+	if dir == "" {
+		dir = "."
+	}
 
 	fn := path[(len(path) - len(info.Name())):]
 
@@ -118,6 +122,8 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
 		return err
 	}
 
+	log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
 	var count int64
 	count, err = io.Copy(stream, file)
 	if err != nil && err != io.EOF {
@@ -164,7 +170,7 @@ func (m *ManifestWriter) ManifestText() string {
 	m.Finish()
 	var buf bytes.Buffer
 	for k, v := range m.Streams {
-		if k == "" {
+		if k == "." {
 			buf.WriteString(".")
 		} else {
 			buf.WriteString("./" + k)

commit 30bfda443a98efc1a717f35258e3c3ffd7369d7c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 22 09:20:13 2015 -0400

    7582: Uploader passes tests

diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 2196a9d..7f1fd8a 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -3,11 +3,11 @@ package main
 import (
 	"bytes"
 	"crypto/md5"
+	"errors"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
-	"log"
 	"os"
 	"path/filepath"
 )
@@ -23,6 +23,7 @@ type ManifestStreamWriter struct {
 	offset int64
 	*Block
 	uploader chan *Block
+	finish   chan []error
 }
 
 type IKeepClient interface {
@@ -58,16 +59,25 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
 }
 
 func (m *ManifestStreamWriter) goUpload() {
-	select {
-	case block, valid := <-m.uploader:
-		if !valid {
-			return
+	var errors []error
+	uploader := m.uploader
+	finish := m.finish
+	for true {
+		select {
+		case block, valid := <-uploader:
+			if !valid {
+				finish <- errors
+				return
+			}
+			hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+			signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+			if err != nil {
+				errors = append(errors, err)
+			} else {
+				m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+			}
 		}
-		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-		signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
-		m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
 	}
-
 }
 
 type ManifestWriter struct {
@@ -76,23 +86,16 @@ type ManifestWriter struct {
 	Streams     map[string]*ManifestStreamWriter
 }
 
-type walker struct {
-	currentDir string
-	m          *ManifestWriter
-}
-
-func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
-	log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
-
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
 	if info.IsDir() {
-		if path == w.currentDir {
-			return nil
-		}
-		return filepath.Walk(path, walker{path, w.m}.WalkFunc)
+		return nil
+	}
+
+	var dir string
+	if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+		dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
 	}
-	m := w.m
 
-	dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
 	fn := path[(len(path) - len(info.Name())):]
 
 	if m.Streams[dir] == nil {
@@ -101,7 +104,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
 			&manifest.ManifestStream{StreamName: dir},
 			0,
 			nil,
-			make(chan *Block)}
+			make(chan *Block),
+			make(chan []error)}
 		go m.Streams[dir].goUpload()
 	}
 
@@ -128,7 +132,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
 	return nil
 }
 
-func (m *ManifestWriter) Finish() {
+func (m *ManifestWriter) Finish() error {
+	var errstring string
 	for _, v := range m.Streams {
 		if v.uploader != nil {
 			if v.Block != nil {
@@ -136,8 +141,23 @@ func (m *ManifestWriter) Finish() {
 			}
 			close(v.uploader)
 			v.uploader = nil
+
+			errors := <-v.finish
+			close(v.finish)
+			v.finish = nil
+
+			if errors != nil {
+				for _, r := range errors {
+					errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+				}
+			}
 		}
 	}
+	if errstring != "" {
+		return errors.New(errstring)
+	} else {
+		return nil
+	}
 }
 
 func (m *ManifestWriter) ManifestText() string {
@@ -164,13 +184,16 @@ func (m *ManifestWriter) ManifestText() string {
 
 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
 	mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
-	err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
-	mw.Finish()
+	err = filepath.Walk(root, mw.WalkFunc)
 
 	if err != nil {
 		return "", err
-	} else {
-		return mw.ManifestText(), nil
 	}
 
+	err = mw.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	return mw.ManifestText(), nil
 }
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
index 6e0e103..e337b76 100644
--- a/sdk/go/crunchrunner/upload_test.go
+++ b/sdk/go/crunchrunner/upload_test.go
@@ -2,9 +2,11 @@ package main
 
 import (
 	"crypto/md5"
+	"errors"
 	"fmt"
 	. "gopkg.in/check.v1"
 	"io/ioutil"
+	"log"
 	"os"
 )
 
@@ -21,6 +23,8 @@ func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
 }
 
 func (s *TestSuite) TestSimpleUpload(c *C) {
+	log.Print("--TestSimpleUpload--")
+
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
@@ -34,6 +38,8 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
 }
 
 func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+	log.Print("--TestSimpleUploadTwofiles--")
+
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
@@ -48,19 +54,102 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+	log.Print("--TestSimpleUploadSubdir--")
+
+	tmpdir, _ := ioutil.TempDir("", "")
+	defer func() {
+		os.RemoveAll(tmpdir)
+	}()
+
+	os.Mkdir(tmpdir+"/subdir", 0700)
+
+	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+	ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+	log.Print("--TestSimpleUploadLarge--")
+
+	tmpdir, _ := ioutil.TempDir("", "")
+	defer func() {
+		os.RemoveAll(tmpdir)
+	}()
+
+	file, _ := os.Create(tmpdir + "/" + "file1.txt")
+	data := make([]byte, 1024*1024-1)
+	for i := 0; i < 1024*1024-1; i++ {
+		data[i] = byte(i % 10)
+	}
+	for i := 0; i < 65; i++ {
+		file.Write(data)
+	}
+	file.Close()
+
+	ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+	log.Print("--TestUploadEmptySubdir--")
+
 	tmpdir, _ := ioutil.TempDir("", "")
 	defer func() {
 		os.RemoveAll(tmpdir)
 	}()
 
-	os.Mkdir(tmpdir+"/"+"subdir", 0600)
+	os.Mkdir(tmpdir+"/subdir", 0700)
 
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-	ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600)
 
 	str, err := WriteTree(KeepTestClient{}, tmpdir)
 	c.Check(err, IsNil)
-	c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file1.txt
-./subdir acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file2.txt
+	c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+	log.Print("--TestUploadEmptyFile--")
+
+	tmpdir, _ := ioutil.TempDir("", "")
+	defer func() {
+		os.RemoveAll(tmpdir)
+	}()
+
+	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+	str, err := WriteTree(KeepTestClient{}, tmpdir)
+	c.Check(err, IsNil)
+	c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
 `)
 }
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+	log.Print("--TestSimpleUpload--")
+
+	tmpdir, _ := ioutil.TempDir("", "")
+	defer func() {
+		os.RemoveAll(tmpdir)
+	}()
+
+	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+	str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+	c.Check(err, NotNil)
+	c.Check(str, Equals, "")
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list