[ARVADOS] updated: c6a6693dc36615effca5e3363b81199362007c59
git at public.curoverse.com
Wed May 21 11:24:08 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/keepclient/keepclient.go | 6 +-
.../src/arvados.org/keepclient/keepclient_test.go | 29 ++-
sdk/go/src/arvados.org/keepclient/support.go | 36 ++-
sdk/go/src/arvados.org/streamer/streamer.go | 58 ++---
sdk/go/src/arvados.org/streamer/streamer_test.go | 250 +++++++++++----------
sdk/go/src/arvados.org/streamer/transfer.go | 138 ++++++++----
6 files changed, 298 insertions(+), 219 deletions(-)
via c6a6693dc36615effca5e3363b81199362007c59 (commit)
via 38cef39fbdfeb8176e4c755d12e43a450e868439 (commit)
from 2f32f3483d18e9a89a8b5c13e022495c8681db04 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c6a6693dc36615effca5e3363b81199362007c59
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 21 11:24:04 2014 -0400
2798: Updated keep client with buffer/streamer changes.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dcf1f33..dc3ceed 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -2,7 +2,7 @@
package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"crypto/md5"
"crypto/tls"
"errors"
@@ -69,7 +69,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
bufsize = BLOCKSIZE
}
- t := buffer.StartTransferFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
+ t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
defer t.Close()
return this.putReplicas(hash, t, expectedLength)
@@ -80,7 +80,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
// replicas that were written and if there was an error. Note this will return
// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
- t := buffer.StartTransferFromSlice(buf)
+ t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
return this.putReplicas(hash, t, int64(len(buf)))
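
For context, a minimal sketch of calling the updated PutHB entry point is below. It assumes the GOPATH-style import path used in this repository and a KeepClient value configured elsewhere (client construction is not part of this diff), so everything outside PutHB itself is illustrative only:

    package main

    import (
        "crypto/md5"
        "fmt"
        "log"

        "arvados.org/keepclient"
    )

    func main() {
        // Assumed: kc is configured elsewhere (Service_roots, Want_replicas,
        // auth token); constructing it is outside the scope of this diff.
        var kc keepclient.KeepClient

        data := []byte("foo")
        hash := fmt.Sprintf("%x", md5.Sum(data))

        // PutHB now wraps the slice with streamer.AsyncStreamFromSlice
        // internally and fans the block out to Want_replicas Keep servers.
        replicas, err := kc.PutHB(hash, data)
        if err != nil {
            log.Fatalf("put failed after %d replicas: %v", replicas, err)
        }
        log.Printf("stored %d replicas of %s", replicas, hash)
    }
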
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 348b913..1d3bbee 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,7 +1,7 @@
package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"crypto/md5"
"flag"
"fmt"
@@ -141,6 +141,8 @@ func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
}
func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+ log.Printf("TestUploadToStubKeepServer")
+
st := StubPutHandler{
c,
"acbd18db4cc2f85cedef654fccc4a4d8",
@@ -161,9 +163,13 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
status := <-upload_status
c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
})
+
+ log.Printf("TestUploadToStubKeepServer done")
}
func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+ log.Printf("TestUploadToStubKeepServerBufferReader")
+
st := StubPutHandler{
c,
"acbd18db4cc2f85cedef654fccc4a4d8",
@@ -175,22 +181,23 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
func(kc KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
- tr := buffer.StartTransferFromReader(512, reader)
+ tr := streamer.AsyncStreamFromReader(512, reader)
defer tr.Close()
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
writer.Write([]byte("foo"))
writer.Close()
- <-tr.Reader_status
<-st.handled
status := <-upload_status
c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
})
+
+ log.Printf("TestUploadToStubKeepServerBufferReader done")
}
type FailHandler struct {
@@ -203,6 +210,8 @@ func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+ log.Printf("TestFailedUploadToStubKeepServer")
+
st := FailHandler{
make(chan string)}
@@ -223,7 +232,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
c.Check(status.StatusCode, Equals, 500)
})
-
+ log.Printf("TestFailedUploadToStubKeepServer done")
}
type KeepServer struct {
@@ -279,6 +288,8 @@ func (s *StandaloneSuite) TestPutB(c *C) {
(s1 == shuff[1] && s2 == shuff[0]),
Equals,
true)
+
+ log.Printf("TestPutB done")
}
func (s *StandaloneSuite) TestPutHR(c *C) {
@@ -327,6 +338,8 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
(s1 == shuff[1] && s2 == shuff[0]),
Equals,
true)
+
+ log.Printf("TestPutHR done")
}
func (s *StandaloneSuite) TestPutWithFail(c *C) {
@@ -419,6 +432,8 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 1)
c.Check(<-st.handled, Equals, shuff[1])
+
+ log.Printf("TestPutWithTooManyFail done")
}
type StubGetHandler struct {
@@ -436,6 +451,7 @@ func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
}
func (s *StandaloneSuite) TestGet(c *C) {
+ log.Printf("TestGet")
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
@@ -453,6 +469,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
kc.Service_roots = []string{url}
r, n, url2, err := kc.Get(hash)
+ defer r.Close()
c.Check(err, Equals, nil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
@@ -460,6 +477,8 @@ func (s *StandaloneSuite) TestGet(c *C) {
content, err2 := ioutil.ReadAll(r)
c.Check(err2, Equals, nil)
c.Check(content, DeepEquals, []byte("foo"))
+
+ log.Printf("TestGet done")
}
func (s *StandaloneSuite) TestGetFail(c *C) {
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 7ea8248..e657a60 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -2,7 +2,7 @@
package keepclient
import (
- "arvados.org/buffer"
+ "arvados.org/streamer"
"encoding/json"
"errors"
"fmt"
@@ -137,6 +137,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
upload_status <- uploadStatus{err, url, 0}
+ body.Close()
return
}
@@ -163,7 +164,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
func (this KeepClient) putReplicas(
hash string,
- tr buffer.TransferBuffer,
+ tr *streamer.AsyncStream,
expectedLength int64) (replicas int, err error) {
// Calculate the ordering for uploading to servers
@@ -186,7 +187,7 @@ func (this KeepClient) putReplicas(
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeBufferReader(), upload_status, expectedLength)
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
next_server += 1
active += 1
} else {
@@ -195,26 +196,17 @@ func (this KeepClient) putReplicas(
}
// Now wait for something to happen.
- select {
- case status := <-tr.Reader_status:
- if status == io.EOF {
- // good news!
- } else {
- // bad news
- return (this.Want_replicas - remaining_replicas), status
- }
- case status := <-upload_status:
- if status.StatusCode == 200 {
- // good news!
- remaining_replicas -= 1
- } else {
- // writing to keep server failed for some reason
- log.Printf("Keep server put to %v failed with '%v'",
- status.Url, status.Err)
- }
- active -= 1
- log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+ status := <-upload_status
+ if status.StatusCode == 200 {
+ // good news!
+ remaining_replicas -= 1
+ } else {
+ // writing to keep server failed for some reason
+ log.Printf("Keep server put to %v failed with '%v'",
+ status.Url, status.Err)
}
+ active -= 1
+ log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
}
return (this.Want_replicas - remaining_replicas), nil
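
The new putReplicas loop above drives everything from the single upload_status channel: reader failures now surface through each upload's status rather than through a separate Reader_status channel. A standalone sketch of that fan-out/collect pattern follows; the servers, URLs, and the uploadStatus-style struct here are illustrative stand-ins, not the SDK's code:

    package main

    import (
        "log"
        "net/http"
    )

    // Illustrative stand-in for the SDK's uploadStatus message.
    type putStatus struct {
        err        error
        url        string
        statusCode int
    }

    func main() {
        servers := []string{"http://keep0.example:25107", "http://keep1.example:25107"}
        wantReplicas := 2

        status := make(chan putStatus)
        for _, srv := range servers {
            go func(srv string) {
                // A real uploader would PUT the block body here; a simple GET
                // keeps the sketch self-contained.
                resp, err := http.Get(srv)
                if err != nil {
                    status <- putStatus{err, srv, 0}
                    return
                }
                resp.Body.Close()
                status <- putStatus{nil, srv, resp.StatusCode}
            }(srv)
        }

        // Collect results until enough replicas succeed or all attempts finish.
        remaining := wantReplicas
        for i := 0; i < len(servers) && remaining > 0; i++ {
            s := <-status
            if s.err == nil && s.statusCode == 200 {
                remaining--
            } else {
                log.Printf("put to %v failed: %v (HTTP %v)", s.url, s.err, s.statusCode)
            }
        }
        log.Printf("wrote %d of %d wanted replicas", wantReplicas-remaining, wantReplicas)
    }
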
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index 78ab027..2217dd3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -117,6 +117,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
func (this *StreamReader) Close() error {
this.stream.subtract_reader <- true
close(this.responses)
+ this.stream = nil
return nil
}
commit 38cef39fbdfeb8176e4c755d12e43a450e868439
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed May 21 11:00:21 2014 -0400
2798: Renamed internal messaging structs in an attempt to use the word "reader"
slightly less. Refactored tests to reduce redundancy slightly. Added test with large number of concurrent readers. Rewrote "how to use" package comments and wrote a small novel about the "theory of operation".
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index ba49fb3..78ab027 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from an io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+is potentially costly, since the buffer retains a copy of the source data.
Usage:
Begin reading into a buffer with maximum size 'buffersize' from 'source':
- tr := StartTransferFromReader(buffersize, source)
+ stream := AsyncStreamFromReader(buffersize, source)
-To create a new reader (this can be called multiple times):
- r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+ reader := stream.MakeStreamReader()
-When you're done with the buffer:
- tr.Close()
+Make sure to close the reader when you're done with it.
+ reader.Close()
+When you're done with the stream:
+ stream.Close()
Alternately, if you already have a filled buffer and just want to read out from it:
- tr := StartTransferFromSlice(buf)
+ stream := AsyncStreamFromSlice(buf)
+
r := tr.MakeStreamReader()
- tr.Close()
*/
@@ -28,35 +40,33 @@ import (
)
type AsyncStream struct {
- requests chan readRequest
+ buffer []byte
+ requests chan sliceRequest
add_reader chan bool
subtract_reader chan bool
wait_zero_readers chan bool
- Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
type StreamReader struct {
offset int
stream *AsyncStream
- responses chan readResult
+ responses chan sliceResult
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- buf := make([]byte, buffersize)
-
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+ t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, source, t.requests, t.Reader_status)
+ go t.transfer(source)
go t.readersMonitor()
return t
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+ t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
- go transfer(buf, nil, t.requests, nil)
+ go t.transfer(nil)
go t.readersMonitor()
return t
@@ -64,12 +74,12 @@ func AsyncStreamFromSlice(buf []byte) *AsyncStream {
func (this *AsyncStream) MakeStreamReader() *StreamReader {
this.add_reader <- true
- return &StreamReader{0, this, make(chan readResult)}
+ return &StreamReader{0, this, make(chan sliceResult)}
}
// Reads from the buffer managed by the Transfer()
func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+ this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
@@ -83,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Record starting offset in order to correctly report the number of bytes sent
starting_offset := this.offset
for {
- this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
@@ -116,7 +126,4 @@ func (this *AsyncStream) Close() {
close(this.add_reader)
close(this.subtract_reader)
close(this.wait_zero_readers)
- if this.Reader_status != nil {
- close(this.Reader_status)
- }
}
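
Taken together with the rewritten package comment above, typical use of the new API looks roughly like the sketch below (import path as laid out in this repository; the input string and buffer size are arbitrary):

    package main

    import (
        "io"
        "log"
        "os"
        "strings"

        "arvados.org/streamer"
    )

    func main() {
        source := strings.NewReader("hello, keep")

        // Buffer up to 64 bytes from source; readers may attach at any time.
        stream := streamer.AsyncStreamFromReader(64, source)
        // Close() waits for zero readers, so the deferred reader Close()
        // calls below must run first (they do: defers run LIFO).
        defer stream.Close()

        // Each StreamReader starts at offset 0 and replays the buffered data.
        r1 := stream.MakeStreamReader()
        defer r1.Close()
        r2 := stream.MakeStreamReader()
        defer r2.Close()

        if _, err := io.Copy(os.Stdout, r1); err != nil {
            log.Fatal(err)
        }
        if _, err := io.Copy(os.Stdout, r2); err != nil {
            log.Fatal(err)
        }
    }
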
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 33f84b8..853d7d3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -15,130 +15,110 @@ var _ = Suite(&StandaloneSuite{})
// Standalone tests
type StandaloneSuite struct{}
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+ ReadIntoBufferHelper(c, 225)
+ ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 96)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 96; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 96) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+}
+
func ReadIntoBufferHelper(c *C, bufsize int) {
buffer := make([]byte, bufsize)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
- }
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
- }
-}
+ HelperWrite128andCheck(c, buffer, writer, slices)
+ HelperWrite96andCheck(c, buffer, writer, slices)
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 512)
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.EOF)
}
func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
buffer := make([]byte, 223)
reader, writer := io.Pipe()
- slices := make(chan readerSlice)
+ slices := make(chan nextSlice)
go readIntoBuffer(buffer, reader, slices)
- {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ HelperWrite128andCheck(c, buffer, writer, slices)
+
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
}
- {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
+ // Write will deadlock because it can't write all the data, so
+ // spin it off to a goroutine
+ go writer.Write(out)
+ s1 := <-slices
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
+ c.Check(len(s1.slice), Equals, 95)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 95; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
}
- {
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 95) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
}
+ writer.Close()
+ s1 = <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransfer(c *C) {
@@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
{
// Test closing the reader
writer.Close()
- status := <-tr.Reader_status
- c.Check(status, Equals, io.EOF)
in := make([]byte, 256)
n1, err1 := br1.Read(in)
@@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
reader, writer := io.Pipe()
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
-
- // Read requests on Transfer() buffer
- requests := make(chan readRequest)
- defer close(requests)
-
- // Reporting reader error states
- reader_status := make(chan error)
+ tr := AsyncStreamFromReader(100, reader)
+ defer tr.Close()
- go transfer(buffer, reader, requests, reader_status)
+ sr := tr.MakeStreamReader()
+ defer sr.Close()
out := make([]byte, 101)
go writer.Write(out)
- status := <-reader_status
- c.Check(status, Equals, io.ErrShortBuffer)
+ n, err := sr.Read(out)
+ c.Check(n, Equals, 100)
+
+ n, err = sr.Read(out)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.ErrShortBuffer)
}
func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
@@ -346,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
io.Copy(writer, br1)
}
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+ reader, writer := io.Pipe()
+
+ tr := AsyncStreamFromReader(512, reader)
+ defer tr.Close()
+
+ sr := tr.MakeStreamReader()
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ sr.Close()
+ }()
+
+ for i := 0; i < 200; i += 1 {
+ go func() {
+ br1 := tr.MakeStreamReader()
+ defer br1.Close()
+
+ p := make([]byte, 3)
+ n, err := br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 3)
+ c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+ n, err = br1.Read(p)
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }()
+ }
+
+ writer.Write([]byte("foo"))
+ writer.Write([]byte("bar"))
+ writer.Write([]byte("baz"))
+ writer.Close()
+}
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index 77242f1..a4a194f 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -1,3 +1,53 @@
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine. It manages concurrent reads and
+appends to the "body" slice. "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer. Each read goes into a
+slice of the buffer which spans the section immediately following the end of the
+current "body". Each time a Read completes, a slice representing the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects() on two channels, the "requests"
+channel and the "slices" channel.
+
+When a message is received on the "slices" channel, it means that a new
+section of the buffer has data, or an error is signaled. Since the data has
+been read directly into the source_buffer, transfer() simply increases the
+size of the body slice to encompass the newly filled-in section. Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is received on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer. This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response. If there was an error
+reported from the source reader, it is returned. If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body). If
+the source reader status is EOF (done filling the buffer) and the read request
+offset is beyond the end of the body, it responds with EOF. Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() goroutine runs until the requests channel is closed by AsyncStream.Close().
+
+To track readers, streamer uses the readersMonitor() goroutine. This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers. When a new reader is created, it sends a message on the add_reader
+channel. If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed. When a reader is closed, it sends a
+message on the subtract_reader channel. Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which blocks
+the sender until there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
package streamer
import (
@@ -7,20 +57,20 @@ import (
const MAX_READERS = 100
// A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
slice []byte
reader_error error
}
// A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
offset int
maxsize int
- result chan<- readResult
+ result chan<- sliceResult
}
// A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
slice []byte
err error
}
@@ -41,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) {
// Read repeatedly from the reader and write sequentially into the specified
// buffer, and report each read to channel 'c'. Completes when Reader 'r'
// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
defer close(slices)
if writeto, ok := r.(io.WriterTo); ok {
n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
} else {
- slices <- readerSlice{buffer[:n], nil}
- slices <- readerSlice{nil, io.EOF}
+ slices <- nextSlice{buffer[:n], nil}
+ slices <- nextSlice{nil, io.EOF}
}
return
} else {
@@ -75,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
if n > 0 {
// Reader has more data but we have nowhere to
// put it, so we're stuffed
- slices <- readerSlice{nil, io.ErrShortBuffer}
+ slices <- nextSlice{nil, io.ErrShortBuffer}
} else {
// Return some other error (hopefully EOF)
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
}
return
}
// End on error (includes EOF)
if err != nil {
- slices <- readerSlice{nil, err}
+ slices <- nextSlice{nil, err}
return
}
if n > 0 {
// Make a slice with the contents of the read
- slices <- readerSlice{ptr[:n], nil}
+ slices <- nextSlice{ptr[:n], nil}
// Adjust the scratch space slice
ptr = ptr[n:]
@@ -102,18 +152,21 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
- if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+ if (reader_status != nil) && (reader_status != io.EOF) {
+ req.result <- sliceResult{nil, reader_status}
+ return true
+ } else if req.offset < len(body) {
var end int
if req.offset+req.maxsize < len(body) {
end = req.offset + req.maxsize
} else {
end = len(body)
}
- req.result <- readResult{body[req.offset:end], nil}
+ req.result <- sliceResult{body[req.offset:end], nil}
return true
- } else if complete && req.offset >= len(body) {
- req.result <- readResult{nil, io.EOF}
+ } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+ req.result <- sliceResult{nil, io.EOF}
return true
} else {
return false
@@ -125,15 +178,18 @@ func handleReadRequest(req readRequest, body []byte, complete bool) bool {
// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
// Accepts read requests on the buffer on the 'requests' channel. Completes
// when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+ source_buffer := this.buffer
+ requests := this.requests
+
// currently buffered data
var body []byte
// for receiving slices from readIntoBuffer
- var slices chan readerSlice = nil
+ var slices chan nextSlice = nil
- // indicates whether the buffered data is complete
- var complete bool = false
+ // indicates the status of the underlying reader
+ var reader_status error = nil
if source_reader != nil {
// 'body' is the buffer slice representing the body content read so far
@@ -141,7 +197,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
// used to communicate slices of the buffer as they are
// readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan readerSlice)
+ slices = make(chan nextSlice)
// Spin it off
go readIntoBuffer(source_buffer, source_reader, slices)
@@ -150,17 +206,17 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
body = source_buffer[:]
// buffer is complete
- complete = true
+ reader_status = io.EOF
}
- pending_requests := make([]readRequest, 0)
+ pending_requests := make([]sliceRequest, 0)
for {
select {
case req, valid := <-requests:
// Handle a buffer read request
if valid {
- if !handleReadRequest(req, body, complete) {
+ if !handleReadRequest(req, body, reader_status) {
pending_requests = append(pending_requests, req)
}
} else {
@@ -171,17 +227,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
case bk, valid := <-slices:
// Got a new slice from the reader
if valid {
- if bk.reader_error != nil {
- reader_error <- bk.reader_error
- if bk.reader_error == io.EOF {
- // EOF indicates the reader is done
- // sending, so our buffer is complete.
- complete = true
- } else {
- // some other reader error
- return
- }
- }
+ reader_status = bk.reader_error
if bk.slice != nil {
// adjust body bounds now that another slice has been read
@@ -191,12 +237,9 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
// handle pending reads
n := 0
for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, complete) {
-
- // move the element from the
- // back of the slice to
- // position 'n', then shorten
- // the slice by one element
+ if handleReadRequest(pending_requests[n], body, reader_status) {
+ // move the element from the back of the slice to
+ // position 'n', then shorten the slice by one element
pending_requests[n] = pending_requests[len(pending_requests)-1]
pending_requests = pending_requests[0 : len(pending_requests)-1]
} else {
@@ -206,14 +249,13 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
}
}
} else {
- if complete {
- // no more reads
- slices = nil
+ if reader_status == io.EOF {
+ // no more reads expected, so this is ok
} else {
- // reader channel closed without signaling EOF
- reader_error <- io.ErrUnexpectedEOF
- return
+ // slices channel closed without signaling EOF
+ reader_status = io.ErrUnexpectedEOF
}
+ slices = nil
}
}
}
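
The readersMonitor() goroutine itself is not shown in this diff, only described in the theory-of-operation comment above. Purely as an illustration of that description (reader creation blocks once the count reaches MAX_READERS, and Close() blocks until the count drops to zero), one way such a counting monitor could look is sketched here; it is not the SDK's implementation:

    package main

    import "fmt"

    const MAX_READERS = 100

    // monitor counts readers: adds block once the count reaches MAX_READERS,
    // and a shutdown request on waitZero blocks until the count drops to zero.
    func monitor(add, subtract, waitZero chan bool) {
        readers := 0
        for {
            switch {
            case readers == 0:
                select {
                case <-add:
                    readers++
                case <-waitZero:
                    return // no readers outstanding: safe to shut down
                }
            case readers < MAX_READERS:
                select {
                case <-add:
                    readers++
                case <-subtract:
                    readers--
                }
            default:
                // At the limit: only accept reader departures.
                <-subtract
                readers--
            }
        }
    }

    func main() {
        add := make(chan bool)
        subtract := make(chan bool)
        waitZero := make(chan bool)
        go monitor(add, subtract, waitZero)

        add <- true      // a StreamReader was created
        subtract <- true // ...and closed again
        waitZero <- true // Close(): returns only once the count is zero
        fmt.Println("stream shut down cleanly")
    }
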
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list