[ARVADOS] updated: c8cba416487f240c6e8395b1581d9f4e441cc5f7
git at public.curoverse.com
git at public.curoverse.com
Fri Feb 12 10:46:37 EST 2016
Summary of changes:
services/crunch-run/crunchrun.go | 5 +++
services/crunch-run/upload.go | 67 ++++++++++++++++++++++++++++------------
2 files changed, 53 insertions(+), 19 deletions(-)
via c8cba416487f240c6e8395b1581d9f4e441cc5f7 (commit)
from 08d080d5a4edccaa579f9a5b5cfa18c9ac471430 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c8cba416487f240c6e8395b1581d9f4e441cc5f7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Feb 12 10:46:33 2016 -0500
8015: Working on WriteTree() and setting up mounts.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 1248854..8f6f05c 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -178,6 +178,7 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{"--foreground"}
+ collections := []string{}
for bind, mnt := range runner.ContainerRecord.Mounts {
if mnt.Kind == "collection" {
@@ -207,6 +208,7 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
} else {
hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
}
+ collections = append(collections, src)
} else if mnt.Kind == "tmp" {
hostConfig.Binds = append(hostConfig.Binds, bind)
} else {
@@ -226,6 +228,9 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
if err != nil {
return err
}
+
+ // XXX need to go through and os.Stat() each file or dir in "collections"
+ // to make sure they show up for Docker.
}
// StartContainer creates the container and runs it.
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index d50ea58..c3b8c37 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -17,7 +17,9 @@ import (
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
+ "log"
"strings"
+ "sync"
)
// Block is a data block in a manifest stream
@@ -77,6 +79,12 @@ func (m *CollectionFileWriter) Close() error {
return nil
}
+func (m *CollectionFileWriter) NewFile(fn string) {
+ m.offset += m.length
+ m.length = 0
+ m.fn = fn
+}
+
func (m *CollectionFileWriter) goUpload() {
var errors []error
uploader := m.uploader
@@ -98,6 +106,7 @@ func (m *CollectionFileWriter) goUpload() {
type CollectionWriter struct {
IKeepClient
Streams []*CollectionFileWriter
+ mtx sync.Mutex
}
// Open a new file for writing in the Keep collection.
@@ -125,6 +134,8 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
fn}
go fw.goUpload()
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
m.Streams = append(m.Streams, fw)
return fw
@@ -133,6 +144,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
// Finish writing the collection, wait for all blocks to complete uploading.
func (m *CollectionWriter) Finish() error {
var errstring string
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
for _, stream := range m.Streams {
if stream.uploader == nil {
continue
@@ -168,6 +182,8 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
var buf bytes.Buffer
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
for _, v := range m.Streams {
k := v.StreamName
if k == "." {
@@ -192,7 +208,15 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
return buf.String(), nil
}
-func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+// WalkFunc is the per-path callback for filepath.Walk: it skips directories
+// and uploads each file through the per-directory writer kept in streamMap.
+func (m *CollectionWriter) WalkFunc(path string,
+ info os.FileInfo,
+ err error,
+ stripPrefix string,
+ streamMap map[string]*manifest.ManifestStream,
+ status log.Logger) error {
+
if info.IsDir() {
return nil
}
@@ -207,8 +231,8 @@ func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, st
fn := path[(len(path) - len(info.Name())):]
- if m.Streams[dir] == nil {
- m.Streams[dir] = &CollectionFileWriter{
+ if streamMap[dir] == nil {
+ streamMap[dir] = &CollectionFileWriter{
m.IKeepClient,
&manifest.ManifestStream{StreamName: dir},
0,
@@ -217,48 +241,53 @@ func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, st
make(chan *Block),
make(chan []error),
""}
- go m.Streams[dir].goUpload()
+ go streamMap[dir].goUpload()
}
- stream := m.Streams[dir]
+ fileWriter := streamMap[dir]
- fileStart := stream.offset
+ // Reset the CollectionFileWriter for a new file
+ fileWriter.NewFile(fn)
file, err := os.Open(path)
if err != nil {
return err
}
- log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+ status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
var count int64
- count, err = io.Copy(stream, file)
+ count, err = io.Copy(fileWriter, file)
if err != nil {
return err
}
- stream.offset += count
-
- stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
- manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+ // Commits the current file. Legal to call this repeatedly.
+ fileWriter.Close()
return nil
}
-func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
- mw := CollectionWriter{kc, root, map[string]*ManifestStreamWriter{}}
+func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
+ streamMap := make(map[string]*ManifestStreamWriter)
err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
- return mw.WalkFunc(path, info, err, root)
+ return cw.WalkFunc(path,
+ info,
+ err,
+ root,
+ streamMap,
+ status)
})
if err != nil {
return "", err
}
- err = mw.Finish()
- if err != nil {
- return "", err
+ cw.mtx.Lock()
+ for _, st := range streamMap {
+ cw.Streams = append(cw.Streams, st)
}
+ cw.mtx.Unlock()
- return mw.ManifestText(), nil
+ return mw.ManifestText()
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list