[ARVADOS] created: 1.1.0-165-g4f1a135

Git user git at public.curoverse.com
Thu Nov 23 01:30:02 EST 2017


        at  4f1a135e93df78bb833dff32562efe713c6f690e (commit)


commit 4f1a135e93df78bb833dff32562efe713c6f690e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Nov 23 01:04:38 2017 -0500

    12475: Add TestManyFailedPuts with a short timeout.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 25619bb..23bb2bd 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -11,10 +11,12 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -216,6 +218,33 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
 	}
 }
 
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+	router.(*proxyHandler).timeout = time.Nanosecond
+
+	buf := make([]byte, 1<<20)
+	rand.Read(buf)
+	var wg sync.WaitGroup
+	for i := 0; i < 128; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			kc.PutB(buf)
+		}()
+	}
+	done := make(chan bool)
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		c.Error("timeout")
+	}
+}
+
 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 	kc := runProxy(c, nil, false)
 	defer closeListener()

commit abc241fb83523ae5ae5905ae47210f15d7e0671c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Nov 22 23:11:43 2017 -0500

    12475: Rewrite streamer -> asyncbuf.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 433685c..3cfc692 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -100,7 +100,7 @@ sdk/go/health
 sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
 sdk/go/stats
 sdk/go/crunchrunner
 sdk/cwl
@@ -829,7 +829,7 @@ gostuff=(
     sdk/go/health
     sdk/go/httpserver
     sdk/go/manifest
-    sdk/go/streamer
+    sdk/go/asyncbuf
     sdk/go/crunchrunner
     sdk/go/stats
     lib/crunchstat
diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go
new file mode 100644
index 0000000..b3b9bf2
--- /dev/null
+++ b/sdk/go/asyncbuf/buf.go
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+	"bytes"
+	"io"
+	"sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+	io.WriteCloser
+
+	// NewReader() returns an io.Reader that reads all data
+	// written to the Buffer.
+	NewReader() io.Reader
+
+	// Close, but return the given error (instead of io.EOF) to
+	// all readers when they reach the end of the buffer.
+	//
+	// CloseWithError(nil) is equivalent to
+	// CloseWithError(io.EOF).
+	CloseWithError(error) error
+}
+
+type buffer struct {
+	data *bytes.Buffer
+	cond sync.Cond
+	err  error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+func NewBuffer(buf []byte) Buffer {
+	return &buffer{
+		data: bytes.NewBuffer(buf),
+		cond: sync.Cond{L: &sync.Mutex{}},
+	}
+}
+
+func (b *buffer) Write(p []byte) (int, error) {
+	defer b.cond.Broadcast()
+	b.cond.L.Lock()
+	defer b.cond.L.Unlock()
+	if b.err != nil {
+		return 0, b.err
+	}
+	return b.data.Write(p)
+}
+
+func (b *buffer) Close() error {
+	return b.CloseWithError(nil)
+}
+
+func (b *buffer) CloseWithError(err error) error {
+	defer b.cond.Broadcast()
+	b.cond.L.Lock()
+	defer b.cond.L.Unlock()
+	if err == nil {
+		b.err = io.EOF
+	} else {
+		b.err = err
+	}
+	return nil
+}
+
+func (b *buffer) NewReader() io.Reader {
+	return &reader{b: b}
+}
+
+type reader struct {
+	b    *buffer
+	read int // # bytes already read
+}
+
+func (r *reader) Read(p []byte) (int, error) {
+	r.b.cond.L.Lock()
+	defer r.b.cond.L.Unlock()
+	for {
+		if r.b.data.Len() > r.read || len(p) == 0 {
+			n := copy(p, r.b.data.Bytes()[r.read:])
+			r.read += n
+			return n, nil
+		}
+		if r.b.err != nil {
+			return 0, r.b.err
+		}
+		r.b.cond.Wait()
+	}
+}
diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go
new file mode 100644
index 0000000..845853b
--- /dev/null
+++ b/sdk/go/asyncbuf/buf_test.go
@@ -0,0 +1,159 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+	"crypto/md5"
+	"errors"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"testing"
+	"time"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestNoWrites(c *check.C) {
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	r2 := b.NewReader()
+	b.Close()
+	s.checkReader(c, r1, []byte{}, nil, nil)
+	s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+func (s *Suite) TestNoReaders(c *check.C) {
+	b := NewBuffer(nil)
+	n, err := b.Write([]byte("foobar"))
+	err2 := b.Close()
+	c.Check(n, check.Equals, 6)
+	c.Check(err, check.IsNil)
+	c.Check(err2, check.IsNil)
+}
+
+func (s *Suite) TestWriteReadClose(c *check.C) {
+	done := make(chan bool, 2)
+	b := NewBuffer(nil)
+	n, err := b.Write([]byte("foobar"))
+	c.Check(n, check.Equals, 6)
+	c.Check(err, check.IsNil)
+	r1 := b.NewReader()
+	r2 := b.NewReader()
+	go s.checkReader(c, r1, []byte("foobar"), nil, done)
+	go s.checkReader(c, r2, []byte("foobar"), nil, done)
+	time.Sleep(time.Millisecond)
+	c.Check(len(done), check.Equals, 0)
+	b.Close()
+	<-done
+	<-done
+}
+
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+	done := make(chan bool, 2)
+	b := NewBuffer([]byte("baz"))
+	n, err := b.Write([]byte("waz"))
+	c.Check(n, check.Equals, 3)
+	c.Check(err, check.IsNil)
+	b.Close()
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+	r2 := b.NewReader()
+	go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+	<-done
+	<-done
+}
+
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+	done := make(chan bool, 1)
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+	b.Write([]byte("bazwaz"))
+
+	r2 := b.NewReader()
+	r2.Read(make([]byte, 3))
+
+	b.Write([]byte("qux"))
+	b.Close()
+
+	s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+	<-done
+}
+
+func (s *Suite) TestCloseWithError(c *check.C) {
+	errFake := errors.New("it's not even a real error")
+
+	done := make(chan bool, 1)
+	b := NewBuffer(nil)
+	r1 := b.NewReader()
+	go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+	b.Write([]byte("bazwaz"))
+
+	r2 := b.NewReader()
+	r2.Read(make([]byte, 3))
+
+	b.Write([]byte("qux"))
+	b.CloseWithError(errFake)
+
+	s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+	<-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+	const n = 256
+
+	b := NewBuffer(nil)
+
+	expectSum := make(chan []byte)
+	go func() {
+		hash := md5.New()
+		buf := make([]byte, n)
+		for i := 0; i < n; i++ {
+			time.Sleep(10 * time.Nanosecond)
+			rand.Read(buf)
+			b.Write(buf)
+			hash.Write(buf)
+		}
+		expectSum <- hash.Sum(nil)
+		b.Close()
+	}()
+
+	gotSum := make(chan []byte)
+	for i := 0; i < n; i++ {
+		go func(bufSize int) {
+			got := md5.New()
+			io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+			gotSum <- got.Sum(nil)
+		}(i + n/2)
+	}
+
+	expect := <-expectSum
+	for i := 0; i < n; i++ {
+		c.Check(expect, check.DeepEquals, <-gotSum)
+	}
+}
+
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+	buf, err := ioutil.ReadAll(r)
+	c.Check(err, check.Equals, expectError)
+	c.Check(buf, check.DeepEquals, expectData)
+	if done != nil {
+		done <- true
+	}
+}
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
diff --git a/sdk/go/keepclient/hashcheck.go b/sdk/go/keepclient/hashcheck.go
index 726b813..9295c14 100644
--- a/sdk/go/keepclient/hashcheck.go
+++ b/sdk/go/keepclient/hashcheck.go
@@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) {
 	_, err = io.Copy(this.Hash, this.Reader)
 
 	if closer, ok := this.Reader.(io.Closer); ok {
-		err2 := closer.Close()
-		if err2 != nil && err == nil {
-			return err2
+		closeErr := closer.Close()
+		if err == nil {
+			err = closeErr
 		}
 	}
 	if err != nil {
 		return err
 	}
-
-	sum := this.Hash.Sum(nil)
-	if fmt.Sprintf("%x", sum) != this.Check {
-		err = BadChecksum
+	if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+		return BadChecksum
 	}
-
-	return err
+	return nil
 }
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index cbfad81..37d651e 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -21,7 +21,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
+	"git.curoverse.com/arvados.git/sdk/go/asyncbuf"
 )
 
 // A Keep "block" is 64MB.
@@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 		bufsize = BLOCKSIZE
 	}
 
-	t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
-	defer t.Close()
-
-	return kc.putReplicas(hash, t, dataBytes)
+	buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+	go func() {
+		_, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+		buf.CloseWithError(err)
+	}()
+	return kc.putReplicas(hash, buf.NewReader, dataBytes)
 }
 
 // PutHB writes a block to Keep. The hash of the bytes is given in
@@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
 //
 // Return values are the same as for PutHR.
 func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
-	t := streamer.AsyncStreamFromSlice(buf)
-	defer t.Close()
-	return kc.putReplicas(hash, t, int64(len(buf)))
+	newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+	return kc.putReplicas(hash, newReader, int64(len(buf)))
 }
 
 // PutB writes a block to Keep. It computes the hash itself.
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 3ce4e74..055141c 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -5,6 +5,7 @@
 package keepclient
 
 import (
+	"bytes"
 	"crypto/md5"
 	"errors"
 	"fmt"
@@ -20,7 +21,6 @@ import (
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	. "gopkg.in/check.v1"
 )
 
@@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
-		func(kc *KeepClient, url string, reader io.ReadCloser,
-			writer io.WriteCloser, upload_status chan uploadStatus) {
-
-			tr := streamer.AsyncStreamFromReader(512, reader)
-			defer tr.Close()
-
-			br1 := tr.MakeStreamReader()
-
-			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-			writer.Write([]byte("foo"))
-			writer.Close()
+		func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+			go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
 
 			<-st.handled
 
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 49ef543..3791250 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -17,7 +17,6 @@ import (
 	"strings"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
 	response        string
 }
 
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
 	upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
 
 	var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
 		DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
 		upload_status <- uploadStatus{err, url, 0, 0, ""}
-		body.Close()
 		return
 	}
 
 	req.ContentLength = expectedLength
 	if expectedLength > 0 {
-		// Do() will close the body ReadCloser when it is done
-		// with it.
-		req.Body = body
+		req.Body = ioutil.NopCloser(body)
 	} else {
-		// "For client requests, a value of 0 means unknown if Body is
-		// not nil."  In this case we do want the body to be empty, so
-		// don't set req.Body.  However, we still need to close the
-		// body ReadCloser.
-		body.Close()
+		// "For client requests, a value of 0 means unknown if
+		// Body is not nil."  In this case we do want the body
+		// to be empty, so don't set req.Body.
 	}
 
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 
 func (this *KeepClient) putReplicas(
 	hash string,
-	tr *streamer.AsyncStream,
+	getReader func() io.Reader,
 	expectedLength int64) (locator string, replicas int, err error) {
 
 	// Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
 				// Start some upload requests
 				if next_server < len(sv) {
 					DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
-					go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+					go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
 					next_server += 1
 					active += 1
 				} else {
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644
index 396e311..0000000
--- a/sdk/go/streamer/streamer.go
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
-  stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
-  reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
-  reader.Close()
-
-When you're done with the stream:
-  stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
-  stream := AsyncStreamFromSlice(buf)
-
-  r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
-	"errors"
-	"io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
-	buffer            []byte
-	requests          chan sliceRequest
-	add_reader        chan bool
-	subtract_reader   chan bool
-	wait_zero_readers chan bool
-	closed            bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
-	offset    int
-	stream    *AsyncStream
-	responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-	t := &AsyncStream{
-		buffer:            make([]byte, buffersize),
-		requests:          make(chan sliceRequest),
-		add_reader:        make(chan bool),
-		subtract_reader:   make(chan bool),
-		wait_zero_readers: make(chan bool),
-	}
-
-	go t.transfer(source)
-	go t.readersMonitor()
-
-	return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-	t := &AsyncStream{
-		buffer:            buf,
-		requests:          make(chan sliceRequest),
-		add_reader:        make(chan bool),
-		subtract_reader:   make(chan bool),
-		wait_zero_readers: make(chan bool),
-	}
-
-	go t.transfer(nil)
-	go t.readersMonitor()
-
-	return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
-	this.add_reader <- true
-	return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
-	this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
-	rr, valid := <-this.responses
-	if valid {
-		this.offset += len(rr.slice)
-		return copy(p, rr.slice), rr.err
-	} else {
-		return 0, io.ErrUnexpectedEOF
-	}
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
-	// Record starting offset in order to correctly report the number of bytes sent
-	starting_offset := this.offset
-	for {
-		this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
-		rr, valid := <-this.responses
-		if valid {
-			this.offset += len(rr.slice)
-			if rr.err != nil {
-				if rr.err == io.EOF {
-					// EOF is not an error.
-					return int64(this.offset - starting_offset), nil
-				} else {
-					return int64(this.offset - starting_offset), rr.err
-				}
-			} else {
-				dest.Write(rr.slice)
-			}
-		} else {
-			return int64(this.offset), io.ErrUnexpectedEOF
-		}
-	}
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
-	if this.stream == nil {
-		return ErrAlreadyClosed
-	}
-	this.stream.subtract_reader <- true
-	close(this.responses)
-	this.stream = nil
-	return nil
-}
-
-func (this *AsyncStream) Close() error {
-	if this.closed {
-		return ErrAlreadyClosed
-	}
-	this.closed = true
-	this.wait_zero_readers <- true
-	close(this.requests)
-	close(this.add_reader)
-	close(this.subtract_reader)
-	close(this.wait_zero_readers)
-	return nil
-}
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644
index f8ddbf5..0000000
--- a/sdk/go/streamer/streamer_test.go
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
-	. "gopkg.in/check.v1"
-	"io"
-	"testing"
-	"time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
-	ReadIntoBufferHelper(c, 225)
-	ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-	out := make([]byte, 128)
-	for i := 0; i < 128; i += 1 {
-		out[i] = byte(i)
-	}
-	writer.Write(out)
-	s1 := <-slices
-	c.Check(len(s1.slice), Equals, 128)
-	c.Check(s1.reader_error, Equals, nil)
-	for i := 0; i < 128; i += 1 {
-		c.Check(s1.slice[i], Equals, byte(i))
-	}
-	for i := 0; i < len(buffer); i += 1 {
-		if i < 128 {
-			c.Check(buffer[i], Equals, byte(i))
-		} else {
-			c.Check(buffer[i], Equals, byte(0))
-		}
-	}
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
-	out := make([]byte, 96)
-	for i := 0; i < 96; i += 1 {
-		out[i] = byte(i / 2)
-	}
-	writer.Write(out)
-	s1 := <-slices
-	c.Check(len(s1.slice), Equals, 96)
-	c.Check(s1.reader_error, Equals, nil)
-	for i := 0; i < 96; i += 1 {
-		c.Check(s1.slice[i], Equals, byte(i/2))
-	}
-	for i := 0; i < len(buffer); i += 1 {
-		if i < 128 {
-			c.Check(buffer[i], Equals, byte(i))
-		} else if i < (128 + 96) {
-			c.Check(buffer[i], Equals, byte((i-128)/2))
-		} else {
-			c.Check(buffer[i], Equals, byte(0))
-		}
-	}
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
-	buffer := make([]byte, bufsize)
-
-	reader, writer := io.Pipe()
-	slices := make(chan nextSlice)
-
-	go readIntoBuffer(buffer, reader, slices)
-
-	HelperWrite128andCheck(c, buffer, writer, slices)
-	HelperWrite96andCheck(c, buffer, writer, slices)
-
-	writer.Close()
-	s1 := <-slices
-	c.Check(len(s1.slice), Equals, 0)
-	c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
-	buffer := make([]byte, 223)
-	reader, writer := io.Pipe()
-	slices := make(chan nextSlice)
-
-	go readIntoBuffer(buffer, reader, slices)
-
-	HelperWrite128andCheck(c, buffer, writer, slices)
-
-	out := make([]byte, 96)
-	for i := 0; i < 96; i += 1 {
-		out[i] = byte(i / 2)
-	}
-
-	// Write will deadlock because it can't write all the data, so
-	// spin it off to a goroutine
-	go writer.Write(out)
-	s1 := <-slices
-
-	c.Check(len(s1.slice), Equals, 95)
-	c.Check(s1.reader_error, Equals, nil)
-	for i := 0; i < 95; i += 1 {
-		c.Check(s1.slice[i], Equals, byte(i/2))
-	}
-	for i := 0; i < len(buffer); i += 1 {
-		if i < 128 {
-			c.Check(buffer[i], Equals, byte(i))
-		} else if i < (128 + 95) {
-			c.Check(buffer[i], Equals, byte((i-128)/2))
-		} else {
-			c.Check(buffer[i], Equals, byte(0))
-		}
-	}
-
-	writer.Close()
-	s1 = <-slices
-	c.Check(len(s1.slice), Equals, 0)
-	c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
-	reader, writer := io.Pipe()
-
-	tr := AsyncStreamFromReader(512, reader)
-
-	br1 := tr.MakeStreamReader()
-	out := make([]byte, 128)
-
-	{
-		// Write some data, and read into a buffer shorter than
-		// available data
-		for i := 0; i < 128; i += 1 {
-			out[i] = byte(i)
-		}
-
-		writer.Write(out[:100])
-
-		in := make([]byte, 64)
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 64)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 64; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-	}
-
-	{
-		// Write some more data, and read into buffer longer than
-		// available data
-		in := make([]byte, 64)
-		n, err := br1.Read(in)
-		c.Check(n, Equals, 36)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 36; i += 1 {
-			c.Check(in[i], Equals, out[64+i])
-		}
-
-	}
-
-	{
-		// Test read before write
-		type Rd struct {
-			n   int
-			err error
-		}
-		rd := make(chan Rd)
-		in := make([]byte, 64)
-
-		go func() {
-			n, err := br1.Read(in)
-			rd <- Rd{n, err}
-		}()
-
-		time.Sleep(100 * time.Millisecond)
-		writer.Write(out[100:])
-
-		got := <-rd
-
-		c.Check(got.n, Equals, 28)
-		c.Check(got.err, Equals, nil)
-
-		for i := 0; i < 28; i += 1 {
-			c.Check(in[i], Equals, out[100+i])
-		}
-	}
-
-	br2 := tr.MakeStreamReader()
-	{
-		// Test 'catch up' reader
-		in := make([]byte, 256)
-		n, err := br2.Read(in)
-
-		c.Check(n, Equals, 128)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 128; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-	}
-
-	{
-		// Test closing the reader
-		writer.Close()
-
-		in := make([]byte, 256)
-		n1, err1 := br1.Read(in)
-		n2, err2 := br2.Read(in)
-		c.Check(n1, Equals, 0)
-		c.Check(err1, Equals, io.EOF)
-		c.Check(n2, Equals, 0)
-		c.Check(err2, Equals, io.EOF)
-	}
-
-	{
-		// Test 'catch up' reader after closing
-		br3 := tr.MakeStreamReader()
-		in := make([]byte, 256)
-		n, err := br3.Read(in)
-
-		c.Check(n, Equals, 128)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 128; i += 1 {
-			c.Check(in[i], Equals, out[i])
-		}
-
-		n, err = br3.Read(in)
-
-		c.Check(n, Equals, 0)
-		c.Check(err, Equals, io.EOF)
-	}
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
-	reader, writer := io.Pipe()
-
-	tr := AsyncStreamFromReader(100, reader)
-	defer tr.Close()
-
-	sr := tr.MakeStreamReader()
-	defer sr.Close()
-
-	out := make([]byte, 101)
-	go writer.Write(out)
-
-	n, err := sr.Read(out)
-	c.Check(n, Equals, 100)
-	c.Check(err, IsNil)
-
-	n, err = sr.Read(out)
-	c.Check(n, Equals, 0)
-	c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-	for i := 0; i < 100; i += 1 {
-		buffer[i] = byte(i)
-	}
-
-	tr := AsyncStreamFromSlice(buffer)
-
-	br1 := tr.MakeStreamReader()
-
-	in := make([]byte, 64)
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 64)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 64; i += 1 {
-			c.Check(in[i], Equals, buffer[i])
-		}
-	}
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 36)
-		c.Check(err, Equals, nil)
-
-		for i := 0; i < 36; i += 1 {
-			c.Check(in[i], Equals, buffer[64+i])
-		}
-	}
-	{
-		n, err := br1.Read(in)
-
-		c.Check(n, Equals, 0)
-		c.Check(err, Equals, io.EOF)
-	}
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
-	// Buffer for reads from 'r'
-	buffer := make([]byte, 100)
-	for i := 0; i < 100; i += 1 {
-		buffer[i] = byte(i)
-	}
-
-	tr := AsyncStreamFromSlice(buffer)
-	defer tr.Close()
-
-	br1 := tr.MakeStreamReader()
-	defer br1.Close()
-
-	reader, writer := io.Pipe()
-
-	go func() {
-		p := make([]byte, 100)
-		n, err := reader.Read(p)
-		c.Check(n, Equals, 100)
-		c.Check(err, Equals, nil)
-		c.Check(p, DeepEquals, buffer)
-	}()
-
-	io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
-	reader, writer := io.Pipe()
-
-	tr := AsyncStreamFromReader(512, reader)
-	defer tr.Close()
-
-	sr := tr.MakeStreamReader()
-	go func() {
-		time.Sleep(100 * time.Millisecond)
-		sr.Close()
-	}()
-
-	for i := 0; i < 200; i += 1 {
-		go func() {
-			br1 := tr.MakeStreamReader()
-			defer br1.Close()
-
-			p := make([]byte, 3)
-			n, err := br1.Read(p)
-			c.Check(n, Equals, 3)
-			c.Check(p[0:3], DeepEquals, []byte("foo"))
-
-			n, err = br1.Read(p)
-			c.Check(n, Equals, 3)
-			c.Check(p[0:3], DeepEquals, []byte("bar"))
-
-			n, err = br1.Read(p)
-			c.Check(n, Equals, 3)
-			c.Check(p[0:3], DeepEquals, []byte("baz"))
-
-			n, err = br1.Read(p)
-			c.Check(n, Equals, 0)
-			c.Check(err, Equals, io.EOF)
-		}()
-	}
-
-	writer.Write([]byte("foo"))
-	writer.Write([]byte("bar"))
-	writer.Write([]byte("baz"))
-	writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
-	buffer := make([]byte, 100)
-	tr := AsyncStreamFromSlice(buffer)
-	sr := tr.MakeStreamReader()
-	c.Check(sr.Close(), IsNil)
-	c.Check(sr.Close(), Equals, ErrAlreadyClosed)
-	c.Check(tr.Close(), IsNil)
-	c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go
deleted file mode 100644
index bea27f8..0000000
--- a/sdk/go/streamer/transfer.go
+++ /dev/null
@@ -1,310 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine.  It manages concurrent reads and
-appends to the "body" slice.  "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer.  Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body".  Each time a Read completes, a slice representing the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means a new
-section of the buffer has data, or an error is signaled.  Since the data has
-been read directly into the source_buffer, it simply increases the
-size of the body slice to encompass the newly filled in section.  Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer.  This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response.  If there was an error
-reported from the source reader, it is returned.  If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body).  If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF.  Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-transfer() runs until the requests channel is closed by AsyncStream.Close().
-
-To track readers, streamer uses the readersMonitor() goroutine.  This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers.  When a new reader is created, it sends a message on the add_reader
-channel.  If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed.  When a reader is closed, it sends a
-message on the subtract_reader channel.  Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
-	"io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
-	slice        []byte
-	reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
-	offset  int
-	maxsize int
-	result  chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
-	slice []byte
-	err   error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
-	buf []byte
-	ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
-	n = copy(this.buf[this.ptr:], p)
-	this.ptr += n
-	return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'.  Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
-	defer close(slices)
-
-	if writeto, ok := r.(io.WriterTo); ok {
-		n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
-		if err != nil {
-			slices <- nextSlice{nil, err}
-		} else {
-			slices <- nextSlice{buffer[:n], nil}
-			slices <- nextSlice{nil, io.EOF}
-		}
-		return
-	} else {
-		// Initially entire buffer is available
-		ptr := buffer[:]
-		for {
-			var n int
-			var err error
-			if len(ptr) > 0 {
-				const readblock = 64 * 1024
-				// Read 64KiB into the next part of the buffer
-				if len(ptr) > readblock {
-					n, err = r.Read(ptr[:readblock])
-				} else {
-					n, err = r.Read(ptr)
-				}
-			} else {
-				// Ran out of buffer space, try reading one more byte
-				var b [1]byte
-				n, err = r.Read(b[:])
-
-				if n > 0 {
-					// Reader has more data but we have nowhere to
-					// put it, so we're stuffed
-					slices <- nextSlice{nil, io.ErrShortBuffer}
-				} else {
-					// Return some other error (hopefully EOF)
-					slices <- nextSlice{nil, err}
-				}
-				return
-			}
-
-			// End on error (includes EOF)
-			if err != nil {
-				slices <- nextSlice{nil, err}
-				return
-			}
-
-			if n > 0 {
-				// Make a slice with the contents of the read
-				slices <- nextSlice{ptr[:n], nil}
-
-				// Adjust the scratch space slice
-				ptr = ptr[n:]
-			}
-		}
-	}
-}
-
-// Handle a read request.  Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
-	if (reader_status != nil) && (reader_status != io.EOF) {
-		req.result <- sliceResult{nil, reader_status}
-		return true
-	} else if req.offset < len(body) {
-		var end int
-		if req.offset+req.maxsize < len(body) {
-			end = req.offset + req.maxsize
-		} else {
-			end = len(body)
-		}
-		req.result <- sliceResult{body[req.offset:end], nil}
-		return true
-	} else if (reader_status == io.EOF) && (req.offset >= len(body)) {
-		req.result <- sliceResult{nil, io.EOF}
-		return true
-	} else {
-		return false
-	}
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel.  Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
-	source_buffer := this.buffer
-	requests := this.requests
-
-	// currently buffered data
-	var body []byte
-
-	// for receiving slices from readIntoBuffer
-	var slices chan nextSlice = nil
-
-	// indicates the status of the underlying reader
-	var reader_status error = nil
-
-	if source_reader != nil {
-		// 'body' is the buffer slice representing the body content read so far
-		body = source_buffer[:0]
-
-		// used to communicate slices of the buffer as they are
-		// readIntoBuffer will close 'slices' when it is done with it
-		slices = make(chan nextSlice)
-
-		// Spin it off
-		go readIntoBuffer(source_buffer, source_reader, slices)
-	} else {
-		// use the whole buffer
-		body = source_buffer[:]
-
-		// buffer is complete
-		reader_status = io.EOF
-	}
-
-	pending_requests := make([]sliceRequest, 0)
-
-	for {
-		select {
-		case req, valid := <-requests:
-			// Handle a buffer read request
-			if valid {
-				if !handleReadRequest(req, body, reader_status) {
-					pending_requests = append(pending_requests, req)
-				}
-			} else {
-				// closed 'requests' channel indicates we're done
-				return
-			}
-
-		case bk, valid := <-slices:
-			// Got a new slice from the reader
-			if valid {
-				reader_status = bk.reader_error
-
-				if bk.slice != nil {
-					// adjust body bounds now that another slice has been read
-					body = source_buffer[0 : len(body)+len(bk.slice)]
-				}
-
-				// handle pending reads
-				n := 0
-				for n < len(pending_requests) {
-					if handleReadRequest(pending_requests[n], body, reader_status) {
-						// move the element from the back of the slice to
-						// position 'n', then shorten the slice by one element
-						pending_requests[n] = pending_requests[len(pending_requests)-1]
-						pending_requests = pending_requests[0 : len(pending_requests)-1]
-					} else {
-
-						// Request wasn't handled, so keep it in the request slice
-						n += 1
-					}
-				}
-			} else {
-				if reader_status == nil {
-					// slices channel closed without signaling EOF
-					reader_status = io.ErrUnexpectedEOF
-				}
-				slices = nil
-			}
-		}
-	}
-}
-
-func (this *AsyncStream) readersMonitor() {
-	var readers int = 0
-
-	for {
-		if readers == 0 {
-			select {
-			case _, ok := <-this.wait_zero_readers:
-				if ok {
-					// nothing, just implicitly unblock the sender
-				} else {
-					return
-				}
-			case _, ok := <-this.add_reader:
-				if ok {
-					readers += 1
-				} else {
-					return
-				}
-			}
-		} else if readers > 0 && readers < MAX_READERS {
-			select {
-			case _, ok := <-this.add_reader:
-				if ok {
-					readers += 1
-				} else {
-					return
-				}
-
-			case _, ok := <-this.subtract_reader:
-				if ok {
-					readers -= 1
-				} else {
-					return
-				}
-			}
-		} else if readers == MAX_READERS {
-			_, ok := <-this.subtract_reader
-			if ok {
-				readers -= 1
-			} else {
-				return
-			}
-		}
-	}
-}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list