[ARVADOS] updated: 2f32f3483d18e9a89a8b5c13e022495c8681db04
git at public.curoverse.com
git at public.curoverse.com
Tue May 20 22:23:24 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/streamer/streamer.go | 27 +++++++++----
sdk/go/src/arvados.org/streamer/streamer_test.go | 2 +
sdk/go/src/arvados.org/streamer/transfer.go | 50 +++++++++++++++++++++++-
3 files changed, 69 insertions(+), 10 deletions(-)
via 2f32f3483d18e9a89a8b5c13e022495c8681db04 (commit)
from 3986815ae5e7e61c48f3ed979c32358710ef7e20 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 2f32f3483d18e9a89a8b5c13e022495c8681db04
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 20 22:23:18 2014 -0400
2798: Tracks the opening and closing of readers. Closing an AsyncStream now
blocks until all of its readers are closed. Additionally, creating a new reader
blocks while the maximum number of readers (MAX_READERS) is already open.
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index 0b8755f..ba49fb3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -28,42 +28,48 @@ import (
)
type AsyncStream struct {
- requests chan readRequest
- Reader_status chan error
+ requests chan readRequest
+ add_reader chan bool
+ subtract_reader chan bool
+ wait_zero_readers chan bool
+ Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
type StreamReader struct {
offset int
- requests chan<- readRequest
+ stream *AsyncStream
responses chan readResult
}
func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
buf := make([]byte, buffersize)
- t := &AsyncStream{make(chan readRequest), make(chan error)}
+ t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
go transfer(buf, source, t.requests, t.Reader_status)
+ go t.readersMonitor()
return t
}
func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{make(chan readRequest), nil}
+ t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
go transfer(buf, nil, t.requests, nil)
+ go t.readersMonitor()
return t
}
func (this *AsyncStream) MakeStreamReader() *StreamReader {
- return &StreamReader{0, this.requests, make(chan readResult)}
+ this.add_reader <- true
+ return &StreamReader{0, this, make(chan readResult)}
}
// Reads from the buffer managed by the Transfer()
func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.requests <- readRequest{this.offset, len(p), this.responses}
+ this.stream.requests <- readRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
@@ -77,7 +83,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Record starting offset in order to correctly report the number of bytes sent
starting_offset := this.offset
for {
- this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+ this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
this.offset += len(rr.slice)
@@ -99,12 +105,17 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Close the responses channel
func (this *StreamReader) Close() error {
+ this.stream.subtract_reader <- true
close(this.responses)
return nil
}
func (this *AsyncStream) Close() {
+ this.wait_zero_readers <- true
close(this.requests)
+ close(this.add_reader)
+ close(this.subtract_reader)
+ close(this.wait_zero_readers)
if this.Reader_status != nil {
close(this.Reader_status)
}
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 9e24cfb..33f84b8 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -329,8 +329,10 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
}
tr := AsyncStreamFromSlice(buffer)
+ defer tr.Close()
br1 := tr.MakeStreamReader()
+ defer br1.Close()
reader, writer := io.Pipe()
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index ab8f941..77242f1 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -2,9 +2,10 @@ package streamer
import (
"io"
- "log"
)
+const MAX_READERS = 100
+
// A slice passed from readIntoBuffer() to transfer()
type readerSlice struct {
slice []byte
@@ -102,7 +103,6 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
func handleReadRequest(req readRequest, body []byte, complete bool) bool {
- log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
if req.offset < len(body) {
var end int
if req.offset+req.maxsize < len(body) {
@@ -218,3 +218,49 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
}
}
}
+
+func (this *AsyncStream) readersMonitor() {
+ var readers int = 0
+
+ for {
+ if readers == 0 {
+ select {
+ case _, ok := <-this.wait_zero_readers:
+ if ok {
+ // nothing, just implicitly unblock the sender
+ } else {
+ return
+ }
+ case _, ok := <-this.add_reader:
+ if ok {
+ readers += 1
+ } else {
+ return
+ }
+ }
+ } else if readers > 0 && readers < MAX_READERS {
+ select {
+ case _, ok := <-this.add_reader:
+ if ok {
+ readers += 1
+ } else {
+ return
+ }
+
+ case _, ok := <-this.subtract_reader:
+ if ok {
+ readers -= 1
+ } else {
+ return
+ }
+ }
+ } else if readers == MAX_READERS {
+ _, ok := <-this.subtract_reader
+ if ok {
+ readers -= 1
+ } else {
+ return
+ }
+ }
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list