[ARVADOS] updated: d3b11ddc2506de37b8e6538be69237d6d2a60a4a

git at public.curoverse.com
Thu May 15 16:47:25 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/buffer/buffer.go            | 243 +++++++++++++
 sdk/go/src/arvados.org/buffer/buffer_test.go       | 364 +++++++++++++++++++
 sdk/go/src/arvados.org/keepclient/keepclient.go    | 355 +++++-------------
 .../src/arvados.org/keepclient/keepclient_test.go  | 401 ++-------------------
 4 files changed, 727 insertions(+), 636 deletions(-)
 create mode 100644 sdk/go/src/arvados.org/buffer/buffer.go
 create mode 100644 sdk/go/src/arvados.org/buffer/buffer_test.go

       via  d3b11ddc2506de37b8e6538be69237d6d2a60a4a (commit)
       via  c3a88cbf511aa0954dac271ce6bda9c6e4f3191c (commit)
       via  4ec57745d2106e955fea4442c9eccb2fce7246c4 (commit)
      from  27f5c1635d56c3f3cb6c5ef069c28db939eec2a1 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit d3b11ddc2506de37b8e6538be69237d6d2a60a4a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 15 16:47:14 2014 -0400

    2798: Completed move of Transfer() related code out to 'buffer' package.
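
    (Not part of the commit: a minimal sketch of the relocated API as
    keepclient.go consumes it after this change. Transfer() fills a shared
    buffer from a reader, each MakeBufferReader() is an independent view of
    that buffer, and the status channel should be drained because Transfer()
    sends on it unbuffered.)

    package main

    import (
    	"arvados.org/buffer"
    	"bytes"
    	"io/ioutil"
    	"log"
    )

    func main() {
    	src := bytes.NewBufferString("hello, keep")

    	// Backing buffer sized for the expected payload.
    	buf := make([]byte, 64)

    	// Read requests on the Transfer() buffer.
    	requests := make(chan buffer.ReadRequest)
    	defer close(requests)

    	// Transfer() reports the source reader's terminal state (normally
    	// io.EOF) here; keep a consumer attached or the send will block.
    	readerStatus := make(chan error)
    	go func() { log.Printf("source reader finished: %v", <-readerStatus) }()

    	go buffer.Transfer(buf, src, requests, readerStatus)

    	br := buffer.MakeBufferReader(requests)
    	body, err := ioutil.ReadAll(br)
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Printf("read %q", body)
    }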

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 2738cef..829ab0e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -1,6 +1,7 @@
 package keepclient
 
 import (
+	"arvados.org/buffer"
 	"crypto/md5"
 	"crypto/tls"
 	"encoding/json"
@@ -155,243 +156,6 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
 	return pseq
 }
 
-type ReaderSlice struct {
-	slice        []byte
-	reader_error error
-}
-
-// Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'.  Completes when Reader 'r' reports on the error channel
-// and closes channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
-	defer close(slices)
-
-	// Initially use entire buffer as scratch space
-	ptr := buffer[:]
-	for {
-		var n int
-		var err error
-		if len(ptr) > 0 {
-			// Read into the scratch space
-			n, err = r.Read(ptr)
-		} else {
-			// Ran out of scratch space, try reading one more byte
-			var b [1]byte
-			n, err = r.Read(b[:])
-
-			if n > 0 {
-				// Reader has more data but we have nowhere to
-				// put it, so we're stuffed
-				slices <- ReaderSlice{nil, io.ErrShortBuffer}
-			} else {
-				// Return some other error (hopefully EOF)
-				slices <- ReaderSlice{nil, err}
-			}
-			return
-		}
-
-		// End on error (includes EOF)
-		if err != nil {
-			slices <- ReaderSlice{nil, err}
-			return
-		}
-
-		if n > 0 {
-			// Make a slice with the contents of the read
-			slices <- ReaderSlice{ptr[:n], nil}
-
-			// Adjust the scratch space slice
-			ptr = ptr[n:]
-		}
-	}
-}
-
-// A read request to the Transfer() function
-type ReadRequest struct {
-	offset  int
-	maxsize int
-	result  chan<- ReadResult
-}
-
-// A read result from the Transfer() function
-type ReadResult struct {
-	slice []byte
-	err   error
-}
-
-// Reads from the buffer managed by the Transfer()
-type BufferReader struct {
-	offset    *int
-	requests  chan<- ReadRequest
-	responses chan ReadResult
-}
-
-func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
-	return BufferReader{new(int), requests, make(chan ReadResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this BufferReader) Read(p []byte) (n int, err error) {
-	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
-	rr, valid := <-this.responses
-	if valid {
-		*this.offset += len(rr.slice)
-		return copy(p, rr.slice), rr.err
-	} else {
-		return 0, io.ErrUnexpectedEOF
-	}
-}
-
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
-	// Record starting offset in order to correctly report the number of bytes sent
-	starting_offset := *this.offset
-	for {
-		this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
-		rr, valid := <-this.responses
-		if valid {
-			log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
-			*this.offset += len(rr.slice)
-			if rr.err != nil {
-				if rr.err == io.EOF {
-					// EOF is not an error.
-					return int64(*this.offset - starting_offset), nil
-				} else {
-					return int64(*this.offset - starting_offset), rr.err
-				}
-			} else {
-				dest.Write(rr.slice)
-			}
-		} else {
-			return int64(*this.offset), io.ErrUnexpectedEOF
-		}
-	}
-}
-
-// Close the responses channel
-func (this BufferReader) Close() error {
-	close(this.responses)
-	return nil
-}
-
-// Handle a read request.  Returns true if a response was sent, and false if
-// the request should be queued.
-func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
-	log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
-	if req.offset < len(body) {
-		var end int
-		if req.offset+req.maxsize < len(body) {
-			end = req.offset + req.maxsize
-		} else {
-			end = len(body)
-		}
-		req.result <- ReadResult{body[req.offset:end], nil}
-		return true
-	} else if complete && req.offset >= len(body) {
-		req.result <- ReadResult{nil, io.EOF}
-		return true
-	} else {
-		return false
-	}
-}
-
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel.  Completes
-// when 'requests' channel is closed.
-func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
-	// currently buffered data
-	var body []byte
-
-	// for receiving slices from ReadIntoBuffer
-	var slices chan ReaderSlice = nil
-
-	// indicates whether the buffered data is complete
-	var complete bool = false
-
-	if source_reader != nil {
-		// 'body' is the buffer slice representing the body content read so far
-		body = source_buffer[:0]
-
-		// used to communicate slices of the buffer as they are
-		// ReadIntoBuffer will close 'slices' when it is done with it
-		slices = make(chan ReaderSlice)
-
-		// Spin it off
-		go ReadIntoBuffer(source_buffer, source_reader, slices)
-	} else {
-		// use the whole buffer
-		body = source_buffer[:]
-
-		// buffer is complete
-		complete = true
-	}
-
-	pending_requests := make([]ReadRequest, 0)
-
-	for {
-		select {
-		case req, valid := <-requests:
-			// Handle a buffer read request
-			if valid {
-				if !HandleReadRequest(req, body, complete) {
-					pending_requests = append(pending_requests, req)
-				}
-			} else {
-				// closed 'requests' channel indicates we're done
-				return
-			}
-
-		case bk, valid := <-slices:
-			// Got a new slice from the reader
-			if valid {
-				if bk.reader_error != nil {
-					reader_error <- bk.reader_error
-					if bk.reader_error == io.EOF {
-						// EOF indicates the reader is done
-						// sending, so our buffer is complete.
-						complete = true
-					} else {
-						// some other reader error
-						return
-					}
-				}
-
-				if bk.slice != nil {
-					// adjust body bounds now that another slice has been read
-					body = source_buffer[0 : len(body)+len(bk.slice)]
-				}
-
-				// handle pending reads
-				n := 0
-				for n < len(pending_requests) {
-					if HandleReadRequest(pending_requests[n], body, complete) {
-
-						// move the element from the
-						// back of the slice to
-						// position 'n', then shorten
-						// the slice by one element
-						pending_requests[n] = pending_requests[len(pending_requests)-1]
-						pending_requests = pending_requests[0 : len(pending_requests)-1]
-					} else {
-
-						// Request wasn't handled, so keep it in the request slice
-						n += 1
-					}
-				}
-			} else {
-				if complete {
-					// no more reads
-					slices = nil
-				} else {
-					// reader channel closed without signaling EOF
-					reader_error <- io.ErrUnexpectedEOF
-					return
-				}
-			}
-		}
-	}
-}
-
 type UploadStatus struct {
 	Err        error
 	Url        string
@@ -434,7 +198,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 func (this KeepClient) putReplicas(
 	hash string,
-	requests chan ReadRequest,
+	requests chan buffer.ReadRequest,
 	reader_status chan error,
 	expectedLength int64) (replicas int, err error) {
 
@@ -458,7 +222,7 @@ func (this KeepClient) putReplicas(
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength)
+				go this.uploadToKeepServer(sv[next_server], hash, buffer.MakeBufferReader(requests), upload_status, expectedLength)
 				next_server += 1
 				active += 1
 			} else {
@@ -497,18 +261,18 @@ var OversizeBlockError = errors.New("Block too big")
 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
 
 	// Buffer for reads from 'r'
-	var buffer []byte
+	var buf []byte
 	if expectedLength > 0 {
 		if expectedLength > BLOCKSIZE {
 			return 0, OversizeBlockError
 		}
-		buffer = make([]byte, expectedLength)
+		buf = make([]byte, expectedLength)
 	} else {
-		buffer = make([]byte, BLOCKSIZE)
+		buf = make([]byte, BLOCKSIZE)
 	}
 
 	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
+	requests := make(chan buffer.ReadRequest)
 	defer close(requests)
 
 	// Reporting reader error states
@@ -516,20 +280,20 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 	defer close(reader_status)
 
 	// Start the transfer goroutine
-	go Transfer(buffer, r, requests, reader_status)
+	go buffer.Transfer(buf, r, requests, reader_status)
 
 	return this.putReplicas(hash, requests, reader_status, expectedLength)
 }
 
-func (this KeepClient) PutHB(hash string, buffer []byte) (replicas int, err error) {
+func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
 	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
+	requests := make(chan buffer.ReadRequest)
 	defer close(requests)
 
 	// Start the transfer goroutine
-	go Transfer(buffer, nil, requests, nil)
+	go buffer.Transfer(buf, nil, requests, nil)
 
-	return this.putReplicas(hash, requests, nil, int64(len(buffer)))
+	return this.putReplicas(hash, requests, nil, int64(len(buf)))
 }
 
 func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 3d38c60..5f189fc 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,6 +1,7 @@
 package keepclient
 
 import (
+	"arvados.org/buffer"
 	"crypto/md5"
 	"flag"
 	"fmt"
@@ -15,7 +16,6 @@ import (
 	"sort"
 	"strings"
 	"testing"
-	"time"
 )
 
 // Gocheck boilerplate
@@ -89,354 +89,6 @@ func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
 	c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
 }
 
-func ReadIntoBufferHelper(c *C, bufsize int) {
-	buffer := make([]byte, bufsize)
-
-	reader, writer := io.Pipe()
-	slices := make(chan ReaderSlice)
-
-	go ReadIntoBuffer(buffer, reader, slices)
-
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 96)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 96; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 96) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.EOF)
-	}
-}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-	ReadIntoBufferHelper(c, 512)
-	ReadIntoBufferHelper(c, 225)
-	ReadIntoBufferHelper(c, 224)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-	buffer := make([]byte, 223)
-	reader, writer := io.Pipe()
-	slices := make(chan ReaderSlice)
-
-	go ReadIntoBuffer(buffer, reader, slices)
-
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
-
-		// Write will deadlock because it can't write all the data, so
-		// spin it off to a goroutine
-		go writer.Write(out)
-		s1 := <-slices
-
-		c.Check(len(s1.slice), Equals, 95)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 95; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 95) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-	}
-
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-	reader, writer := io.Pipe()
-
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 512)
-
-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	// Reporting reader error states
-	reader_status := make(chan error)
-
-	go Transfer(buffer, reader, requests, reader_status)
-
-	br1 := MakeBufferReader(requests)
-	out := make([]byte, 128)
-
-	{
-		// Write some data, and read into a buffer shorter than
-		// available data
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-
-		writer.Write(out[:100])
-
-		in := make([]byte, 64)
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 64)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 64; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-	}
-
-	{
-		// Write some more data, and read into buffer longer than
-		// available data
-		in := make([]byte, 64)
-		n, err := br1.Read(in)
-		c.Check(n, Equals, 36)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 36; i += 1 {
-			c.Check(in[i], Equals, out[64+i])
-		}
-
-	}
-
-	{
-		// Test read before write
-		type Rd struct {
-			n   int
-			err error
-		}
-		rd := make(chan Rd)
-		in := make([]byte, 64)
-
-		go func() {
-			n, err := br1.Read(in)
-			rd <- Rd{n, err}
-		}()
-
-		time.Sleep(100 * time.Millisecond)
-		writer.Write(out[100:])
-
-		got := <-rd
-
-		c.Check(got.n, Equals, 28)
-		c.Check(got.err, Equals, nil)
-
-		for i := 0; i < 28; i += 1 {
-			c.Check(in[i], Equals, out[100+i])
-		}
-	}
-
-	br2 := MakeBufferReader(requests)
-	{
-		// Test 'catch up' reader
-		in := make([]byte, 256)
-		n, err := br2.Read(in)
-
-		c.Check(n, Equals, 128)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 128; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-	}
-
-	{
-		// Test closing the reader
-		writer.Close()
-		status := <-reader_status
-		c.Check(status, Equals, io.EOF)
-
-		in := make([]byte, 256)
-		n1, err1 := br1.Read(in)
-		n2, err2 := br2.Read(in)
-		c.Check(n1, Equals, 0)
-		c.Check(err1, Equals, io.EOF)
-		c.Check(n2, Equals, 0)
-		c.Check(err2, Equals, io.EOF)
-	}
-
-	{
-		// Test 'catch up' reader after closing
-		br3 := MakeBufferReader(requests)
-		in := make([]byte, 256)
-		n, err := br3.Read(in)
-
-		c.Check(n, Equals, 128)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 128; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-
-		n, err = br3.Read(in)
-
-		c.Check(n, Equals, 0)
-		c.Check(err, Equals, io.EOF)
-	}
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-	reader, writer := io.Pipe()
-
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-
-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	// Reporting reader error states
-	reader_status := make(chan error)
-
-	go Transfer(buffer, reader, requests, reader_status)
-
-	out := make([]byte, 101)
-	go writer.Write(out)
-
-	status := <-reader_status
-	c.Check(status, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-	for i := 0; i < 100; i += 1 {
-		buffer[i] = byte(i)
-	}
-
-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	go Transfer(buffer, nil, requests, nil)
-
-	br1 := MakeBufferReader(requests)
-
-	in := make([]byte, 64)
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 64)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 64; i += 1 {
-			c.Check(in[i], Equals, buffer[i])
-		}
-	}
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 36)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 36; i += 1 {
-			c.Check(in[i], Equals, buffer[64+i])
-		}
-	}
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 0)
-		c.Check(err, Equals, io.EOF)
-	}
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-	for i := 0; i < 100; i += 1 {
-		buffer[i] = byte(i)
-	}
-
-	// Read requests on Transfer() buffer
-	requests := make(chan ReadRequest)
-	defer close(requests)
-
-	go Transfer(buffer, nil, requests, nil)
-
-	br1 := MakeBufferReader(requests)
-
-	reader, writer := io.Pipe()
-
-	go func() {
-		p := make([]byte, 100)
-		n, err := reader.Read(p)
-		c.Check(n, Equals, 100)
-		c.Check(err, Equals, nil)
-		c.Check(p, DeepEquals, buffer)
-	}()
-
-	io.Copy(writer, br1)
-}
-
 type StubPutHandler struct {
 	c              *C
 	expectPath     string
@@ -521,18 +173,18 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 			writer io.WriteCloser, upload_status chan UploadStatus) {
 
 			// Buffer for reads from 'r'
-			buffer := make([]byte, 512)
+			buf := make([]byte, 512)
 
 			// Read requests on Transfer() buffer
-			requests := make(chan ReadRequest)
+			requests := make(chan buffer.ReadRequest)
 			defer close(requests)
 
 			// Reporting reader error states
 			reader_status := make(chan error)
 
-			go Transfer(buffer, reader, requests, reader_status)
+			go buffer.Transfer(buf, reader, requests, reader_status)
 
-			br1 := MakeBufferReader(requests)
+			br1 := buffer.MakeBufferReader(requests)
 
 			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
 

commit c3a88cbf511aa0954dac271ce6bda9c6e4f3191c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 15 16:42:33 2014 -0400

    2798: Added AuthorizedGet(), Ask() and AuthorizedAsk().  Added BLOCKSIZE
    constant and moved errors.New() declarations to the top of the file.  Improved
    test server runner.  Changed some test methods to take copies of KeepClient
    instead of pointer.
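
    (For context, not in the diff: a hypothetical caller exercising the new
    read-side methods. Get() and Ask() are the unsigned forms; the Authorized*
    variants append a "+A<signature>@<timestamp>" permission hint to the
    block locator in the request URL.)

    package main

    import (
    	"arvados.org/keepclient"
    	"io/ioutil"
    	"log"
    )

    func main() {
    	// MakeKeepClient() now returns a KeepClient value rather than a pointer.
    	kc, err := keepclient.MakeKeepClient()
    	if err != nil {
    		log.Fatal(err)
    	}

    	hash := "acbd18db4cc2f85cedef654fccc4a4d8" // md5 of "foo"

    	// Ask() is a HEAD-style size/existence probe; no body is transferred.
    	if n, url, err := kc.Ask(hash); err == nil {
    		log.Printf("%s reports %d bytes", url, n)
    	}

    	// Get() streams the block; AuthorizedGet(hash, signature, timestamp)
    	// would be used instead when the locator carries a permission signature.
    	r, n, url, err := kc.Get(hash)
    	if err != nil {
    		log.Fatal(err) // keepclient.BlockNotFound if no server has it
    	}
    	defer r.Close()
    	body, _ := ioutil.ReadAll(r)
    	log.Printf("got %d of %d bytes from %s", len(body), n, url)
    }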

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index aeb805b..2738cef 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -15,13 +15,19 @@ import (
 	"strconv"
 )
 
+// A Keep "block" is 64MB.
+const BLOCKSIZE = 64 * 1024 * 1024
+
+var BlockNotFound = errors.New("Block not found")
+var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
+
 type KeepClient struct {
 	ApiServer     string
 	ApiToken      string
 	ApiInsecure   bool
 	Service_roots []string
 	Want_replicas int
-	client        *http.Client
+	Client        *http.Client
 }
 
 type KeepDisk struct {
@@ -30,25 +36,24 @@ type KeepDisk struct {
 	SSL      bool   `json:"service_ssl_flag"`
 }
 
-func MakeKeepClient() (kc *KeepClient, err error) {
-	kc = &KeepClient{
-		ApiServer:     os.Getenv("ARVADOS_API_HOST"),
-		ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
-		ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
-		Want_replicas: 2}
-
+func MakeKeepClient() (kc KeepClient, err error) {
 	tr := &http.Transport{
 		TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
 	}
 
-	kc.client = &http.Client{Transport: tr}
+	kc = KeepClient{
+		ApiServer:     os.Getenv("ARVADOS_API_HOST"),
+		ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
+		ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
+		Want_replicas: 2,
+		Client:        &http.Client{Transport: tr}}
 
-	err = kc.DiscoverKeepDisks()
+	err = (&kc).DiscoverKeepServers()
 
 	return kc, err
 }
 
-func (this *KeepClient) DiscoverKeepDisks() error {
+func (this *KeepClient) DiscoverKeepServers() error {
 	// Construct request of keep disk list
 	var req *http.Request
 	var err error
@@ -61,7 +66,7 @@ func (this *KeepClient) DiscoverKeepDisks() error {
 
 	// Make the request
 	var resp *http.Response
-	if resp, err = this.client.Do(req); err != nil {
+	if resp, err = this.Client.Do(req); err != nil {
 		return err
 	}
 
@@ -415,7 +420,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	req.Body = body
 
 	var resp *http.Response
-	if resp, err = this.client.Do(req); err != nil {
+	if resp, err = this.Client.Do(req); err != nil {
 		upload_status <- UploadStatus{err, url, 0}
 		return
 	}
@@ -427,8 +432,6 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	}
 }
 
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-
 func (this KeepClient) putReplicas(
 	hash string,
 	requests chan ReadRequest,
@@ -496,12 +499,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 	// Buffer for reads from 'r'
 	var buffer []byte
 	if expectedLength > 0 {
-		if expectedLength > 64*1024*1024 {
+		if expectedLength > BLOCKSIZE {
 			return 0, OversizeBlockError
 		}
 		buffer = make([]byte, expectedLength)
 	} else {
-		buffer = make([]byte, 64*1024*1024)
+		buffer = make([]byte, BLOCKSIZE)
 	}
 
 	// Read requests on Transfer() buffer
@@ -543,18 +546,29 @@ func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error)
 	}
 }
 
-var BlockNotFound = errors.New("Block not found")
-
 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 	contentLength int64, url string, err error) {
+	return this.AuthorizedGet(hash, "", "")
+}
 
-	// Calculate the ordering for uploading to servers
+func (this KeepClient) AuthorizedGet(hash string,
+	signature string,
+	timestamp string) (reader io.ReadCloser,
+	contentLength int64, url string, err error) {
+
+	// Calculate the ordering for asking servers
 	sv := this.ShuffledServiceRoots(hash)
 
 	for _, host := range sv {
 		var req *http.Request
 		var err error
-		var url = fmt.Sprintf("%s/%s", host, hash)
+		var url string
+		if signature != "" {
+			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+				signature, timestamp)
+		} else {
+			url = fmt.Sprintf("%s/%s", host, hash)
+		}
 		if req, err = http.NewRequest("GET", url, nil); err != nil {
 			continue
 		}
@@ -562,7 +576,7 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 
 		var resp *http.Response
-		if resp, err = this.client.Do(req); err != nil {
+		if resp, err = this.Client.Do(req); err != nil {
 			continue
 		}
 
@@ -573,3 +587,42 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 
 	return nil, 0, "", BlockNotFound
 }
+
+func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
+	return this.AuthorizedAsk(hash, "", "")
+}
+
+func (this KeepClient) AuthorizedAsk(hash string, signature string,
+	timestamp string) (contentLength int64, url string, err error) {
+	// Calculate the ordering for asking servers
+	sv := this.ShuffledServiceRoots(hash)
+
+	for _, host := range sv {
+		var req *http.Request
+		var err error
+		if signature != "" {
+			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+				signature, timestamp)
+		} else {
+			url = fmt.Sprintf("%s/%s", host, hash)
+		}
+
+		if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+			continue
+		}
+
+		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+		var resp *http.Response
+		if resp, err = this.Client.Do(req); err != nil {
+			continue
+		}
+
+		if resp.StatusCode == http.StatusOK {
+			return resp.ContentLength, url, nil
+		}
+	}
+
+	return 0, "", BlockNotFound
+
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 00a2063..3d38c60 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,6 +13,7 @@ import (
 	"os"
 	"os/exec"
 	"sort"
+	"strings"
 	"testing"
 	"time"
 )
@@ -32,18 +33,23 @@ type ServerRequiredSuite struct{}
 // Standalone tests
 type StandaloneSuite struct{}
 
+func pythonDir() string {
+	gopath := os.Getenv("GOPATH")
+	return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+}
+
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
 	if *no_server {
 		c.Skip("Skipping tests that require server")
 	} else {
-		os.Chdir(os.ExpandEnv("$GOPATH../python"))
+		os.Chdir(pythonDir())
 		exec.Command("python", "run_test_server.py", "start").Run()
 		exec.Command("python", "run_test_server.py", "start_keep").Run()
 	}
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-	os.Chdir(os.ExpandEnv("$GOPATH../python"))
+	os.Chdir(pythonDir())
 	exec.Command("python", "run_test_server.py", "stop_keep").Run()
 	exec.Command("python", "run_test_server.py", "stop").Run()
 }
@@ -464,7 +470,7 @@ func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url s
 	return listener, url
 }
 
-func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
 	io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
 
 	listener, url := RunBogusKeepServer(st, 2990)
@@ -488,7 +494,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
-		func(kc *KeepClient, url string, reader io.ReadCloser,
+		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan UploadStatus) {
 
 			go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
@@ -511,7 +517,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
-		func(kc *KeepClient, url string, reader io.ReadCloser,
+		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan UploadStatus) {
 
 			// Buffer for reads from 'r'
@@ -559,7 +565,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 	hash := "acbd18db4cc2f85cedef654fccc4a4d8"
 
 	UploadToStubHelper(c, st,
-		func(kc *KeepClient, url string, reader io.ReadCloser,
+		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan UploadStatus) {
 
 			go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
@@ -869,7 +875,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	c.Check(content, DeepEquals, []byte("foo"))
 }
 
-func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
 	os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
@@ -882,12 +888,21 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
 	c.Check(replicas, Equals, 2)
 	c.Check(err, Equals, nil)
 
-	r, n, url2, err := kc.Get(hash)
-	c.Check(err, Equals, nil)
-	c.Check(n, Equals, int64(3))
-	c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+	{
+		r, n, url2, err := kc.Get(hash)
+		c.Check(err, Equals, nil)
+		c.Check(n, Equals, int64(3))
+		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 
-	content, err2 := ioutil.ReadAll(r)
-	c.Check(err2, Equals, nil)
-	c.Check(content, DeepEquals, []byte("foo"))
+		content, err2 := ioutil.ReadAll(r)
+		c.Check(err2, Equals, nil)
+		c.Check(content, DeepEquals, []byte("foo"))
+	}
+
+	{
+		n, url2, err := kc.Ask(hash)
+		c.Check(err, Equals, nil)
+		c.Check(n, Equals, int64(3))
+		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+	}
 }

commit 4ec57745d2106e955fea4442c9eccb2fce7246c4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 15 16:36:18 2014 -0400

    Moved non-keep-specific buffering code into a separate package.
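
    (A sketch, not part of the diff, of the fan-out pattern the new package
    supports, mirroring how putReplicas() uses it: one Transfer() goroutine
    owns the buffer, and each MakeBufferReader() replays the same bytes, so a
    single in-memory block can feed several concurrent consumers. Passing a
    nil reader and nil error channel, as PutHB() does, marks the buffer as
    already complete.)

    package main

    import (
    	"arvados.org/buffer"
    	"bytes"
    	"io"
    	"log"
    	"sync"
    )

    func main() {
    	data := []byte("block contents")

    	requests := make(chan buffer.ReadRequest)
    	defer close(requests)

    	// nil reader + nil error channel: the buffer is used as-is (complete).
    	go buffer.Transfer(data, nil, requests, nil)

    	var wg sync.WaitGroup
    	for i := 0; i < 3; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			var sink bytes.Buffer
    			// Each reader tracks its own offset over the shared buffer.
    			n, err := io.Copy(&sink, buffer.MakeBufferReader(requests))
    			log.Printf("reader %d copied %d bytes (err=%v)", i, n, err)
    		}(i)
    	}
    	wg.Wait()
    }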

diff --git a/sdk/go/src/arvados.org/buffer/buffer.go b/sdk/go/src/arvados.org/buffer/buffer.go
new file mode 100644
index 0000000..6af1dd0
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer.go
@@ -0,0 +1,243 @@
+package buffer
+
+import (
+	"io"
+	"log"
+)
+
+type ReaderSlice struct {
+	slice        []byte
+	reader_error error
+}
+
+// Read repeatedly from the reader into the specified buffer, and report each
+// read to channel 'c'.  Completes when Reader 'r' reports on the error channel
+// and closes channel 'c'.
+func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
+	defer close(slices)
+
+	// Initially use entire buffer as scratch space
+	ptr := buffer[:]
+	for {
+		var n int
+		var err error
+		if len(ptr) > 0 {
+			// Read into the scratch space
+			n, err = r.Read(ptr)
+		} else {
+			// Ran out of scratch space, try reading one more byte
+			var b [1]byte
+			n, err = r.Read(b[:])
+
+			if n > 0 {
+				// Reader has more data but we have nowhere to
+				// put it, so we're stuffed
+				slices <- ReaderSlice{nil, io.ErrShortBuffer}
+			} else {
+				// Return some other error (hopefully EOF)
+				slices <- ReaderSlice{nil, err}
+			}
+			return
+		}
+
+		// End on error (includes EOF)
+		if err != nil {
+			slices <- ReaderSlice{nil, err}
+			return
+		}
+
+		if n > 0 {
+			// Make a slice with the contents of the read
+			slices <- ReaderSlice{ptr[:n], nil}
+
+			// Adjust the scratch space slice
+			ptr = ptr[n:]
+		}
+	}
+}
+
+// A read request to the Transfer() function
+type ReadRequest struct {
+	offset  int
+	maxsize int
+	result  chan<- ReadResult
+}
+
+// A read result from the Transfer() function
+type ReadResult struct {
+	slice []byte
+	err   error
+}
+
+// Reads from the buffer managed by the Transfer()
+type BufferReader struct {
+	offset    *int
+	requests  chan<- ReadRequest
+	responses chan ReadResult
+}
+
+func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
+	return BufferReader{new(int), requests, make(chan ReadResult)}
+}
+
+// Reads from the buffer managed by the Transfer()
+func (this BufferReader) Read(p []byte) (n int, err error) {
+	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
+	rr, valid := <-this.responses
+	if valid {
+		*this.offset += len(rr.slice)
+		return copy(p, rr.slice), rr.err
+	} else {
+		return 0, io.ErrUnexpectedEOF
+	}
+}
+
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+	// Record starting offset in order to correctly report the number of bytes sent
+	starting_offset := *this.offset
+	for {
+		this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
+		rr, valid := <-this.responses
+		if valid {
+			log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
+			*this.offset += len(rr.slice)
+			if rr.err != nil {
+				if rr.err == io.EOF {
+					// EOF is not an error.
+					return int64(*this.offset - starting_offset), nil
+				} else {
+					return int64(*this.offset - starting_offset), rr.err
+				}
+			} else {
+				dest.Write(rr.slice)
+			}
+		} else {
+			return int64(*this.offset), io.ErrUnexpectedEOF
+		}
+	}
+}
+
+// Close the responses channel
+func (this BufferReader) Close() error {
+	close(this.responses)
+	return nil
+}
+
+// Handle a read request.  Returns true if a response was sent, and false if
+// the request should be queued.
+func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
+	log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
+	if req.offset < len(body) {
+		var end int
+		if req.offset+req.maxsize < len(body) {
+			end = req.offset + req.maxsize
+		} else {
+			end = len(body)
+		}
+		req.result <- ReadResult{body[req.offset:end], nil}
+		return true
+	} else if complete && req.offset >= len(body) {
+		req.result <- ReadResult{nil, io.EOF}
+		return true
+	} else {
+		return false
+	}
+}
+
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel.  Completes
+// when 'requests' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
+	// currently buffered data
+	var body []byte
+
+	// for receiving slices from ReadIntoBuffer
+	var slices chan ReaderSlice = nil
+
+	// indicates whether the buffered data is complete
+	var complete bool = false
+
+	if source_reader != nil {
+		// 'body' is the buffer slice representing the body content read so far
+		body = source_buffer[:0]
+
+		// used to communicate slices of the buffer as they are
+		// ReadIntoBuffer will close 'slices' when it is done with it
+		slices = make(chan ReaderSlice)
+
+		// Spin it off
+		go ReadIntoBuffer(source_buffer, source_reader, slices)
+	} else {
+		// use the whole buffer
+		body = source_buffer[:]
+
+		// buffer is complete
+		complete = true
+	}
+
+	pending_requests := make([]ReadRequest, 0)
+
+	for {
+		select {
+		case req, valid := <-requests:
+			// Handle a buffer read request
+			if valid {
+				if !HandleReadRequest(req, body, complete) {
+					pending_requests = append(pending_requests, req)
+				}
+			} else {
+				// closed 'requests' channel indicates we're done
+				return
+			}
+
+		case bk, valid := <-slices:
+			// Got a new slice from the reader
+			if valid {
+				if bk.reader_error != nil {
+					reader_error <- bk.reader_error
+					if bk.reader_error == io.EOF {
+						// EOF indicates the reader is done
+						// sending, so our buffer is complete.
+						complete = true
+					} else {
+						// some other reader error
+						return
+					}
+				}
+
+				if bk.slice != nil {
+					// adjust body bounds now that another slice has been read
+					body = source_buffer[0 : len(body)+len(bk.slice)]
+				}
+
+				// handle pending reads
+				n := 0
+				for n < len(pending_requests) {
+					if HandleReadRequest(pending_requests[n], body, complete) {
+
+						// move the element from the
+						// back of the slice to
+						// position 'n', then shorten
+						// the slice by one element
+						pending_requests[n] = pending_requests[len(pending_requests)-1]
+						pending_requests = pending_requests[0 : len(pending_requests)-1]
+					} else {
+
+						// Request wasn't handled, so keep it in the request slice
+						n += 1
+					}
+				}
+			} else {
+				if complete {
+					// no more reads
+					slices = nil
+				} else {
+					// reader channel closed without signaling EOF
+					reader_error <- io.ErrUnexpectedEOF
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/sdk/go/src/arvados.org/buffer/buffer_test.go b/sdk/go/src/arvados.org/buffer/buffer_test.go
new file mode 100644
index 0000000..28e1e66
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer_test.go
@@ -0,0 +1,364 @@
+package buffer
+
+import (
+	. "gopkg.in/check.v1"
+	"io"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) { TestingT(t) }
+
+var _ = Suite(&StandaloneSuite{})
+
+// Standalone tests
+type StandaloneSuite struct{}
+
+func ReadIntoBufferHelper(c *C, bufsize int) {
+	buffer := make([]byte, bufsize)
+
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 96)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 96; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 96) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+	ReadIntoBufferHelper(c, 512)
+	ReadIntoBufferHelper(c, 225)
+	ReadIntoBufferHelper(c, 224)
+}
+
+func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
+	buffer := make([]byte, 223)
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+
+		// Write will deadlock because it can't write all the data, so
+		// spin it off to a goroutine
+		go writer.Write(out)
+		s1 := <-slices
+
+		c.Check(len(s1.slice), Equals, 95)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 95; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 95) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+	}
+
+}
+
+func (s *StandaloneSuite) TestTransfer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 512)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	br1 := MakeBufferReader(requests)
+	out := make([]byte, 128)
+
+	{
+		// Write some data, and read into a buffer shorter than
+		// available data
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+
+		writer.Write(out[:100])
+
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Write some more data, and read into buffer longer than
+		// available data
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, out[64+i])
+		}
+
+	}
+
+	{
+		// Test read before write
+		type Rd struct {
+			n   int
+			err error
+		}
+		rd := make(chan Rd)
+		in := make([]byte, 64)
+
+		go func() {
+			n, err := br1.Read(in)
+			rd <- Rd{n, err}
+		}()
+
+		time.Sleep(100 * time.Millisecond)
+		writer.Write(out[100:])
+
+		got := <-rd
+
+		c.Check(got.n, Equals, 28)
+		c.Check(got.err, Equals, nil)
+
+		for i := 0; i < 28; i += 1 {
+			c.Check(in[i], Equals, out[100+i])
+		}
+	}
+
+	br2 := MakeBufferReader(requests)
+	{
+		// Test 'catch up' reader
+		in := make([]byte, 256)
+		n, err := br2.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Test closing the reader
+		writer.Close()
+		status := <-reader_status
+		c.Check(status, Equals, io.EOF)
+
+		in := make([]byte, 256)
+		n1, err1 := br1.Read(in)
+		n2, err2 := br2.Read(in)
+		c.Check(n1, Equals, 0)
+		c.Check(err1, Equals, io.EOF)
+		c.Check(n2, Equals, 0)
+		c.Check(err2, Equals, io.EOF)
+	}
+
+	{
+		// Test 'catch up' reader after closing
+		br3 := MakeBufferReader(requests)
+		in := make([]byte, 256)
+		n, err := br3.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+
+		n, err = br3.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	out := make([]byte, 101)
+	go writer.Write(out)
+
+	status := <-reader_status
+	c.Check(status, Equals, io.ErrShortBuffer)
+}
+
+func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	in := make([]byte, 64)
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, buffer[i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, buffer[64+i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	reader, writer := io.Pipe()
+
+	go func() {
+		p := make([]byte, 100)
+		n, err := reader.Read(p)
+		c.Check(n, Equals, 100)
+		c.Check(err, Equals, nil)
+		c.Check(p, DeepEquals, buffer)
+	}()
+
+	io.Copy(writer, br1)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 



