[ARVADOS] updated: 5fb633ee021c613d99280e8958a6598602041011
git at public.curoverse.com
git at public.curoverse.com
Thu Oct 22 09:51:45 EDT 2015
Summary of changes:
sdk/go/crunchrunner/crunchrunner.go | 22 +++++---
sdk/go/crunchrunner/crunchrunner_test.go | 59 ++++++++-----------
sdk/go/crunchrunner/upload.go | 87 ++++++++++++++++++----------
sdk/go/crunchrunner/upload_test.go | 97 ++++++++++++++++++++++++++++++--
4 files changed, 191 insertions(+), 74 deletions(-)
via 5fb633ee021c613d99280e8958a6598602041011 (commit)
via 30bfda443a98efc1a717f35258e3c3ffd7369d7c (commit)
from 7600537f3f34ee88a76688dbb0e1d73723905fa7 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 5fb633ee021c613d99280e8958a6598602041011
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Oct 22 09:51:37 2015 -0400
7582: Runner uploads results. Feature complete.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 0ca7ce9..0aacfeb 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -2,7 +2,7 @@ package main
import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- //"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"log"
"os"
"os/exec"
@@ -145,6 +145,7 @@ func (s PermFail) Error() string {
}
func runner(api arvadosclient.IArvadosClient,
+ kc IKeepClient,
jobUuid, taskUuid, tmpdir, keepmount string,
jobStruct Job, taskStruct Task) error {
@@ -218,6 +219,8 @@ func runner(api arvadosclient.IArvadosClient,
exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+ log.Printf("Completed with exit code %v", exitCode)
+
if inCodes(exitCode, taskp.successCodes) {
status = success
} else if inCodes(exitCode, taskp.permanentFailCodes) {
@@ -231,15 +234,18 @@ func runner(api arvadosclient.IArvadosClient,
}
// Upload output directory
- // TODO
+ manifest, err := WriteTree(kc, outdir)
+ if err != nil {
+ return TempFail{err}
+ }
// Set status
err = api.Update("job_tasks", taskUuid,
map[string]interface{}{
- "job_task": map[string]interface{}{
- "output": "",
- "success": status == success,
- "progress": 1.0}},
+ "job_task": Task{
+ output: manifest,
+ success: status == success,
+ progress: 1}},
nil)
if err != nil {
return TempFail{err}
@@ -277,7 +283,9 @@ func main() {
log.Fatal(err)
}
- err = runner(api, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+ var kc IKeepClient
+ kc, err = keepclient.MakeKeepClient(&api)
+ err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
os.Exit(0)
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
index 29b092b..e67c9ee 100644
--- a/sdk/go/crunchrunner/crunchrunner_test.go
+++ b/sdk/go/crunchrunner/crunchrunner_test.go
@@ -19,6 +19,9 @@ type TestSuite struct{}
var _ = Suite(&TestSuite{})
type ArvTestClient struct {
+ c *C
+ manifest string
+ success bool
}
func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
@@ -30,6 +33,11 @@ func (t ArvTestClient) Delete(resource string, uuid string, parameters arvadoscl
}
func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+ output: t.manifest,
+ success: t.success,
+ progress: 1}})
return nil
}
@@ -42,15 +50,13 @@ func (t ArvTestClient) List(resource string, parameters arvadosclient.Dict, outp
}
func (s *TestSuite) TestSimpleRun(c *C) {
-
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, "", true},
+ KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -74,15 +80,14 @@ func checkOutput(c *C, tmpdir string) {
}
func (s *TestSuite) TestSimpleRunSubtask(c *C) {
-
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -100,9 +105,6 @@ func (s *TestSuite) TestSimpleRunSubtask(c *C) {
}
func (s *TestSuite) TestRedirect(c *C) {
-
- api := ArvTestClient{}
-
tmpfile, _ := ioutil.TempFile("", "")
tmpfile.Write([]byte("foo\n"))
tmpfile.Close()
@@ -113,7 +115,9 @@ func (s *TestSuite) TestRedirect(c *C) {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -129,15 +133,13 @@ func (s *TestSuite) TestRedirect(c *C) {
}
func (s *TestSuite) TestEnv(c *C) {
-
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -201,7 +203,7 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
os.RemoveAll(tmpdir)
}()
- err := runner(&api,
+ err := runner(&api, KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -215,15 +217,12 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
}
func (s *TestSuite) TestRunFail(c *C) {
-
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -235,15 +234,12 @@ func (s *TestSuite) TestRunFail(c *C) {
}
func (s *TestSuite) TestRunSuccessCode(c *C) {
-
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -256,14 +252,12 @@ func (s *TestSuite) TestRunSuccessCode(c *C) {
}
func (s *TestSuite) TestRunFailCode(c *C) {
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -276,14 +270,12 @@ func (s *TestSuite) TestRunFailCode(c *C) {
}
func (s *TestSuite) TestRunTempFailCode(c *C) {
- api := ArvTestClient{}
-
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
@@ -296,8 +288,6 @@ func (s *TestSuite) TestRunTempFailCode(c *C) {
}
func (s *TestSuite) TestVwd(c *C) {
- api := ArvTestClient{}
-
tmpfile, _ := ioutil.TempFile("", "")
tmpfile.Write([]byte("foo\n"))
tmpfile.Close()
@@ -308,7 +298,8 @@ func (s *TestSuite) TestVwd(c *C) {
os.RemoveAll(tmpdir)
}()
- err := runner(api,
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
"zzzz-8i9sb-111111111111111",
"zzzz-ot0gb-111111111111111",
tmpdir,
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 7f1fd8a..4feb142 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -8,6 +8,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
+ "log"
"os"
"path/filepath"
)
@@ -95,6 +96,9 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
}
+ if dir == "" {
+ dir = "."
+ }
fn := path[(len(path) - len(info.Name())):]
@@ -118,6 +122,8 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
return err
}
+ log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
var count int64
count, err = io.Copy(stream, file)
if err != nil && err != io.EOF {
@@ -164,7 +170,7 @@ func (m *ManifestWriter) ManifestText() string {
m.Finish()
var buf bytes.Buffer
for k, v := range m.Streams {
- if k == "" {
+ if k == "." {
buf.WriteString(".")
} else {
buf.WriteString("./" + k)
commit 30bfda443a98efc1a717f35258e3c3ffd7369d7c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Oct 22 09:20:13 2015 -0400
7582: Uploader passes tests
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 2196a9d..7f1fd8a 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -3,11 +3,11 @@ package main
import (
"bytes"
"crypto/md5"
+ "errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
- "log"
"os"
"path/filepath"
)
@@ -23,6 +23,7 @@ type ManifestStreamWriter struct {
offset int64
*Block
uploader chan *Block
+ finish chan []error
}
type IKeepClient interface {
@@ -58,16 +59,25 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
}
func (m *ManifestStreamWriter) goUpload() {
- select {
- case block, valid := <-m.uploader:
- if !valid {
- return
+ var errors []error
+ uploader := m.uploader
+ finish := m.finish
+ for true {
+ select {
+ case block, valid := <-uploader:
+ if !valid {
+ finish <- errors
+ return
+ }
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
}
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
}
-
}
type ManifestWriter struct {
@@ -76,23 +86,16 @@ type ManifestWriter struct {
Streams map[string]*ManifestStreamWriter
}
-type walker struct {
- currentDir string
- m *ManifestWriter
-}
-
-func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
- log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
-
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
if info.IsDir() {
- if path == w.currentDir {
- return nil
- }
- return filepath.Walk(path, walker{path, w.m}.WalkFunc)
+ return nil
+ }
+
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
}
- m := w.m
- dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
fn := path[(len(path) - len(info.Name())):]
if m.Streams[dir] == nil {
@@ -101,7 +104,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
&manifest.ManifestStream{StreamName: dir},
0,
nil,
- make(chan *Block)}
+ make(chan *Block),
+ make(chan []error)}
go m.Streams[dir].goUpload()
}
@@ -128,7 +132,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
return nil
}
-func (m *ManifestWriter) Finish() {
+func (m *ManifestWriter) Finish() error {
+ var errstring string
for _, v := range m.Streams {
if v.uploader != nil {
if v.Block != nil {
@@ -136,8 +141,23 @@ func (m *ManifestWriter) Finish() {
}
close(v.uploader)
v.uploader = nil
+
+ errors := <-v.finish
+ close(v.finish)
+ v.finish = nil
+
+ if errors != nil {
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
}
}
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
+ }
}
func (m *ManifestWriter) ManifestText() string {
@@ -164,13 +184,16 @@ func (m *ManifestWriter) ManifestText() string {
func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
- err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
- mw.Finish()
+ err = filepath.Walk(root, mw.WalkFunc)
if err != nil {
return "", err
- } else {
- return mw.ManifestText(), nil
}
+ err = mw.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ return mw.ManifestText(), nil
}
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
index 6e0e103..e337b76 100644
--- a/sdk/go/crunchrunner/upload_test.go
+++ b/sdk/go/crunchrunner/upload_test.go
@@ -2,9 +2,11 @@ package main
import (
"crypto/md5"
+ "errors"
"fmt"
. "gopkg.in/check.v1"
"io/ioutil"
+ "log"
"os"
)
@@ -21,6 +23,8 @@ func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
}
func (s *TestSuite) TestSimpleUpload(c *C) {
+ log.Print("--TestSimpleUpload--")
+
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
@@ -34,6 +38,8 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
}
func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+ log.Print("--TestSimpleUploadTwofiles--")
+
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
@@ -48,19 +54,102 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
}
func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+ log.Print("--TestSimpleUploadSubdir--")
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+ log.Print("--TestSimpleUploadLarge--")
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ file, _ := os.Create(tmpdir + "/" + "file1.txt")
+ data := make([]byte, 1024*1024-1)
+ for i := 0; i < 1024*1024-1; i++ {
+ data[i] = byte(i % 10)
+ }
+ for i := 0; i < 65; i++ {
+ file.Write(data)
+ }
+ file.Close()
+
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+ log.Print("--TestUploadEmptySubdir--")
+
tmpdir, _ := ioutil.TempDir("", "")
defer func() {
os.RemoveAll(tmpdir)
}()
- os.Mkdir(tmpdir+"/"+"subdir", 0600)
+ os.Mkdir(tmpdir+"/subdir", 0700)
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600)
str, err := WriteTree(KeepTestClient{}, tmpdir)
c.Check(err, IsNil)
- c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file1.txt
-./subdir acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file2.txt
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+ log.Print("--TestUploadEmptyFile--")
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
`)
}
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+ log.Print("--TestSimpleUpload--")
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+ c.Check(err, NotNil)
+ c.Check(str, Equals, "")
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list