[ARVADOS] updated: 2f32f3483d18e9a89a8b5c13e022495c8681db04

git at public.curoverse.com git at public.curoverse.com
Tue May 20 22:23:24 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/streamer/streamer.go      | 27 +++++++++----
 sdk/go/src/arvados.org/streamer/streamer_test.go |  2 +
 sdk/go/src/arvados.org/streamer/transfer.go      | 50 +++++++++++++++++++++++-
 3 files changed, 69 insertions(+), 10 deletions(-)

       via  2f32f3483d18e9a89a8b5c13e022495c8681db04 (commit)
      from  3986815ae5e7e61c48f3ed979c32358710ef7e20 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 2f32f3483d18e9a89a8b5c13e022495c8681db04
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 20 22:23:18 2014 -0400

    2798: Tracks opening and closing of readers, will block closing AsyncStream
    until all readers are closed.  Additionally, will block if too many readers are
    created.

diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index 0b8755f..ba49fb3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -28,42 +28,48 @@ import (
 )
 
 type AsyncStream struct {
-	requests      chan readRequest
-	Reader_status chan error
+	requests          chan readRequest
+	add_reader        chan bool
+	subtract_reader   chan bool
+	wait_zero_readers chan bool
+	Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
 	offset    int
-	requests  chan<- readRequest
+	stream    *AsyncStream
 	responses chan readResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 	buf := make([]byte, buffersize)
 
-	t := &AsyncStream{make(chan readRequest), make(chan error)}
+	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
 
 	go transfer(buf, source, t.requests, t.Reader_status)
+	go t.readersMonitor()
 
 	return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-	t := &AsyncStream{make(chan readRequest), nil}
+	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
 
 	go transfer(buf, nil, t.requests, nil)
+	go t.readersMonitor()
 
 	return t
 }
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
-	return &StreamReader{0, this.requests, make(chan readResult)}
+	this.add_reader <- true
+	return &StreamReader{0, this, make(chan readResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-	this.requests <- readRequest{this.offset, len(p), this.responses}
+	this.stream.requests <- readRequest{this.offset, len(p), this.responses}
 	rr, valid := <-this.responses
 	if valid {
 		this.offset += len(rr.slice)
@@ -77,7 +83,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 	// Record starting offset in order to correctly report the number of bytes sent
 	starting_offset := this.offset
 	for {
-		this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+		this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
 		rr, valid := <-this.responses
 		if valid {
 			this.offset += len(rr.slice)
@@ -99,12 +105,17 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+	this.stream.subtract_reader <- true
 	close(this.responses)
 	return nil
 }
 
 func (this *AsyncStream) Close() {
+	this.wait_zero_readers <- true
 	close(this.requests)
+	close(this.add_reader)
+	close(this.subtract_reader)
+	close(this.wait_zero_readers)
 	if this.Reader_status != nil {
 		close(this.Reader_status)
 	}
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 9e24cfb..33f84b8 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -329,8 +329,10 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 	}
 
 	tr := AsyncStreamFromSlice(buffer)
+	defer tr.Close()
 
 	br1 := tr.MakeStreamReader()
+	defer br1.Close()
 
 	reader, writer := io.Pipe()
 
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index ab8f941..77242f1 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -2,9 +2,10 @@ package streamer
 
 import (
 	"io"
-	"log"
 )
 
+const MAX_READERS = 100
+
 // A slice passed from readIntoBuffer() to transfer()
 type readerSlice struct {
 	slice        []byte
@@ -102,7 +103,6 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
 func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-	log.Printf("HandlereadRequest %d %d %d", req.offset, req.maxsize, len(body))
 	if req.offset < len(body) {
 		var end int
 		if req.offset+req.maxsize < len(body) {
@@ -218,3 +218,49 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 		}
 	}
 }
+
+func (this *AsyncStream) readersMonitor() {
+	var readers int = 0
+
+	for {
+		if readers == 0 {
+			select {
+			case _, ok := <-this.wait_zero_readers:
+				if ok {
+					// nothing, just implicitly unblock the sender
+				} else {
+					return
+				}
+			case _, ok := <-this.add_reader:
+				if ok {
+					readers += 1
+				} else {
+					return
+				}
+			}
+		} else if readers > 0 && readers < MAX_READERS {
+			select {
+			case _, ok := <-this.add_reader:
+				if ok {
+					readers += 1
+				} else {
+					return
+				}
+
+			case _, ok := <-this.subtract_reader:
+				if ok {
+					readers -= 1
+				} else {
+					return
+				}
+			}
+		} else if readers == MAX_READERS {
+			_, ok := <-this.subtract_reader
+			if ok {
+				readers -= 1
+			} else {
+				return
+			}
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list