[ARVADOS] updated: c6a6693dc36615effca5e3363b81199362007c59

git at public.curoverse.com
Wed May 21 11:24:08 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go    |   6 +-
 .../src/arvados.org/keepclient/keepclient_test.go  |  29 ++-
 sdk/go/src/arvados.org/keepclient/support.go       |  36 ++-
 sdk/go/src/arvados.org/streamer/streamer.go        |  58 ++---
 sdk/go/src/arvados.org/streamer/streamer_test.go   | 250 +++++++++++----------
 sdk/go/src/arvados.org/streamer/transfer.go        | 138 ++++++++----
 6 files changed, 298 insertions(+), 219 deletions(-)

       via  c6a6693dc36615effca5e3363b81199362007c59 (commit)
       via  38cef39fbdfeb8176e4c755d12e43a450e868439 (commit)
      from  2f32f3483d18e9a89a8b5c13e022495c8681db04 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c6a6693dc36615effca5e3363b81199362007c59
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed May 21 11:24:04 2014 -0400

    2798: Updated keep client with buffer/streamer changes.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index dcf1f33..dc3ceed 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -2,7 +2,7 @@
 package keepclient
 
 import (
-	"arvados.org/buffer"
+	"arvados.org/streamer"
 	"crypto/md5"
 	"crypto/tls"
 	"errors"
@@ -69,7 +69,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 		bufsize = BLOCKSIZE
 	}
 
-	t := buffer.StartTransferFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
+	t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
 	defer t.Close()
 
 	return this.putReplicas(hash, t, expectedLength)
@@ -80,7 +80,7 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 // replicas that were written and if there was an error.  Note this will return
 // InsufficientReplicasError whenever 0 <= replicas < this.Want_replicas.
 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
-	t := buffer.StartTransferFromSlice(buf)
+	t := streamer.AsyncStreamFromSlice(buf)
 	defer t.Close()
 
 	return this.putReplicas(hash, t, int64(len(buf)))
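
The public Put API is untouched by this commit; only the internal buffering
type moves from buffer.TransferBuffer to streamer.AsyncStream, so existing
callers keep working.  A minimal caller-side sketch (hypothetical: the helper
name and the already-constructed KeepClient are assumptions, not part of this
diff; PutHB's signature is taken from the hunk above):

    // storeBlock is a hypothetical helper around the unchanged public API.
    // Assumed imports: "arvados.org/keepclient", "crypto/md5", "fmt".
    func storeBlock(kc keepclient.KeepClient, buf []byte) (replicas int, err error) {
        hash := fmt.Sprintf("%x", md5.Sum(buf))
        return kc.PutHB(hash, buf)
    }
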
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 348b913..1d3bbee 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,7 +1,7 @@
 package keepclient
 
 import (
-	"arvados.org/buffer"
+	"arvados.org/streamer"
 	"crypto/md5"
 	"flag"
 	"fmt"
@@ -141,6 +141,8 @@ func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+	log.Printf("TestUploadToStubKeepServer")
+
 	st := StubPutHandler{
 		c,
 		"acbd18db4cc2f85cedef654fccc4a4d8",
@@ -161,9 +163,13 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 			status := <-upload_status
 			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
 		})
+
+	log.Printf("TestUploadToStubKeepServer done")
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+	log.Printf("TestUploadToStubKeepServerBufferReader")
+
 	st := StubPutHandler{
 		c,
 		"acbd18db4cc2f85cedef654fccc4a4d8",
@@ -175,22 +181,23 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
-			tr := buffer.StartTransferFromReader(512, reader)
+			tr := streamer.AsyncStreamFromReader(512, reader)
 			defer tr.Close()
 
-			br1 := tr.MakeBufferReader()
+			br1 := tr.MakeStreamReader()
 
 			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
 
 			writer.Write([]byte("foo"))
 			writer.Close()
 
-			<-tr.Reader_status
 			<-st.handled
 
 			status := <-upload_status
 			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
 		})
+
+	log.Printf("TestUploadToStubKeepServerBufferReader done")
 }
 
 type FailHandler struct {
@@ -203,6 +210,8 @@ func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+	log.Printf("TestFailedUploadToStubKeepServer")
+
 	st := FailHandler{
 		make(chan string)}
 
@@ -223,7 +232,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 			c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
 			c.Check(status.StatusCode, Equals, 500)
 		})
-
+	log.Printf("TestFailedUploadToStubKeepServer done")
 }
 
 type KeepServer struct {
@@ -279,6 +288,8 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 		(s1 == shuff[1] && s2 == shuff[0]),
 		Equals,
 		true)
+
+	log.Printf("TestPutB done")
 }
 
 func (s *StandaloneSuite) TestPutHR(c *C) {
@@ -327,6 +338,8 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 		(s1 == shuff[1] && s2 == shuff[0]),
 		Equals,
 		true)
+
+	log.Printf("TestPutHR done")
 }
 
 func (s *StandaloneSuite) TestPutWithFail(c *C) {
@@ -419,6 +432,8 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 	c.Check(err, Equals, InsufficientReplicasError)
 	c.Check(replicas, Equals, 1)
 	c.Check(<-st.handled, Equals, shuff[1])
+
+	log.Printf("TestPutWithTooManyFail done")
 }
 
 type StubGetHandler struct {
@@ -436,6 +451,7 @@ func (this StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
 }
 
 func (s *StandaloneSuite) TestGet(c *C) {
+	log.Printf("TestGet")
 
 	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
@@ -453,6 +469,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	kc.Service_roots = []string{url}
 
 	r, n, url2, err := kc.Get(hash)
+	defer r.Close()
 	c.Check(err, Equals, nil)
 	c.Check(n, Equals, int64(3))
 	c.Check(url2, Equals, fmt.Sprintf("%s/%s", url, hash))
@@ -460,6 +477,8 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	content, err2 := ioutil.ReadAll(r)
 	c.Check(err2, Equals, nil)
 	c.Check(content, DeepEquals, []byte("foo"))
+
+	log.Printf("TestGet done")
 }
 
 func (s *StandaloneSuite) TestGetFail(c *C) {
diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go
index 7ea8248..e657a60 100644
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@ -2,7 +2,7 @@
 package keepclient
 
 import (
-	"arvados.org/buffer"
+	"arvados.org/streamer"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -137,6 +137,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
 		upload_status <- uploadStatus{err, url, 0}
+		body.Close()
 		return
 	}
 
@@ -163,7 +164,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 func (this KeepClient) putReplicas(
 	hash string,
-	tr buffer.TransferBuffer,
+	tr *streamer.AsyncStream,
 	expectedLength int64) (replicas int, err error) {
 
 	// Calculate the ordering for uploading to servers
@@ -186,7 +187,7 @@ func (this KeepClient) putReplicas(
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeBufferReader(), upload_status, expectedLength)
+				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
 				next_server += 1
 				active += 1
 			} else {
@@ -195,26 +196,17 @@ func (this KeepClient) putReplicas(
 		}
 
 		// Now wait for something to happen.
-		select {
-		case status := <-tr.Reader_status:
-			if status == io.EOF {
-				// good news!
-			} else {
-				// bad news
-				return (this.Want_replicas - remaining_replicas), status
-			}
-		case status := <-upload_status:
-			if status.StatusCode == 200 {
-				// good news!
-				remaining_replicas -= 1
-			} else {
-				// writing to keep server failed for some reason
-				log.Printf("Keep server put to %v failed with '%v'",
-					status.Url, status.Err)
-			}
-			active -= 1
-			log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+		status := <-upload_status
+		if status.StatusCode == 200 {
+			// good news!
+			remaining_replicas -= 1
+		} else {
+			// writing to keep server failed for some reason
+			log.Printf("Keep server put to %v failed with '%v'",
+				status.Url, status.Err)
 		}
+		active -= 1
+		log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
 	}
 
 	return (this.Want_replicas - remaining_replicas), nil
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index 78ab027..2217dd3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -117,6 +117,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 func (this *StreamReader) Close() error {
 	this.stream.subtract_reader <- true
 	close(this.responses)
+	this.stream = nil
 	return nil
 }
 

commit 38cef39fbdfeb8176e4c755d12e43a450e868439
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed May 21 11:00:21 2014 -0400

    2798: Renamed internal messaging structs in an attempt to use the word "reader"
    slightly less.  Refactored tests to reduce redundancy slightly.  Added test with large number of concurrent readers.  Rewrote "how to use" package comments and wrote a small novel about the "theory of operation".

diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index ba49fb3..78ab027 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -1,23 +1,35 @@
-/* Implements a buffer that supports concurrent incremental read and append.
-New readers start reading from the beginning of the buffer, block when reaching
-the end of the buffer, and are unblocked as new data is added.
+/* AsyncStream pulls data in from a io.Reader source (such as a file or network
+socket) and fans out to any number of StreamReader sinks.
+
+Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
+any point in the lifetime of the AsyncStream, and each StreamReader will read
+the contents of the buffer up to the "frontier" of the buffer, at which point
+the StreamReader blocks until new data is read from the source.
+
+This is useful for minimizing readthrough latency as sinks can read and act on
+data from the source without waiting for the source to be completely buffered.
+It is also useful as a cache in situations where re-reading the original source
+is potentially costly, since the buffer retains a copy of the source data.
 
 Usage:
 
 Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  tr := StartTransferFromReader(buffersize, source)
+  stream := AsyncStreamFromReader(buffersize, source)
 
-To create a new reader (this can be called multiple times):
-  r := tr.MakeStreamReader()
+To create a new reader (this can be called multiple times, each reader starts
+at the beginning of the buffer):
+  reader := stream.MakeStreamReader()
 
-When you're done with the buffer:
-  tr.Close()
+Make sure to close the reader when you're done with it.
+  reader.Close()
 
+When you're done with the stream:
+  stream.Close()
 
 Alternately, if you already have a filled buffer and just want to read out from it:
-  tr := StartTransferFromSlice(buf)
+  stream := AsyncStreamFromSlice(buf)
+
   r := stream.MakeStreamReader()
-  tr.Close()
 
 */
 
@@ -28,35 +40,33 @@ import (
 )
 
 type AsyncStream struct {
-	requests          chan readRequest
+	buffer            []byte
+	requests          chan sliceRequest
 	add_reader        chan bool
 	subtract_reader   chan bool
 	wait_zero_readers chan bool
-	Reader_status     chan error
 }
 
 // Reads from the buffer managed by the Transfer()
 type StreamReader struct {
 	offset    int
 	stream    *AsyncStream
-	responses chan readResult
+	responses chan sliceResult
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-	buf := make([]byte, buffersize)
-
-	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), make(chan error)}
+	t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-	go transfer(buf, source, t.requests, t.Reader_status)
+	go t.transfer(source)
 	go t.readersMonitor()
 
 	return t
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-	t := &AsyncStream{make(chan readRequest), make(chan bool), make(chan bool), make(chan bool), nil}
+	t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
 
-	go transfer(buf, nil, t.requests, nil)
+	go t.transfer(nil)
 	go t.readersMonitor()
 
 	return t
@@ -64,12 +74,12 @@ func AsyncStreamFromSlice(buf []byte) *AsyncStream {
 
 func (this *AsyncStream) MakeStreamReader() *StreamReader {
 	this.add_reader <- true
-	return &StreamReader{0, this, make(chan readResult)}
+	return &StreamReader{0, this, make(chan sliceResult)}
 }
 
 // Reads from the buffer managed by the Transfer()
 func (this *StreamReader) Read(p []byte) (n int, err error) {
-	this.stream.requests <- readRequest{this.offset, len(p), this.responses}
+	this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
 	rr, valid := <-this.responses
 	if valid {
 		this.offset += len(rr.slice)
@@ -83,7 +93,7 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 	// Record starting offset in order to correctly report the number of bytes sent
 	starting_offset := this.offset
 	for {
-		this.stream.requests <- readRequest{this.offset, 32 * 1024, this.responses}
+		this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
 		rr, valid := <-this.responses
 		if valid {
 			this.offset += len(rr.slice)
@@ -116,7 +126,4 @@ func (this *AsyncStream) Close() {
 	close(this.add_reader)
 	close(this.subtract_reader)
 	close(this.wait_zero_readers)
-	if this.Reader_status != nil {
-		close(this.Reader_status)
-	}
 }
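
The rewritten package comment above is the clearest statement of the new API.
As a standalone illustration (a sketch only: the import path follows this
tree's GOPATH layout, and the sample data and buffer size are arbitrary):

    package main

    import (
        "arvados.org/streamer"
        "bytes"
        "fmt"
        "io/ioutil"
    )

    func main() {
        // Any io.Reader can feed the stream; bytes.Buffer stands in for a
        // file or network socket here.
        source := bytes.NewBufferString("hello, keep")
        stream := streamer.AsyncStreamFromReader(64, source)
        defer stream.Close()

        // Each StreamReader starts at offset 0 and sees the same bytes,
        // even though the source is only read once.
        r1 := stream.MakeStreamReader()
        defer r1.Close()
        r2 := stream.MakeStreamReader()
        defer r2.Close()

        b1, _ := ioutil.ReadAll(r1)
        b2, _ := ioutil.ReadAll(r2)
        fmt.Println(string(b1)) // hello, keep
        fmt.Println(string(b2)) // hello, keep
    }
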
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 33f84b8..853d7d3 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -15,130 +15,110 @@ var _ = Suite(&StandaloneSuite{})
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+	ReadIntoBufferHelper(c, 225)
+	ReadIntoBufferHelper(c, 224)
+}
+
+func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+	out := make([]byte, 128)
+	for i := 0; i < 128; i += 1 {
+		out[i] = byte(i)
+	}
+	writer.Write(out)
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 128)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 128; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i))
+	}
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
+	}
+}
+
+func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
+	out := make([]byte, 96)
+	for i := 0; i < 96; i += 1 {
+		out[i] = byte(i / 2)
+	}
+	writer.Write(out)
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 96)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 96; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i/2))
+	}
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else if i < (128 + 96) {
+			c.Check(buffer[i], Equals, byte((i-128)/2))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
+	}
+}
+
 func ReadIntoBufferHelper(c *C, bufsize int) {
 	buffer := make([]byte, bufsize)
 
 	reader, writer := io.Pipe()
-	slices := make(chan readerSlice)
+	slices := make(chan nextSlice)
 
 	go readIntoBuffer(buffer, reader, slices)
 
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 96)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 96; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 96) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
-	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.EOF)
-	}
-}
+	HelperWrite128andCheck(c, buffer, writer, slices)
+	HelperWrite96andCheck(c, buffer, writer, slices)
 
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-	ReadIntoBufferHelper(c, 512)
-	ReadIntoBufferHelper(c, 225)
-	ReadIntoBufferHelper(c, 224)
+	writer.Close()
+	s1 := <-slices
+	c.Check(len(s1.slice), Equals, 0)
+	c.Check(s1.reader_error, Equals, io.EOF)
 }
 
 func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
 	buffer := make([]byte, 223)
 	reader, writer := io.Pipe()
-	slices := make(chan readerSlice)
+	slices := make(chan nextSlice)
 
 	go readIntoBuffer(buffer, reader, slices)
 
-	{
-		out := make([]byte, 128)
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-		writer.Write(out)
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 128)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 128; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
+	HelperWrite128andCheck(c, buffer, writer, slices)
+
+	out := make([]byte, 96)
+	for i := 0; i < 96; i += 1 {
+		out[i] = byte(i / 2)
 	}
-	{
-		out := make([]byte, 96)
-		for i := 0; i < 96; i += 1 {
-			out[i] = byte(i / 2)
-		}
 
-		// Write will deadlock because it can't write all the data, so
-		// spin it off to a goroutine
-		go writer.Write(out)
-		s1 := <-slices
+	// Write will deadlock because it can't write all the data, so
+	// spin it off to a goroutine
+	go writer.Write(out)
+	s1 := <-slices
 
-		c.Check(len(s1.slice), Equals, 95)
-		c.Check(s1.reader_error, Equals, nil)
-		for i := 0; i < 95; i += 1 {
-			c.Check(s1.slice[i], Equals, byte(i/2))
-		}
-		for i := 0; i < len(buffer); i += 1 {
-			if i < 128 {
-				c.Check(buffer[i], Equals, byte(i))
-			} else if i < (128 + 95) {
-				c.Check(buffer[i], Equals, byte((i-128)/2))
-			} else {
-				c.Check(buffer[i], Equals, byte(0))
-			}
-		}
+	c.Check(len(s1.slice), Equals, 95)
+	c.Check(s1.reader_error, Equals, nil)
+	for i := 0; i < 95; i += 1 {
+		c.Check(s1.slice[i], Equals, byte(i/2))
 	}
-	{
-		writer.Close()
-		s1 := <-slices
-		c.Check(len(s1.slice), Equals, 0)
-		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+	for i := 0; i < len(buffer); i += 1 {
+		if i < 128 {
+			c.Check(buffer[i], Equals, byte(i))
+		} else if i < (128 + 95) {
+			c.Check(buffer[i], Equals, byte((i-128)/2))
+		} else {
+			c.Check(buffer[i], Equals, byte(0))
+		}
 	}
 
+	writer.Close()
+	s1 = <-slices
+	c.Check(len(s1.slice), Equals, 0)
+	c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransfer(c *C) {
@@ -227,8 +207,6 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 	{
 		// Test closing the reader
 		writer.Close()
-		status := <-tr.Reader_status
-		c.Check(status, Equals, io.EOF)
 
 		in := make([]byte, 256)
 		n1, err1 := br1.Read(in)
@@ -262,23 +240,21 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
 func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 	reader, writer := io.Pipe()
 
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-
-	// Read requests on Transfer() buffer
-	requests := make(chan readRequest)
-	defer close(requests)
-
-	// Reporting reader error states
-	reader_status := make(chan error)
+	tr := AsyncStreamFromReader(100, reader)
+	defer tr.Close()
 
-	go transfer(buffer, reader, requests, reader_status)
+	sr := tr.MakeStreamReader()
+	defer sr.Close()
 
 	out := make([]byte, 101)
 	go writer.Write(out)
 
-	status := <-reader_status
-	c.Check(status, Equals, io.ErrShortBuffer)
+	n, err := sr.Read(out)
+	c.Check(n, Equals, 100)
+
+	n, err = sr.Read(out)
+	c.Check(n, Equals, 0)
+	c.Check(err, Equals, io.ErrShortBuffer)
 }
 
 func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
@@ -346,3 +322,45 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
 
 	io.Copy(writer, br1)
 }
+
+func (s *StandaloneSuite) TestManyReaders(c *C) {
+	reader, writer := io.Pipe()
+
+	tr := AsyncStreamFromReader(512, reader)
+	defer tr.Close()
+
+	sr := tr.MakeStreamReader()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		sr.Close()
+	}()
+
+	for i := 0; i < 200; i += 1 {
+		go func() {
+			br1 := tr.MakeStreamReader()
+			defer br1.Close()
+
+			p := make([]byte, 3)
+			n, err := br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("foo"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("bar"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 3)
+			c.Check(p[0:3], DeepEquals, []byte("baz"))
+
+			n, err = br1.Read(p)
+			c.Check(n, Equals, 0)
+			c.Check(err, Equals, io.EOF)
+		}()
+	}
+
+	writer.Write([]byte("foo"))
+	writer.Write([]byte("bar"))
+	writer.Write([]byte("baz"))
+	writer.Close()
+}
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index 77242f1..a4a194f 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -1,3 +1,53 @@
+/* Internal implementation of AsyncStream.
+Outline of operation:
+
+The kernel is the transfer() goroutine.  It manages concurrent reads and
+appends to the "body" slice.  "body" is a slice of "source_buffer" that
+represents the segment of the buffer that is already filled in and available
+for reading.
+
+To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
+from the io.Reader source directly into source_buffer.  Each read goes into a
+slice of the buffer which spans the section immediately following the end of the
+current "body".  Each time a Read completes, a slice representing the
+section just filled in (or any read errors/EOF) is sent over the "slices"
+channel back to the transfer() function.
+
+Meanwhile, the transfer() function selects on two channels: the "requests"
+channel and the "slices" channel.
+
+When a message is recieved on the "slices" channel, this means the a new
+section of the buffer has data, or an error is signaled.  Since the data has
+been read directly into the source_buffer, it is able to simply increases the
+size of the body slice to encompass the newly filled in section.  Then any
+pending reads are serviced with handleReadRequest (described below).
+
+When a message is recieved on the "requests" channel, it means a StreamReader
+wants access to a slice of the buffer.  This is passed to handleReadRequest().
+
+The handleReadRequest() function takes a sliceRequest consisting of a buffer
+offset, maximum size, and channel to send the response.  If there was an error
+reported from the source reader, it is returned.  If the offset is less than
+the size of the body, the request can proceed, and it sends a body slice
+spanning the segment from offset to min(offset+maxsize, end of the body).  If
+the source reader status is EOF (done filling the buffer) and the read request
+offset is beyond the end of the body, it responds with EOF.  Otherwise, the read
+request is for a slice beyond the current size of "body" but we expect the body
+to expand as more data is added, so the request gets added to a wait list.
+
+The transfer() goroutine runs until the requests channel is closed by AsyncStream.Close().
+
+To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
+chooses which channels to receive from based on the number of outstanding
+readers.  When a new reader is created, it sends a message on the add_reader
+channel.  If the number of readers is already at MAX_READERS, this blocks the
+sender until an existing reader is closed.  When a reader is closed, it sends a
+message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
+called, it sends a message on the wait_zero_readers channel, which will block
+the sender until there are zero readers and it is safe to shut down the
+AsyncStream.
+*/
+
 package streamer
 
 import (
@@ -7,20 +57,20 @@ import (
 const MAX_READERS = 100
 
 // A slice passed from readIntoBuffer() to transfer()
-type readerSlice struct {
+type nextSlice struct {
 	slice        []byte
 	reader_error error
 }
 
 // A read request to the Transfer() function
-type readRequest struct {
+type sliceRequest struct {
 	offset  int
 	maxsize int
-	result  chan<- readResult
+	result  chan<- sliceResult
 }
 
 // A read result from the Transfer() function
-type readResult struct {
+type sliceResult struct {
 	slice []byte
 	err   error
 }
@@ -41,16 +91,16 @@ func (this *bufferWriter) Write(p []byte) (n int, err error) {
 // Read repeatedly from the reader and write sequentially into the specified
 // buffer, and report each read to channel 'c'.  Completes when Reader 'r'
 // reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
+func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
 	defer close(slices)
 
 	if writeto, ok := r.(io.WriterTo); ok {
 		n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
 		if err != nil {
-			slices <- readerSlice{nil, err}
+			slices <- nextSlice{nil, err}
 		} else {
-			slices <- readerSlice{buffer[:n], nil}
-			slices <- readerSlice{nil, io.EOF}
+			slices <- nextSlice{buffer[:n], nil}
+			slices <- nextSlice{nil, io.EOF}
 		}
 		return
 	} else {
@@ -75,23 +125,23 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 				if n > 0 {
 					// Reader has more data but we have nowhere to
 					// put it, so we're stuffed
-					slices <- readerSlice{nil, io.ErrShortBuffer}
+					slices <- nextSlice{nil, io.ErrShortBuffer}
 				} else {
 					// Return some other error (hopefully EOF)
-					slices <- readerSlice{nil, err}
+					slices <- nextSlice{nil, err}
 				}
 				return
 			}
 
 			// End on error (includes EOF)
 			if err != nil {
-				slices <- readerSlice{nil, err}
+				slices <- nextSlice{nil, err}
 				return
 			}
 
 			if n > 0 {
 				// Make a slice with the contents of the read
-				slices <- readerSlice{ptr[:n], nil}
+				slices <- nextSlice{ptr[:n], nil}
 
 				// Adjust the scratch space slice
 				ptr = ptr[n:]
@@ -102,18 +152,21 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
 
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
-func handleReadRequest(req readRequest, body []byte, complete bool) bool {
-	if req.offset < len(body) {
+func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
+	if (reader_status != nil) && (reader_status != io.EOF) {
+		req.result <- sliceResult{nil, reader_status}
+		return true
+	} else if req.offset < len(body) {
 		var end int
 		if req.offset+req.maxsize < len(body) {
 			end = req.offset + req.maxsize
 		} else {
 			end = len(body)
 		}
-		req.result <- readResult{body[req.offset:end], nil}
+		req.result <- sliceResult{body[req.offset:end], nil}
 		return true
-	} else if complete && req.offset >= len(body) {
-		req.result <- readResult{nil, io.EOF}
+	} else if (reader_status == io.EOF) && (req.offset >= len(body)) {
+		req.result <- sliceResult{nil, io.EOF}
 		return true
 	} else {
 		return false
@@ -125,15 +178,18 @@ func handleReadRequest(req readRequest, body []byte, complete bool) bool {
 // in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
 // Accepts read requests on the buffer on the 'requests' channel.  Completes
 // when 'requests' channel is closed.
-func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan readRequest, reader_error chan error) {
+func (this *AsyncStream) transfer(source_reader io.Reader) {
+	source_buffer := this.buffer
+	requests := this.requests
+
 	// currently buffered data
 	var body []byte
 
 	// for receiving slices from readIntoBuffer
-	var slices chan readerSlice = nil
+	var slices chan nextSlice = nil
 
-	// indicates whether the buffered data is complete
-	var complete bool = false
+	// indicates the status of the underlying reader
+	var reader_status error = nil
 
 	if source_reader != nil {
 		// 'body' is the buffer slice representing the body content read so far
@@ -141,7 +197,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 
 		// used to communicate slices of the buffer as they are
 		// readIntoBuffer will close 'slices' when it is done with it
-		slices = make(chan readerSlice)
+		slices = make(chan nextSlice)
 
 		// Spin it off
 		go readIntoBuffer(source_buffer, source_reader, slices)
@@ -150,17 +206,17 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 		body = source_buffer[:]
 
 		// buffer is complete
-		complete = true
+		reader_status = io.EOF
 	}
 
-	pending_requests := make([]readRequest, 0)
+	pending_requests := make([]sliceRequest, 0)
 
 	for {
 		select {
 		case req, valid := <-requests:
 			// Handle a buffer read request
 			if valid {
-				if !handleReadRequest(req, body, complete) {
+				if !handleReadRequest(req, body, reader_status) {
 					pending_requests = append(pending_requests, req)
 				}
 			} else {
@@ -171,17 +227,7 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 		case bk, valid := <-slices:
 			// Got a new slice from the reader
 			if valid {
-				if bk.reader_error != nil {
-					reader_error <- bk.reader_error
-					if bk.reader_error == io.EOF {
-						// EOF indicates the reader is done
-						// sending, so our buffer is complete.
-						complete = true
-					} else {
-						// some other reader error
-						return
-					}
-				}
+				reader_status = bk.reader_error
 
 				if bk.slice != nil {
 					// adjust body bounds now that another slice has been read
@@ -191,12 +237,9 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 				// handle pending reads
 				n := 0
 				for n < len(pending_requests) {
-					if handleReadRequest(pending_requests[n], body, complete) {
-
-						// move the element from the
-						// back of the slice to
-						// position 'n', then shorten
-						// the slice by one element
+					if handleReadRequest(pending_requests[n], body, reader_status) {
+						// move the element from the back of the slice to
+						// position 'n', then shorten the slice by one element
 						pending_requests[n] = pending_requests[len(pending_requests)-1]
 						pending_requests = pending_requests[0 : len(pending_requests)-1]
 					} else {
@@ -206,14 +249,13 @@ func transfer(source_buffer []byte, source_reader io.Reader, requests <-chan rea
 					}
 				}
 			} else {
-				if complete {
-					// no more reads
-					slices = nil
+				if reader_status == io.EOF {
+					// no more reads expected, so this is ok
 				} else {
-					// reader channel closed without signaling EOF
-					reader_error <- io.ErrUnexpectedEOF
-					return
+					// slices channel closed without signaling EOF
+					reader_status = io.ErrUnexpectedEOF
 				}
+				slices = nil
 			}
 		}
 	}
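
The readersMonitor() goroutine described in the last paragraph does not appear
in this diff, so the following is only a hypothetical reconstruction of that
counting loop (the channel names and MAX_READERS come from streamer.go and
transfer.go; the control flow is an assumption):

    // Sketch of the reader-tracking loop; not the committed implementation.
    func (this *AsyncStream) readersMonitor() {
        readers := 0
        for {
            if readers == 0 {
                select {
                case _, ok := <-this.add_reader:
                    if !ok {
                        return
                    }
                    readers += 1
                case _, ok := <-this.wait_zero_readers:
                    if !ok {
                        return
                    }
                    // zero readers, so AsyncStream.Close() may proceed
                }
            } else if readers < MAX_READERS {
                select {
                case _, ok := <-this.add_reader:
                    if !ok {
                        return
                    }
                    readers += 1
                case _, ok := <-this.subtract_reader:
                    if !ok {
                        return
                    }
                    readers -= 1
                }
            } else {
                // At MAX_READERS: only accept departures, which blocks any new
                // MakeStreamReader() caller until an existing reader closes.
                if _, ok := <-this.subtract_reader; ok {
                    readers -= 1
                } else {
                    return
                }
            }
        }
    }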

-----------------------------------------------------------------------


hooks/post-receive
-- 