[ARVADOS] updated: 90bf8cae5f1ba931c7cc3be7864cad8ecca1ca5e

git at public.curoverse.com git at public.curoverse.com
Sun May 11 21:59:49 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go | 82 +++++++++++++++++++++++--
 1 file changed, 76 insertions(+), 6 deletions(-)

       via  90bf8cae5f1ba931c7cc3be7864cad8ecca1ca5e (commit)
      from  33d63c6d42e824744305df3e720f8e9cbcc87d78 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 90bf8cae5f1ba931c7cc3be7864cad8ecca1ca5e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Sun May 11 21:59:44 2014 -0400

    2798: Working on Transfer function, which will be the core function for
    mananging concurrent writes to Keep servers.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 9c1210d..ba73523 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -115,13 +115,12 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
 	return pseq
 }
 
-func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
+func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
 	ptr := buffer[:]
-
 	for {
 		n, err := r.Read(ptr)
 		if err != nil {
-			errorchan <- err
+			reader_error <- err
 			return
 		}
 		c <- ptr[:n]
@@ -129,13 +128,84 @@ func Fill(buffer []byte, r io.Reader, c chan []byte, errorchan chan error) {
 	}
 }
 
+type Sink struct {
+	out io.Writer
+	err chan error
+}
+
+// Transfer data from a buffer into one or more 'sinks'.
+//
+// Forwards all data read to the writers in "Sinks", including any previous
+// reads into the buffer.  Either one of buffer or io.Reader must be valid, and
+// the other must be nil.  If 'source' is valid, it will read from the reader,
+// store the data in the buffer, and send the data to the sinks.  Otherwise
+// 'buffer' must be valid, and it will send the contents of the buffer to the
+// sinks.
+func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+	// currently buffered data
+	var ptr []byte
+
+	// for receiving slices from ReadIntoBuffer
+	var slices chan []byte
+
+	// indicates whether the buffered data is complete
+	var complete bool = false
+
+	// for receiving errors from ReadIntoBuffer
+	var reader_error chan error = nil
+
+	if source != nil {
+		// allocate the scratch buffer at 64 MiB
+		buffer = make([]byte, 1024*1024*64)
+
+		// 'ptr' is a slice indicating the buffer slice that has been
+		// read so far
+		ptr = buffer[0:0]
+
+		// used to communicate slices of the buffer as read
+		slices := make(chan []byte)
+
+		// communicate read errors
+		reader_error = make(chan error)
+
+		// Spin it off
+		go ReadIntoBuffer(buffer, source, slices, reader_error)
+	} else {
+		// use the whole buffer
+		ptr = buffer[:]
+
+		// that's it
+		complete = true
+	}
+
+	// list of sinks to send to
+	sinks_slice := make([]io.Writer, 0)
+
+	select {
+	case e := <-reader_error:
+		// barf
+	case s, valid := <-sinks:
+		if !valid {
+			// sinks channel closed
+			return
+		}
+		sinks_slice = append(sinks_slice, s)
+		go s.Write(ptr)
+	case bk := <-slices:
+		ptr = buffer[0 : len(ptr)+len(bk)]
+		for _, s := range sinks_slice {
+			go s.Write(bk)
+		}
+	}
+}
+
 func (this KeepClient) KeepPut(hash string, r io.Reader) {
 	//sv := this.ShuffledServiceRoots(hash)
 	//n := 0
-	buffer := make([]byte, 0, 1024*1024*64)
+
 	//success := make(chan int)
-	reads := make(chan []byte)
+	sinks := make(chan []io.Writer)
 	errorchan := make(chan error)
 
-	go Fill(buffer, r, reads, errorchan)
+	go Transfer(nil, r, reads, errorchan)
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list