[ARVADOS] updated: c8cba416487f240c6e8395b1581d9f4e441cc5f7

git at public.curoverse.com git at public.curoverse.com
Fri Feb 12 10:46:37 EST 2016


Summary of changes:
 services/crunch-run/crunchrun.go |  5 +++
 services/crunch-run/upload.go    | 67 ++++++++++++++++++++++++++++------------
 2 files changed, 53 insertions(+), 19 deletions(-)

       via  c8cba416487f240c6e8395b1581d9f4e441cc5f7 (commit)
      from  08d080d5a4edccaa579f9a5b5cfa18c9ac471430 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit c8cba416487f240c6e8395b1581d9f4e441cc5f7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Feb 12 10:46:33 2016 -0500

    8015: Working on WriteTree() and setting up mounts.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 1248854..8f6f05c 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -178,6 +178,7 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
 	pdhOnly := true
 	tmpcount := 0
 	arvMountCmd := []string{"--foreground"}
+	collections := []string{}
 
 	for bind, mnt := range runner.ContainerRecord.Mounts {
 		if mnt.Kind == "collection" {
@@ -207,6 +208,7 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
 			} else {
 				hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
 			}
+			collections = append(collections, src)
 		} else if mnt.Kind == "tmp" {
 			hostConfig.Binds = append(hostConfig.Binds, bind)
 		} else {
@@ -226,6 +228,9 @@ func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig)
 	if err != nil {
 		return err
 	}
+
+	// XXX need to go through and os.Stat() each file or dir in "sources"
+	// to make sure they show up for Docker.
 }
 
 // StartContainer creates the container and runs it.
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index d50ea58..c3b8c37 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -17,7 +17,9 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
+	"log"
 	"strings"
+	"sync"
 )
 
 // Block is a data block in a manifest stream
@@ -77,6 +79,12 @@ func (m *CollectionFileWriter) Close() error {
 	return nil
 }
 
+func (m *CollectionFileWriter) NewFile(fn string) {
+	m.offset += m.length
+	m.length = 0
+	m.fn = fn
+}
+
 func (m *CollectionFileWriter) goUpload() {
 	var errors []error
 	uploader := m.uploader
@@ -98,6 +106,7 @@ func (m *CollectionFileWriter) goUpload() {
 type CollectionWriter struct {
 	IKeepClient
 	Streams []*CollectionFileWriter
+	mtx     sync.Mutex
 }
 
 // Open a new file for writing in the Keep collection.
@@ -125,6 +134,8 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 		fn}
 	go fw.goUpload()
 
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	m.Streams = append(m.Streams, fw)
 
 	return fw
@@ -133,6 +144,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 // Finish writing the collection, wait for all blocks to complete uploading.
 func (m *CollectionWriter) Finish() error {
 	var errstring string
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
 	for _, stream := range m.Streams {
 		if stream.uploader == nil {
 			continue
@@ -168,6 +182,8 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 
 	var buf bytes.Buffer
 
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	for _, v := range m.Streams {
 		k := v.StreamName
 		if k == "." {
@@ -192,7 +208,15 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 	return buf.String(), nil
 }
 
-func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+// WalkFunc walks a directory tree, uploads each file found and adds it to the
+// CollectionWriter.
+func (m *CollectionWriter) WalkFunc(path string,
+	info os.FileInfo,
+	err error,
+	stripPrefix string,
+	streamMap map[string]*manifest.ManifestStream,
+	status log.Logger) error {
+
 	if info.IsDir() {
 		return nil
 	}
@@ -207,8 +231,8 @@ func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, st
 
 	fn := path[(len(path) - len(info.Name())):]
 
-	if m.Streams[dir] == nil {
-		m.Streams[dir] = &CollectionFileWriter{
+	if streamMap[dir] == nil {
+		streamMap[dir] = &CollectionFileWriter{
 			m.IKeepClient,
 			&manifest.ManifestStream{StreamName: dir},
 			0,
@@ -217,48 +241,53 @@ func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, st
 			make(chan *Block),
 			make(chan []error),
 			""}
-		go m.Streams[dir].goUpload()
+		go streamMap[dir].goUpload()
 	}
 
-	stream := m.Streams[dir]
+	fileWriter := streamMap[dir]
 
-	fileStart := stream.offset
+	// Reset the CollectionFileWriter for a new file
+	fileWriter.NewFile(fn)
 
 	file, err := os.Open(path)
 	if err != nil {
 		return err
 	}
 
-	log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+	status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
 	var count int64
-	count, err = io.Copy(stream, file)
+	count, err = io.Copy(fileWriter, file)
 	if err != nil {
 		return err
 	}
 
-	stream.offset += count
-
-	stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
-		manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+	// Commits the current file.  Legal to call this repeatedly.
+	fileWriter.Close()
 
 	return nil
 }
 
-func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
-	mw := CollectionWriter{kc, root, map[string]*ManifestStreamWriter{}}
+func (cw *CollectionWriter) WriteTree(root string, status log.Logger) (manifest string, err error) {
+	streamMap := make(map[string]*ManifestStreamWriter)
 	err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
-		return mw.WalkFunc(path, info, err, root)
+		return cw.WalkFunc(path,
+			info,
+			err,
+			root,
+			streamMap,
+			status)
 	})
 
 	if err != nil {
 		return "", err
 	}
 
-	err = mw.Finish()
-	if err != nil {
-		return "", err
+	cw.mtx.Lock()
+	for _, st := range streamMap {
+		cw.Streams = append(cw.Streams, st)
 	}
+	cw.mtx.Unlock()
 
-	return mw.ManifestText(), nil
+	return mw.ManifestText()
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list