[ARVADOS] updated: d3b11ddc2506de37b8e6538be69237d6d2a60a4a
git at public.curoverse.com
git at public.curoverse.com
Thu May 15 16:47:25 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/buffer/buffer.go | 243 +++++++++++++
sdk/go/src/arvados.org/buffer/buffer_test.go | 364 +++++++++++++++++++
sdk/go/src/arvados.org/keepclient/keepclient.go | 355 +++++-------------
.../src/arvados.org/keepclient/keepclient_test.go | 401 ++-------------------
4 files changed, 727 insertions(+), 636 deletions(-)
create mode 100644 sdk/go/src/arvados.org/buffer/buffer.go
create mode 100644 sdk/go/src/arvados.org/buffer/buffer_test.go
via d3b11ddc2506de37b8e6538be69237d6d2a60a4a (commit)
via c3a88cbf511aa0954dac271ce6bda9c6e4f3191c (commit)
via 4ec57745d2106e955fea4442c9eccb2fce7246c4 (commit)
from 27f5c1635d56c3f3cb6c5ef069c28db939eec2a1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d3b11ddc2506de37b8e6538be69237d6d2a60a4a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 15 16:47:14 2014 -0400
2798: Completed move of Transfer() related code out to 'buffer' package.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 2738cef..829ab0e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -1,6 +1,7 @@
package keepclient
import (
+ "arvados.org/buffer"
"crypto/md5"
"crypto/tls"
"encoding/json"
@@ -155,243 +156,6 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
return pseq
}
-type ReaderSlice struct {
- slice []byte
- reader_error error
-}
-
-// Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'. Completes when Reader 'r' reports on the error channel
-// and closes channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
- defer close(slices)
-
- // Initially use entire buffer as scratch space
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- // Read into the scratch space
- n, err = r.Read(ptr)
- } else {
- // Ran out of scratch space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- ReaderSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- ReaderSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- ReaderSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- ReaderSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
-}
-
-// A read request to the Transfer() function
-type ReadRequest struct {
- offset int
- maxsize int
- result chan<- ReadResult
-}
-
-// A read result from the Transfer() function
-type ReadResult struct {
- slice []byte
- err error
-}
-
-// Reads from the buffer managed by the Transfer()
-type BufferReader struct {
- offset *int
- requests chan<- ReadRequest
- responses chan ReadResult
-}
-
-func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
- return BufferReader{new(int), requests, make(chan ReadResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- ReadRequest{*this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- *this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := *this.offset
- for {
- this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
- *this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(*this.offset - starting_offset), nil
- } else {
- return int64(*this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(*this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this BufferReader) Close() error {
- close(this.responses)
- return nil
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
- log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
- if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- ReadResult{body[req.offset:end], nil}
- return true
- } else if complete && req.offset >= len(body) {
- req.result <- ReadResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
- // currently buffered data
- var body []byte
-
- // for receiving slices from ReadIntoBuffer
- var slices chan ReaderSlice = nil
-
- // indicates whether the buffered data is complete
- var complete bool = false
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // ReadIntoBuffer will close 'slices' when it is done with it
- slices = make(chan ReaderSlice)
-
- // Spin it off
- go ReadIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- complete = true
- }
-
- pending_requests := make([]ReadRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !HandleReadRequest(req, body, complete) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- if bk.reader_error != nil {
- reader_error <- bk.reader_error
- if bk.reader_error == io.EOF {
- // EOF indicates the reader is done
- // sending, so our buffer is complete.
- complete = true
- } else {
- // some other reader error
- return
- }
- }
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if HandleReadRequest(pending_requests[n], body, complete) {
-
- // move the element from the
- // back of the slice to
- // position 'n', then shorten
- // the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if complete {
- // no more reads
- slices = nil
- } else {
- // reader channel closed without signaling EOF
- reader_error <- io.ErrUnexpectedEOF
- return
- }
- }
- }
- }
-}
-
type UploadStatus struct {
Err error
Url string
@@ -434,7 +198,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
func (this KeepClient) putReplicas(
hash string,
- requests chan ReadRequest,
+ requests chan buffer.ReadRequest,
reader_status chan error,
expectedLength int64) (replicas int, err error) {
@@ -458,7 +222,7 @@ func (this KeepClient) putReplicas(
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength)
+ go this.uploadToKeepServer(sv[next_server], hash, buffer.MakeBufferReader(requests), upload_status, expectedLength)
next_server += 1
active += 1
} else {
@@ -497,18 +261,18 @@ var OversizeBlockError = errors.New("Block too big")
func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
// Buffer for reads from 'r'
- var buffer []byte
+ var buf []byte
if expectedLength > 0 {
if expectedLength > BLOCKSIZE {
return 0, OversizeBlockError
}
- buffer = make([]byte, expectedLength)
+ buf = make([]byte, expectedLength)
} else {
- buffer = make([]byte, BLOCKSIZE)
+ buf = make([]byte, BLOCKSIZE)
}
// Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
+ requests := make(chan buffer.ReadRequest)
defer close(requests)
// Reporting reader error states
@@ -516,20 +280,20 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
defer close(reader_status)
// Start the transfer goroutine
- go Transfer(buffer, r, requests, reader_status)
+ go buffer.Transfer(buf, r, requests, reader_status)
return this.putReplicas(hash, requests, reader_status, expectedLength)
}
-func (this KeepClient) PutHB(hash string, buffer []byte) (replicas int, err error) {
+func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
// Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
+ requests := make(chan buffer.ReadRequest)
defer close(requests)
// Start the transfer goroutine
- go Transfer(buffer, nil, requests, nil)
+ go buffer.Transfer(buf, nil, requests, nil)
- return this.putReplicas(hash, requests, nil, int64(len(buffer)))
+ return this.putReplicas(hash, requests, nil, int64(len(buf)))
}
func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 3d38c60..5f189fc 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,6 +1,7 @@
package keepclient
import (
+ "arvados.org/buffer"
"crypto/md5"
"flag"
"fmt"
@@ -15,7 +16,6 @@ import (
"sort"
"strings"
"testing"
- "time"
)
// Gocheck boilerplate
@@ -89,354 +89,6 @@ func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
}
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan ReaderSlice)
-
- go ReadIntoBuffer(buffer, reader, slices)
-
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 512)
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan ReaderSlice)
-
- go ReadIntoBuffer(buffer, reader, slices)
-
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
- }
-
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- // Buffer for reads from 'r'
- buffer := make([]byte, 512)
-
- // Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
- defer close(requests)
-
- // Reporting reader error states
- reader_status := make(chan error)
-
- go Transfer(buffer, reader, requests, reader_status)
-
- br1 := MakeBufferReader(requests)
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := MakeBufferReader(requests)
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
- status := <-reader_status
- c.Check(status, Equals, io.EOF)
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := MakeBufferReader(requests)
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
-
- // Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
- defer close(requests)
-
- // Reporting reader error states
- reader_status := make(chan error)
-
- go Transfer(buffer, reader, requests, reader_status)
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- status := <-reader_status
- c.Check(status, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- // Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
- defer close(requests)
-
- go Transfer(buffer, nil, requests, nil)
-
- br1 := MakeBufferReader(requests)
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- // Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
- defer close(requests)
-
- go Transfer(buffer, nil, requests, nil)
-
- br1 := MakeBufferReader(requests)
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
type StubPutHandler struct {
c *C
expectPath string
@@ -521,18 +173,18 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
writer io.WriteCloser, upload_status chan UploadStatus) {
// Buffer for reads from 'r'
- buffer := make([]byte, 512)
+ buf := make([]byte, 512)
// Read requests on Transfer() buffer
- requests := make(chan ReadRequest)
+ requests := make(chan buffer.ReadRequest)
defer close(requests)
// Reporting reader error states
reader_status := make(chan error)
- go Transfer(buffer, reader, requests, reader_status)
+ go buffer.Transfer(buf, reader, requests, reader_status)
- br1 := MakeBufferReader(requests)
+ br1 := buffer.MakeBufferReader(requests)
go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
commit c3a88cbf511aa0954dac271ce6bda9c6e4f3191c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 15 16:42:33 2014 -0400
2798: Added AuthorizedGet(), Ask() and AuthorizedAsk(). Added BLOCKSIZE
constant and moved errors.New() declarations to the top of the file. Improved
test server runner. Changed some test methods to take copies of KeepClient
instead of pointer.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index aeb805b..2738cef 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -15,13 +15,19 @@ import (
"strconv"
)
+// A Keep "block" is 64MB.
+const BLOCKSIZE = 64 * 1024 * 1024
+
+var BlockNotFound = errors.New("Block not found")
+var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
+
type KeepClient struct {
ApiServer string
ApiToken string
ApiInsecure bool
Service_roots []string
Want_replicas int
- client *http.Client
+ Client *http.Client
}
type KeepDisk struct {
@@ -30,25 +36,24 @@ type KeepDisk struct {
SSL bool `json:"service_ssl_flag"`
}
-func MakeKeepClient() (kc *KeepClient, err error) {
- kc = &KeepClient{
- ApiServer: os.Getenv("ARVADOS_API_HOST"),
- ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
- ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
- Want_replicas: 2}
-
+func MakeKeepClient() (kc KeepClient, err error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
}
- kc.client = &http.Client{Transport: tr}
+ kc = KeepClient{
+ ApiServer: os.Getenv("ARVADOS_API_HOST"),
+ ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
+ ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
+ Want_replicas: 2,
+ Client: &http.Client{Transport: tr}}
- err = kc.DiscoverKeepDisks()
+ err = (&kc).DiscoverKeepServers()
return kc, err
}
-func (this *KeepClient) DiscoverKeepDisks() error {
+func (this *KeepClient) DiscoverKeepServers() error {
// Construct request of keep disk list
var req *http.Request
var err error
@@ -61,7 +66,7 @@ func (this *KeepClient) DiscoverKeepDisks() error {
// Make the request
var resp *http.Response
- if resp, err = this.client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil {
return err
}
@@ -415,7 +420,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
req.Body = body
var resp *http.Response
- if resp, err = this.client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil {
upload_status <- UploadStatus{err, url, 0}
return
}
@@ -427,8 +432,6 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
}
}
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-
func (this KeepClient) putReplicas(
hash string,
requests chan ReadRequest,
@@ -496,12 +499,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
// Buffer for reads from 'r'
var buffer []byte
if expectedLength > 0 {
- if expectedLength > 64*1024*1024 {
+ if expectedLength > BLOCKSIZE {
return 0, OversizeBlockError
}
buffer = make([]byte, expectedLength)
} else {
- buffer = make([]byte, 64*1024*1024)
+ buffer = make([]byte, BLOCKSIZE)
}
// Read requests on Transfer() buffer
@@ -543,18 +546,29 @@ func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error)
}
}
-var BlockNotFound = errors.New("Block not found")
-
func (this KeepClient) Get(hash string) (reader io.ReadCloser,
contentLength int64, url string, err error) {
+ return this.AuthorizedGet(hash, "", "")
+}
- // Calculate the ordering for uploading to servers
+func (this KeepClient) AuthorizedGet(hash string,
+ signature string,
+ timestamp string) (reader io.ReadCloser,
+ contentLength int64, url string, err error) {
+
+ // Calculate the ordering for asking servers
sv := this.ShuffledServiceRoots(hash)
for _, host := range sv {
var req *http.Request
var err error
- var url = fmt.Sprintf("%s/%s", host, hash)
+ var url string
+ if signature != "" {
+ url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+ signature, timestamp)
+ } else {
+ url = fmt.Sprintf("%s/%s", host, hash)
+ }
if req, err = http.NewRequest("GET", url, nil); err != nil {
continue
}
@@ -562,7 +576,7 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
var resp *http.Response
- if resp, err = this.client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil {
continue
}
@@ -573,3 +587,42 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
return nil, 0, "", BlockNotFound
}
+
+func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
+ return this.AuthorizedAsk(hash, "", "")
+}
+
+func (this KeepClient) AuthorizedAsk(hash string, signature string,
+ timestamp string) (contentLength int64, url string, err error) {
+ // Calculate the ordering for asking servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ for _, host := range sv {
+ var req *http.Request
+ var err error
+ if signature != "" {
+ url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+ signature, timestamp)
+ } else {
+ url = fmt.Sprintf("%s/%s", host, hash)
+ }
+
+ if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+ continue
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+ var resp *http.Response
+ if resp, err = this.Client.Do(req); err != nil {
+ continue
+ }
+
+ if resp.StatusCode == http.StatusOK {
+ return resp.ContentLength, url, nil
+ }
+ }
+
+ return 0, "", BlockNotFound
+
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 00a2063..3d38c60 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,6 +13,7 @@ import (
"os"
"os/exec"
"sort"
+ "strings"
"testing"
"time"
)
@@ -32,18 +33,23 @@ type ServerRequiredSuite struct{}
// Standalone tests
type StandaloneSuite struct{}
+func pythonDir() string {
+ gopath := os.Getenv("GOPATH")
+ return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+}
+
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
if *no_server {
c.Skip("Skipping tests that require server")
} else {
- os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ os.Chdir(pythonDir())
exec.Command("python", "run_test_server.py", "start").Run()
exec.Command("python", "run_test_server.py", "start_keep").Run()
}
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
- os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ os.Chdir(pythonDir())
exec.Command("python", "run_test_server.py", "stop_keep").Run()
exec.Command("python", "run_test_server.py", "stop").Run()
}
@@ -464,7 +470,7 @@ func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url s
return listener, url
}
-func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
listener, url := RunBogusKeepServer(st, 2990)
@@ -488,7 +494,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
+ func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan UploadStatus) {
go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
@@ -511,7 +517,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
+ func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan UploadStatus) {
// Buffer for reads from 'r'
@@ -559,7 +565,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
hash := "acbd18db4cc2f85cedef654fccc4a4d8"
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
+ func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan UploadStatus) {
go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
@@ -869,7 +875,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
c.Check(content, DeepEquals, []byte("foo"))
}
-func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
os.Setenv("ARVADOS_API_HOST", "localhost:3001")
os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
@@ -882,12 +888,21 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
- r, n, url2, err := kc.Get(hash)
- c.Check(err, Equals, nil)
- c.Check(n, Equals, int64(3))
- c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+ {
+ r, n, url2, err := kc.Get(hash)
+ c.Check(err, Equals, nil)
+ c.Check(n, Equals, int64(3))
+ c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
- content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
- c.Check(content, DeepEquals, []byte("foo"))
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, []byte("foo"))
+ }
+
+ {
+ n, url2, err := kc.Ask(hash)
+ c.Check(err, Equals, nil)
+ c.Check(n, Equals, int64(3))
+ c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+ }
}
commit 4ec57745d2106e955fea4442c9eccb2fce7246c4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 15 16:36:18 2014 -0400
Moved non-keep-specific buffering code into a separate package.
diff --git a/sdk/go/src/arvados.org/buffer/buffer.go b/sdk/go/src/arvados.org/buffer/buffer.go
new file mode 100644
index 0000000..6af1dd0
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer.go
@@ -0,0 +1,243 @@
+package buffer
+
+import (
+ "io"
+ "log"
+)
+
+type ReaderSlice struct {
+ slice []byte
+ reader_error error
+}
+
+// Read repeatedly from the reader into the specified buffer, and report each
+// read to channel 'slices'. Completes when Reader 'r' returns an error (such as
+// EOF) or the buffer is full; the outcome is sent on 'slices', which is then closed.
+func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
+ defer close(slices)
+
+ // Initially use entire buffer as scratch space
+ ptr := buffer[:]
+ for {
+ var n int
+ var err error
+ if len(ptr) > 0 {
+ // Read into the scratch space
+ n, err = r.Read(ptr)
+ } else {
+ // Ran out of scratch space, try reading one more byte
+ var b [1]byte
+ n, err = r.Read(b[:])
+
+ if n > 0 {
+ // Reader has more data but we have nowhere to
+ // put it, so we're stuffed
+ slices <- ReaderSlice{nil, io.ErrShortBuffer}
+ } else {
+ // Return some other error (hopefully EOF)
+ slices <- ReaderSlice{nil, err}
+ }
+ return
+ }
+
+ // End on error (includes EOF)
+ if err != nil {
+ slices <- ReaderSlice{nil, err}
+ return
+ }
+
+ if n > 0 {
+ // Make a slice with the contents of the read
+ slices <- ReaderSlice{ptr[:n], nil}
+
+ // Adjust the scratch space slice
+ ptr = ptr[n:]
+ }
+ }
+}
+
+// A read request to the Transfer() function
+type ReadRequest struct {
+ offset int
+ maxsize int
+ result chan<- ReadResult
+}
+
+// A read result from the Transfer() function
+type ReadResult struct {
+ slice []byte
+ err error
+}
+
+// Reads from the buffer managed by the Transfer()
+type BufferReader struct {
+ offset *int
+ requests chan<- ReadRequest
+ responses chan ReadResult
+}
+
+func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
+ return BufferReader{new(int), requests, make(chan ReadResult)}
+}
+
+// Reads from the buffer managed by the Transfer()
+func (this BufferReader) Read(p []byte) (n int, err error) {
+ this.requests <- ReadRequest{*this.offset, len(p), this.responses}
+ rr, valid := <-this.responses
+ if valid {
+ *this.offset += len(rr.slice)
+ return copy(p, rr.slice), rr.err
+ } else {
+ return 0, io.ErrUnexpectedEOF
+ }
+}
+
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+ // Record starting offset in order to correctly report the number of bytes sent
+ starting_offset := *this.offset
+ for {
+ this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
+ rr, valid := <-this.responses
+ if valid {
+ log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
+ *this.offset += len(rr.slice)
+ if rr.err != nil {
+ if rr.err == io.EOF {
+ // EOF is not an error.
+ return int64(*this.offset - starting_offset), nil
+ } else {
+ return int64(*this.offset - starting_offset), rr.err
+ }
+ } else {
+ dest.Write(rr.slice)
+ }
+ } else {
+ return int64(*this.offset), io.ErrUnexpectedEOF
+ }
+ }
+}
+
+// Close the responses channel
+func (this BufferReader) Close() error {
+ close(this.responses)
+ return nil
+}
+
+// Handle a read request. Returns true if a response was sent, and false if
+// the request should be queued.
+func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
+ log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
+ if req.offset < len(body) {
+ var end int
+ if req.offset+req.maxsize < len(body) {
+ end = req.offset + req.maxsize
+ } else {
+ end = len(body)
+ }
+ req.result <- ReadResult{body[req.offset:end], nil}
+ return true
+ } else if complete && req.offset >= len(body) {
+ req.result <- ReadResult{nil, io.EOF}
+ return true
+ } else {
+ return false
+ }
+}
+
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in 'source_buffer'. Otherwise, uses the contents of 'source_buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel. Completes
+// when the 'requests' channel is closed or the reader fails with a non-EOF error.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
+ // currently buffered data
+ var body []byte
+
+ // for receiving slices from ReadIntoBuffer
+ var slices chan ReaderSlice = nil
+
+ // indicates whether the buffered data is complete
+ var complete bool = false
+
+ if source_reader != nil {
+ // 'body' is the buffer slice representing the body content read so far
+ body = source_buffer[:0]
+
+ // used to communicate slices of the buffer as they are read;
+ // ReadIntoBuffer will close 'slices' when it is done with it
+ slices = make(chan ReaderSlice)
+
+ // Spin it off
+ go ReadIntoBuffer(source_buffer, source_reader, slices)
+ } else {
+ // use the whole buffer
+ body = source_buffer[:]
+
+ // buffer is complete
+ complete = true
+ }
+
+ pending_requests := make([]ReadRequest, 0)
+
+ for {
+ select {
+ case req, valid := <-requests:
+ // Handle a buffer read request
+ if valid {
+ if !HandleReadRequest(req, body, complete) {
+ pending_requests = append(pending_requests, req)
+ }
+ } else {
+ // closed 'requests' channel indicates we're done
+ return
+ }
+
+ case bk, valid := <-slices:
+ // Got a new slice from the reader
+ if valid {
+ if bk.reader_error != nil {
+ reader_error <- bk.reader_error
+ if bk.reader_error == io.EOF {
+ // EOF indicates the reader is done
+ // sending, so our buffer is complete.
+ complete = true
+ } else {
+ // some other reader error
+ return
+ }
+ }
+
+ if bk.slice != nil {
+ // adjust body bounds now that another slice has been read
+ body = source_buffer[0 : len(body)+len(bk.slice)]
+ }
+
+ // handle pending reads
+ n := 0
+ for n < len(pending_requests) {
+ if HandleReadRequest(pending_requests[n], body, complete) {
+
+ // move the element from the
+ // back of the slice to
+ // position 'n', then shorten
+ // the slice by one element
+ pending_requests[n] = pending_requests[len(pending_requests)-1]
+ pending_requests = pending_requests[0 : len(pending_requests)-1]
+ } else {
+
+ // Request wasn't handled, so keep it in the request slice
+ n += 1
+ }
+ }
+ } else {
+ if complete {
+ // no more reads
+ slices = nil
+ } else {
+ // reader channel closed without signaling EOF
+ reader_error <- io.ErrUnexpectedEOF
+ return
+ }
+ }
+ }
+ }
+}
diff --git a/sdk/go/src/arvados.org/buffer/buffer_test.go b/sdk/go/src/arvados.org/buffer/buffer_test.go
new file mode 100644
index 0000000..28e1e66
--- /dev/null
+++ b/sdk/go/src/arvados.org/buffer/buffer_test.go
@@ -0,0 +1,364 @@
+package buffer
+
+import (
+ . "gopkg.in/check.v1"
+ "io"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate: lets "go test" run the suites registered with Suite().
+func Test(t *testing.T) { TestingT(t) }
+
+var _ = Suite(&StandaloneSuite{}) // register the suite below with gocheck
+
+// StandaloneSuite is the receiver for the standalone tests in this file; it holds no state.
+type StandaloneSuite struct{}
+
+func ReadIntoBufferHelper(c *C, bufsize int) { // drives ReadIntoBuffer through an io.Pipe using a bufsize-byte buffer
+ buffer := make([]byte, bufsize)
+
+ reader, writer := io.Pipe()
+ slices := make(chan ReaderSlice)
+
+ go ReadIntoBuffer(buffer, reader, slices)
+
+ { // first write: 128 bytes holding the values 0..127
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices // the slice reported for this write
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 { // the data must also sit at the start of buffer, with the rest still zero
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ { // second write: 96 bytes holding i/2, expected directly after the first 128
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+ writer.Write(out)
+ s1 := <-slices // the slice reported for the second write only
+ c.Check(len(s1.slice), Equals, 96)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 96; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 { // buffer must now hold both writes back to back, zero beyond them
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 96) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ { // closing the writer must produce a final empty slice carrying io.EOF
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.EOF)
+ }
+}
+
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) { // the helper writes 128+96 = 224 bytes in total
+ ReadIntoBufferHelper(c, 512) // buffer much larger than the data
+ ReadIntoBufferHelper(c, 225) // one byte of slack
+ ReadIntoBufferHelper(c, 224) // data fills the buffer exactly
+}
+
+func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) { // same 128+96 byte sequence as ReadIntoBufferHelper, but the buffer is one byte too small
+ buffer := make([]byte, 223) // 128 + 96 - 1
+ reader, writer := io.Pipe()
+ slices := make(chan ReaderSlice)
+
+ go ReadIntoBuffer(buffer, reader, slices)
+
+ { // first write: 128 bytes holding the values 0..127; this one fits
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ { // second write: 96 bytes, of which only 95 fit in the remaining space
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+
+ // Only 95 of these 96 bytes fit in the buffer, so a synchronous Write could
+ // block waiting for the last byte to be consumed; run it in a goroutine
+ go writer.Write(out)
+ s1 := <-slices
+
+ c.Check(len(s1.slice), Equals, 95)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 95; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 95) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ { // after the writer closes, the test expects io.ErrShortBuffer rather than io.EOF
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+ }
+
+}
+
+func (s *StandaloneSuite) TestTransfer(c *C) { // end-to-end test of Transfer() fed by a pipe and read through several BufferReaders
+ reader, writer := io.Pipe()
+
+ // Buffer that Transfer() fills from 'reader'
+ buffer := make([]byte, 512)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests) // a closed 'requests' channel makes Transfer() return
+
+ // Channel on which Transfer() reports reader errors, including io.EOF
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ br1 := MakeBufferReader(requests)
+ out := make([]byte, 128) // test pattern (values 0..127), written to the pipe in two parts
+
+ {
+ // Write the first 100 bytes, then read into a 64-byte buffer, which is
+ // shorter than the available data
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+
+ writer.Write(out[:100])
+
+ in := make([]byte, 64)
+ n, err := br1.Read(in)
+
+ c.Check(n, Equals, 64)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 64; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+ }
+
+ {
+ // Without writing anything more, read into a buffer longer than the
+ // remaining data: 100 written - 64 already read = 36 bytes
+ in := make([]byte, 64)
+ n, err := br1.Read(in)
+ c.Check(n, Equals, 36)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 36; i += 1 {
+ c.Check(in[i], Equals, out[64+i])
+ }
+
+ }
+
+ {
+ // Test read before write: br1 has already consumed all 100 bytes written so far
+ type Rd struct {
+ n int
+ err error
+ }
+ rd := make(chan Rd)
+ in := make([]byte, 64)
+
+ go func() {
+ n, err := br1.Read(in)
+ rd <- Rd{n, err}
+ }()
+
+ time.Sleep(100 * time.Millisecond) // presumably gives the Read above time to be issued before the write
+ writer.Write(out[100:]) // the remaining 28 bytes
+
+ got := <-rd
+
+ c.Check(got.n, Equals, 28)
+ c.Check(got.err, Equals, nil)
+
+ for i := 0; i < 28; i += 1 {
+ c.Check(in[i], Equals, out[100+i])
+ }
+ }
+
+ br2 := MakeBufferReader(requests)
+ {
+ // Test 'catch up' reader: a new reader starts at the beginning and gets all 128 bytes in one Read
+ in := make([]byte, 256)
+ n, err := br2.Read(in)
+
+ c.Check(n, Equals, 128)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 128; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+ }
+
+ {
+ // Test closing the write end of the pipe: io.EOF is reported on reader_status and both caught-up readers then get (0, io.EOF)
+ writer.Close()
+ status := <-reader_status
+ c.Check(status, Equals, io.EOF)
+
+ in := make([]byte, 256)
+ n1, err1 := br1.Read(in)
+ n2, err2 := br2.Read(in)
+ c.Check(n1, Equals, 0)
+ c.Check(err1, Equals, io.EOF)
+ c.Check(n2, Equals, 0)
+ c.Check(err2, Equals, io.EOF)
+ }
+
+ {
+ // Test 'catch up' reader after closing: all 128 bytes are still served, followed by io.EOF
+ br3 := MakeBufferReader(requests)
+ in := make([]byte, 256)
+ n, err := br3.Read(in)
+
+ c.Check(n, Equals, 128)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 128; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+
+ n, err = br3.Read(in)
+
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }
+}
+
+func (s *StandaloneSuite) TestTransferShortBuffer(c *C) { // expects io.ErrShortBuffer when the source holds more data than the buffer
+ reader, writer := io.Pipe()
+
+ // Buffer that Transfer() fills from 'reader'; one byte smaller than the data written below
+ buffer := make([]byte, 100)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Channel on which Transfer() reports reader errors
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ out := make([]byte, 101)
+ go writer.Write(out) // in a goroutine: a pipe Write blocks until the data is consumed or the read end is closed
+
+ status := <-reader_status
+ c.Check(status, Equals, io.ErrShortBuffer)
+}
+
+func (s *StandaloneSuite) TestTransferFromBuffer(c *C) { // serve a pre-filled buffer through Transfer() with no source reader
+ // Pre-filled buffer to serve (values 0..99); no reader is involved
+ buffer := make([]byte, 100)
+ for i := 0; i < 100; i += 1 {
+ buffer[i] = byte(i)
+ }
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ go Transfer(buffer, nil, requests, nil) // nil source reader and nil error channel
+
+ br1 := MakeBufferReader(requests)
+
+ in := make([]byte, 64)
+ { // first read fills the whole 64-byte destination
+ n, err := br1.Read(in)
+
+ c.Check(n, Equals, 64)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 64; i += 1 {
+ c.Check(in[i], Equals, buffer[i])
+ }
+ }
+ { // second read returns the remaining 100 - 64 = 36 bytes
+ n, err := br1.Read(in)
+
+ c.Check(n, Equals, 36)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 36; i += 1 {
+ c.Check(in[i], Equals, buffer[64+i])
+ }
+ }
+ { // nothing left: the reader must report (0, io.EOF)
+ n, err := br1.Read(in)
+
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }
+}
+
+func (s *StandaloneSuite) TestTransferIoCopy(c *C) { // uses a BufferReader as the source of io.Copy
+ // Pre-filled buffer to serve (values 0..99); no reader is involved
+ buffer := make([]byte, 100)
+ for i := 0; i < 100; i += 1 {
+ buffer[i] = byte(i)
+ }
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ go Transfer(buffer, nil, requests, nil) // nil source reader and nil error channel
+
+ br1 := MakeBufferReader(requests)
+
+ reader, writer := io.Pipe() // destination side for io.Copy
+
+ go func() { // pipe consumer; NOTE(review): nothing waits for this goroutine, so its checks may run after the test returns — confirm
+ p := make([]byte, 100)
+ n, err := reader.Read(p)
+ c.Check(n, Equals, 100)
+ c.Check(err, Equals, nil)
+ c.Check(p, DeepEquals, buffer)
+ }()
+
+ io.Copy(writer, br1)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list