[ARVADOS] created: 1.1.0-165-g4f1a135
Git user
git at public.curoverse.com
Thu Nov 23 01:30:02 EST 2017
at 4f1a135e93df78bb833dff32562efe713c6f690e (commit)
commit 4f1a135e93df78bb833dff32562efe713c6f690e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Nov 23 01:04:38 2017 -0500
12475: Add TestManyFailedPuts with a short timeout.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 25619bb..23bb2bd 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -11,10 +11,12 @@ import (
"fmt"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
@@ -216,6 +218,33 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
}
}
+// TestManyFailedPuts starts 128 concurrent PutB calls of the same
+// 1 MiB random block against a proxy whose timeout field has been set
+// to 1ns. As the test name suggests, the uploads are expected to fail;
+// the test only checks that every PutB call returns within 10 seconds.
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+ // NOTE(review): presumably a 1ns timeout makes the proxy abandon
+ // each upstream request almost immediately -- confirm how
+ // proxyHandler applies this field.
+ router.(*proxyHandler).timeout = time.Nanosecond
+
+ // One 1 MiB block of random data, shared by every goroutine.
+ buf := make([]byte, 1<<20)
+ rand.Read(buf)
+ var wg sync.WaitGroup
+ for i := 0; i < 128; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // PutB's return values are not examined; only the fact
+ // that the call returns matters here.
+ kc.PutB(buf)
+ }()
+ }
+ // Turn wg.Wait() into a channel close so it can be raced
+ // against a timer in the select below.
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ // Records a failure if some PutB calls are still blocked.
+ c.Error("timeout")
+ }
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
commit abc241fb83523ae5ae5905ae47210f15d7e0671c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Wed Nov 22 23:11:43 2017 -0500
12475: Rewrite streamer -> asyncbuf.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 433685c..3cfc692 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -100,7 +100,7 @@ sdk/go/health
sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
-sdk/go/streamer
+sdk/go/asyncbuf
sdk/go/stats
sdk/go/crunchrunner
sdk/cwl
@@ -829,7 +829,7 @@ gostuff=(
sdk/go/health
sdk/go/httpserver
sdk/go/manifest
- sdk/go/streamer
+ sdk/go/asyncbuf
sdk/go/crunchrunner
sdk/go/stats
lib/crunchstat
diff --git a/sdk/go/asyncbuf/buf.go b/sdk/go/asyncbuf/buf.go
new file mode 100644
index 0000000..b3b9bf2
--- /dev/null
+++ b/sdk/go/asyncbuf/buf.go
@@ -0,0 +1,101 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// A Buffer is an io.Writer that distributes written data
+// asynchronously to multiple concurrent readers.
+//
+// NewReader() can be called at any time. In all cases, every returned
+// io.Reader reads all data written to the Buffer.
+//
+// A reader that has consumed everything written so far blocks until
+// more data is written or the Buffer is closed.
+//
+// Behavior is undefined if Write is called after Close or
+// CloseWithError.
+type Buffer interface {
+ // Write appends data for all readers; Close is equivalent to
+ // CloseWithError(nil).
+ io.WriteCloser
+
+ // NewReader() returns an io.Reader that reads all data
+ // written to the Buffer, starting from the beginning no
+ // matter how much has already been written.
+ NewReader() io.Reader
+
+ // Close, but return the given error (instead of io.EOF) to
+ // all readers when they reach the end of the buffer.
+ //
+ // CloseWithError(nil) is equivalent to
+ // CloseWithError(io.EOF).
+ CloseWithError(error) error
+}
+
+// buffer is the Buffer implementation returned by NewBuffer. All
+// fields are accessed only while holding cond.L; cond is broadcast
+// after every Write and CloseWithError so blocked readers re-check.
+type buffer struct {
+ // Everything written so far. Readers never consume from it (they
+ // index into data.Bytes()), so it only grows.
+ data *bytes.Buffer
+ cond sync.Cond
+ // Set by CloseWithError: io.EOF for a plain Close, otherwise the
+ // caller's error.
+ err error // nil if there might be more writes
+}
+
+// NewBuffer creates a new Buffer using buf as its initial
+// contents. The new Buffer takes ownership of buf, and the caller
+// should not use buf after this call.
+//
+// As with bytes.NewBuffer, the first len(buf) bytes are the initial
+// data and any spare capacity is reused for later writes, so
+// NewBuffer(make([]byte, 0, n)) preallocates room for n bytes and
+// NewBuffer(nil) starts empty.
+func NewBuffer(buf []byte) Buffer {
+ return &buffer{
+ data: bytes.NewBuffer(buf),
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+// Write appends p to the buffer and wakes any readers waiting for
+// more data. If the buffer has already been closed it writes nothing
+// and returns the close error (io.EOF after a plain Close); callers
+// should not rely on this, since the Buffer contract leaves Write
+// after Close undefined.
+func (b *buffer) Write(p []byte) (int, error) {
+ // Deferred before Unlock, so it runs after the mutex is released.
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if b.err != nil {
+ return 0, b.err
+ }
+ return b.data.Write(p)
+}
+
+// Close marks the buffer complete: readers receive io.EOF once they
+// have consumed all data. It always returns nil.
+func (b *buffer) Close() error {
+ return b.CloseWithError(nil)
+}
+
+// CloseWithError marks the buffer complete and wakes all waiting
+// readers. Readers that reach the end of the data get err, or io.EOF
+// if err is nil. It always returns nil. Calling it again replaces the
+// error returned by subsequent end-of-data reads.
+func (b *buffer) CloseWithError(err error) error {
+ // Deferred before Unlock, so it runs after the mutex is released.
+ defer b.cond.Broadcast()
+ b.cond.L.Lock()
+ defer b.cond.L.Unlock()
+ if err == nil {
+ b.err = io.EOF
+ } else {
+ b.err = err
+ }
+ return nil
+}
+
+// NewReader returns a reader positioned at the start of the buffer.
+// Each reader keeps its own offset, so readers do not affect one
+// another.
+func (b *buffer) NewReader() io.Reader {
+ return &reader{b: b}
+}
+
+// reader reads a buffer from the beginning, tracking its position as
+// an absolute offset into b.data.Bytes(). read is only updated while
+// holding b.cond.L.
+type reader struct {
+ b *buffer
+ read int // # bytes already read
+}
+
+// Read copies the next unread bytes into p. While no unread data is
+// available and the buffer is still open, it blocks. Once all data has
+// been consumed and the buffer is closed, it returns 0 and the close
+// error (io.EOF after a plain Close). A zero-length p returns (0, nil)
+// immediately.
+func (r *reader) Read(p []byte) (int, error) {
+ r.b.cond.L.Lock()
+ defer r.b.cond.L.Unlock()
+ for {
+ // Pending data is served before the close error is
+ // checked, so a reader sees all data before EOF/err.
+ if r.b.data.Len() > r.read || len(p) == 0 {
+ // data is never consumed, so Bytes() holds everything
+ // written and r.read indexes into it directly.
+ n := copy(p, r.b.data.Bytes()[r.read:])
+ r.read += n
+ return n, nil
+ }
+ if r.b.err != nil {
+ return 0, r.b.err
+ }
+ // Releases the lock until Write/CloseWithError broadcasts.
+ r.b.cond.Wait()
+ }
+}
diff --git a/sdk/go/asyncbuf/buf_test.go b/sdk/go/asyncbuf/buf_test.go
new file mode 100644
index 0000000..845853b
--- /dev/null
+++ b/sdk/go/asyncbuf/buf_test.go
@@ -0,0 +1,159 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package asyncbuf
+
+import (
+ "crypto/md5"
+ "errors"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "testing"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+// Register the asyncbuf test suite with gocheck.
+var _ = check.Suite(&Suite{})
+
+// Suite holds the asyncbuf tests; it needs no per-test state.
+type Suite struct{}
+
+// TestNoWrites: readers of a buffer that is closed without any writes
+// read no data, and ioutil.ReadAll reports no error (io.EOF is not
+// treated as an error by ReadAll).
+func (s *Suite) TestNoWrites(c *check.C) {
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ b.Close()
+ s.checkReader(c, r1, []byte{}, nil, nil)
+ s.checkReader(c, r2, []byte{}, nil, nil)
+}
+
+// TestNoReaders: Write and Close succeed, and return, on a buffer
+// that never has any readers.
+func (s *Suite) TestNoReaders(c *check.C) {
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ err2 := b.Close()
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ c.Check(err2, check.IsNil)
+}
+
+// TestWriteReadClose: readers created after a write see that data,
+// then keep waiting (rather than returning) until the buffer is
+// closed.
+func (s *Suite) TestWriteReadClose(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer(nil)
+ n, err := b.Write([]byte("foobar"))
+ c.Check(n, check.Equals, 6)
+ c.Check(err, check.IsNil)
+ r1 := b.NewReader()
+ r2 := b.NewReader()
+ go s.checkReader(c, r1, []byte("foobar"), nil, done)
+ go s.checkReader(c, r2, []byte("foobar"), nil, done)
+ // Timing heuristic: after 1ms, neither reader should have
+ // finished, because the buffer is still open.
+ time.Sleep(time.Millisecond)
+ c.Check(len(done), check.Equals, 0)
+ b.Close()
+ <-done
+ <-done
+}
+
+// TestPrefillWriteCloseRead: the initial contents passed to NewBuffer
+// come before later writes, and readers created after Close still
+// read everything.
+func (s *Suite) TestPrefillWriteCloseRead(c *check.C) {
+ done := make(chan bool, 2)
+ b := NewBuffer([]byte("baz"))
+ n, err := b.Write([]byte("waz"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ b.Close()
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwaz"), nil, done)
+ r2 := b.NewReader()
+ go s.checkReader(c, r2, []byte("bazwaz"), nil, done)
+ <-done
+ <-done
+}
+
+// TestWriteReadCloseRead: a reader created before any writes reads
+// the full stream. A reader created mid-stream, after it consumes a
+// 3-byte prefix, reads the remainder.
+func (s *Suite) TestWriteReadCloseRead(c *check.C) {
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), nil, done)
+
+ b.Write([]byte("bazwaz"))
+
+ // r2 starts at the beginning; consume "baz" (result unchecked).
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.Close()
+
+ s.checkReader(c, r2, []byte("wazqux"), nil, nil)
+ <-done
+}
+
+// TestCloseWithError: same scenario as TestWriteReadCloseRead, but
+// closed with CloseWithError. Both readers receive all data followed
+// by the given error (ioutil.ReadAll passes non-EOF errors through).
+func (s *Suite) TestCloseWithError(c *check.C) {
+ errFake := errors.New("it's not even a real error")
+
+ done := make(chan bool, 1)
+ b := NewBuffer(nil)
+ r1 := b.NewReader()
+ go s.checkReader(c, r1, []byte("bazwazqux"), errFake, done)
+
+ b.Write([]byte("bazwaz"))
+
+ // r2 starts at the beginning; consume "baz" (result unchecked).
+ r2 := b.NewReader()
+ r2.Read(make([]byte, 3))
+
+ b.Write([]byte("qux"))
+ b.CloseWithError(errFake)
+
+ s.checkReader(c, r2, []byte("wazqux"), errFake, nil)
+ <-done
+}
+
+// Write n*n bytes, n at a time; read them into n goroutines using
+// varying buffer sizes; compare checksums.
+func (s *Suite) TestManyReaders(c *check.C) {
+ const n = 256
+
+ b := NewBuffer(nil)
+
+ // Writer: n writes of n random bytes, with a short sleep between
+ // writes so readers can run concurrently with the writer. The
+ // expected MD5 is sent before Close, so the main goroutine holds it
+ // before any reader can finish.
+ expectSum := make(chan []byte)
+ go func() {
+ hash := md5.New()
+ buf := make([]byte, n)
+ for i := 0; i < n; i++ {
+ time.Sleep(10 * time.Nanosecond)
+ rand.Read(buf)
+ b.Write(buf)
+ hash.Write(buf)
+ }
+ expectSum <- hash.Sum(nil)
+ b.Close()
+ }()
+
+ // Readers: each copies the whole stream through a buffer of a
+ // different size (n/2 .. n/2+n-1 bytes) into its own hash.
+ gotSum := make(chan []byte)
+ for i := 0; i < n; i++ {
+ go func(bufSize int) {
+ got := md5.New()
+ io.CopyBuffer(got, b.NewReader(), make([]byte, bufSize))
+ gotSum <- got.Sum(nil)
+ }(i + n/2)
+ }
+
+ expect := <-expectSum
+ for i := 0; i < n; i++ {
+ c.Check(expect, check.DeepEquals, <-gotSum)
+ }
+}
+
+// checkReader reads r to the end with ioutil.ReadAll and checks that
+// the data equals expectData and the error equals expectError. A clean
+// io.EOF shows up as a nil error, because ReadAll does not report EOF.
+// If done is non-nil, it sends true on done when finished, so callers
+// can run it in a goroutine and wait for it.
+func (s *Suite) checkReader(c *check.C, r io.Reader, expectData []byte, expectError error, done chan bool) {
+ buf, err := ioutil.ReadAll(r)
+ c.Check(err, check.Equals, expectError)
+ c.Check(buf, check.DeepEquals, expectData)
+ if done != nil {
+ done <- true
+ }
+}
+
+// Gocheck boilerplate: Test hooks the registered gocheck suites into
+// the standard "go test" runner.
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
diff --git a/sdk/go/keepclient/hashcheck.go b/sdk/go/keepclient/hashcheck.go
index 726b813..9295c14 100644
--- a/sdk/go/keepclient/hashcheck.go
+++ b/sdk/go/keepclient/hashcheck.go
@@ -72,19 +72,16 @@ func (this HashCheckingReader) Close() (err error) {
_, err = io.Copy(this.Hash, this.Reader)
if closer, ok := this.Reader.(io.Closer); ok {
- err2 := closer.Close()
- if err2 != nil && err == nil {
- return err2
+ closeErr := closer.Close()
+ if err == nil {
+ err = closeErr
}
}
if err != nil {
return err
}
-
- sum := this.Hash.Sum(nil)
- if fmt.Sprintf("%x", sum) != this.Check {
- err = BadChecksum
+ if fmt.Sprintf("%x", this.Hash.Sum(nil)) != this.Check {
+ return BadChecksum
}
-
- return err
+ return nil
}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index cbfad81..37d651e 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -21,7 +21,7 @@ import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
+ "git.curoverse.com/arvados.git/sdk/go/asyncbuf"
)
// A Keep "block" is 64MB.
@@ -156,10 +156,12 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
bufsize = BLOCKSIZE
}
- t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
- defer t.Close()
-
- return kc.putReplicas(hash, t, dataBytes)
+ buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
+ buf.CloseWithError(err)
+ }()
+ return kc.putReplicas(hash, buf.NewReader, dataBytes)
}
// PutHB writes a block to Keep. The hash of the bytes is given in
@@ -167,9 +169,8 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- t := streamer.AsyncStreamFromSlice(buf)
- defer t.Close()
- return kc.putReplicas(hash, t, int64(len(buf)))
+ newReader := func() io.Reader { return bytes.NewBuffer(buf) }
+ return kc.putReplicas(hash, newReader, int64(len(buf)))
}
// PutB writes a block to Keep. It computes the hash itself.
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 3ce4e74..055141c 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -5,6 +5,7 @@
package keepclient
import (
+ "bytes"
"crypto/md5"
"errors"
"fmt"
@@ -20,7 +21,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
. "gopkg.in/check.v1"
)
@@ -172,18 +172,8 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
make(chan string)}
UploadToStubHelper(c, st,
- func(kc *KeepClient, url string, reader io.ReadCloser,
- writer io.WriteCloser, upload_status chan uploadStatus) {
-
- tr := streamer.AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
-
- go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
- writer.Write([]byte("foo"))
- writer.Close()
+ func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, 0)
<-st.handled
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 49ef543..3791250 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -17,7 +17,6 @@ import (
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
)
// Function used to emit debug messages. The easiest way to enable
@@ -57,7 +56,7 @@ type uploadStatus struct {
response string
}
-func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
upload_status chan<- uploadStatus, expectedLength int64, requestID int32) {
var req *http.Request
@@ -66,21 +65,16 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%08x] Error creating request PUT %v error: %v", requestID, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
req.ContentLength = expectedLength
if expectedLength > 0 {
- // Do() will close the body ReadCloser when it is done
- // with it.
- req.Body = body
+ req.Body = ioutil.NopCloser(body)
} else {
- // "For client requests, a value of 0 means unknown if Body is
- // not nil." In this case we do want the body to be empty, so
- // don't set req.Body. However, we still need to close the
- // body ReadCloser.
- body.Close()
+ // "For client requests, a value of 0 means unknown if
+ // Body is not nil." In this case we do want the body
+ // to be empty, so don't set req.Body.
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
@@ -121,7 +115,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
func (this *KeepClient) putReplicas(
hash string,
- tr *streamer.AsyncStream,
+ getReader func() io.Reader,
expectedLength int64) (locator string, replicas int, err error) {
// Generate an arbitrary ID to identify this specific
@@ -174,7 +168,7 @@ func (this *KeepClient) putReplicas(
// Start some upload requests
if next_server < len(sv) {
DebugPrintf("DEBUG: [%08x] Begin upload %s to %s", requestID, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestID)
+ go this.uploadToKeepServer(sv[next_server], hash, getReader(), upload_status, expectedLength, requestID)
next_server += 1
active += 1
} else {
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
deleted file mode 100644
index 396e311..0000000
--- a/sdk/go/streamer/streamer.go
+++ /dev/null
@@ -1,158 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* AsyncStream pulls data in from a io.Reader source (such as a file or network
-socket) and fans out to any number of StreamReader sinks.
-
-Unlike io.TeeReader() or io.MultiWriter(), new StreamReaders can be created at
-any point in the lifetime of the AsyncStream, and each StreamReader will read
-the contents of the buffer up to the "frontier" of the buffer, at which point
-the StreamReader blocks until new data is read from the source.
-
-This is useful for minimizing readthrough latency as sinks can read and act on
-data from the source without waiting for the source to be completely buffered.
-It is also useful as a cache in situations where re-reading the original source
-potentially is costly, since the buffer retains a copy of the source data.
-
-Usage:
-
-Begin reading into a buffer with maximum size 'buffersize' from 'source':
- stream := AsyncStreamFromReader(buffersize, source)
-
-To create a new reader (this can be called multiple times, each reader starts
-at the beginning of the buffer):
- reader := tr.MakeStreamReader()
-
-Make sure to close the reader when you're done with it.
- reader.Close()
-
-When you're done with the stream:
- stream.Close()
-
-Alternately, if you already have a filled buffer and just want to read out from it:
- stream := AsyncStreamFromSlice(buf)
-
- r := tr.MakeStreamReader()
-
-*/
-
-package streamer
-
-import (
- "errors"
- "io"
-)
-
-var ErrAlreadyClosed = errors.New("cannot close a stream twice")
-
-type AsyncStream struct {
- buffer []byte
- requests chan sliceRequest
- add_reader chan bool
- subtract_reader chan bool
- wait_zero_readers chan bool
- closed bool
-}
-
-// Reads from the buffer managed by the Transfer()
-type StreamReader struct {
- offset int
- stream *AsyncStream
- responses chan sliceResult
-}
-
-func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
- t := &AsyncStream{
- buffer: make([]byte, buffersize),
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(source)
- go t.readersMonitor()
-
- return t
-}
-
-func AsyncStreamFromSlice(buf []byte) *AsyncStream {
- t := &AsyncStream{
- buffer: buf,
- requests: make(chan sliceRequest),
- add_reader: make(chan bool),
- subtract_reader: make(chan bool),
- wait_zero_readers: make(chan bool),
- }
-
- go t.transfer(nil)
- go t.readersMonitor()
-
- return t
-}
-
-func (this *AsyncStream) MakeStreamReader() *StreamReader {
- this.add_reader <- true
- return &StreamReader{0, this, make(chan sliceResult)}
-}
-
-// Reads from the buffer managed by the Transfer()
-func (this *StreamReader) Read(p []byte) (n int, err error) {
- this.stream.requests <- sliceRequest{this.offset, len(p), this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- return copy(p, rr.slice), rr.err
- } else {
- return 0, io.ErrUnexpectedEOF
- }
-}
-
-func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
- // Record starting offset in order to correctly report the number of bytes sent
- starting_offset := this.offset
- for {
- this.stream.requests <- sliceRequest{this.offset, 32 * 1024, this.responses}
- rr, valid := <-this.responses
- if valid {
- this.offset += len(rr.slice)
- if rr.err != nil {
- if rr.err == io.EOF {
- // EOF is not an error.
- return int64(this.offset - starting_offset), nil
- } else {
- return int64(this.offset - starting_offset), rr.err
- }
- } else {
- dest.Write(rr.slice)
- }
- } else {
- return int64(this.offset), io.ErrUnexpectedEOF
- }
- }
-}
-
-// Close the responses channel
-func (this *StreamReader) Close() error {
- if this.stream == nil {
- return ErrAlreadyClosed
- }
- this.stream.subtract_reader <- true
- close(this.responses)
- this.stream = nil
- return nil
-}
-
-func (this *AsyncStream) Close() error {
- if this.closed {
- return ErrAlreadyClosed
- }
- this.closed = true
- this.wait_zero_readers <- true
- close(this.requests)
- close(this.add_reader)
- close(this.subtract_reader)
- close(this.wait_zero_readers)
- return nil
-}
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
deleted file mode 100644
index f8ddbf5..0000000
--- a/sdk/go/streamer/streamer_test.go
+++ /dev/null
@@ -1,381 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package streamer
-
-import (
- . "gopkg.in/check.v1"
- "io"
- "testing"
- "time"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) { TestingT(t) }
-
-var _ = Suite(&StandaloneSuite{})
-
-// Standalone tests
-type StandaloneSuite struct{}
-
-func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
- ReadIntoBufferHelper(c, 225)
- ReadIntoBufferHelper(c, 224)
-}
-
-func HelperWrite128andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 128)
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 128)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 128; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func HelperWrite96andCheck(c *C, buffer []byte, writer io.Writer, slices chan nextSlice) {
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
- writer.Write(out)
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 96)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 96; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 96) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-}
-
-func ReadIntoBufferHelper(c *C, bufsize int) {
- buffer := make([]byte, bufsize)
-
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
- HelperWrite96andCheck(c, buffer, writer, slices)
-
- writer.Close()
- s1 := <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.EOF)
-}
-
-func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
- buffer := make([]byte, 223)
- reader, writer := io.Pipe()
- slices := make(chan nextSlice)
-
- go readIntoBuffer(buffer, reader, slices)
-
- HelperWrite128andCheck(c, buffer, writer, slices)
-
- out := make([]byte, 96)
- for i := 0; i < 96; i += 1 {
- out[i] = byte(i / 2)
- }
-
- // Write will deadlock because it can't write all the data, so
- // spin it off to a goroutine
- go writer.Write(out)
- s1 := <-slices
-
- c.Check(len(s1.slice), Equals, 95)
- c.Check(s1.reader_error, Equals, nil)
- for i := 0; i < 95; i += 1 {
- c.Check(s1.slice[i], Equals, byte(i/2))
- }
- for i := 0; i < len(buffer); i += 1 {
- if i < 128 {
- c.Check(buffer[i], Equals, byte(i))
- } else if i < (128 + 95) {
- c.Check(buffer[i], Equals, byte((i-128)/2))
- } else {
- c.Check(buffer[i], Equals, byte(0))
- }
- }
-
- writer.Close()
- s1 = <-slices
- c.Check(len(s1.slice), Equals, 0)
- c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransfer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
-
- br1 := tr.MakeStreamReader()
- out := make([]byte, 128)
-
- {
- // Write some data, and read into a buffer shorter than
- // available data
- for i := 0; i < 128; i += 1 {
- out[i] = byte(i)
- }
-
- writer.Write(out[:100])
-
- in := make([]byte, 64)
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Write some more data, and read into buffer longer than
- // available data
- in := make([]byte, 64)
- n, err := br1.Read(in)
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, out[64+i])
- }
-
- }
-
- {
- // Test read before write
- type Rd struct {
- n int
- err error
- }
- rd := make(chan Rd)
- in := make([]byte, 64)
-
- go func() {
- n, err := br1.Read(in)
- rd <- Rd{n, err}
- }()
-
- time.Sleep(100 * time.Millisecond)
- writer.Write(out[100:])
-
- got := <-rd
-
- c.Check(got.n, Equals, 28)
- c.Check(got.err, Equals, nil)
-
- for i := 0; i < 28; i += 1 {
- c.Check(in[i], Equals, out[100+i])
- }
- }
-
- br2 := tr.MakeStreamReader()
- {
- // Test 'catch up' reader
- in := make([]byte, 256)
- n, err := br2.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
- }
-
- {
- // Test closing the reader
- writer.Close()
-
- in := make([]byte, 256)
- n1, err1 := br1.Read(in)
- n2, err2 := br2.Read(in)
- c.Check(n1, Equals, 0)
- c.Check(err1, Equals, io.EOF)
- c.Check(n2, Equals, 0)
- c.Check(err2, Equals, io.EOF)
- }
-
- {
- // Test 'catch up' reader after closing
- br3 := tr.MakeStreamReader()
- in := make([]byte, 256)
- n, err := br3.Read(in)
-
- c.Check(n, Equals, 128)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 128; i += 1 {
- c.Check(in[i], Equals, out[i])
- }
-
- n, err = br3.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(100, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- defer sr.Close()
-
- out := make([]byte, 101)
- go writer.Write(out)
-
- n, err := sr.Read(out)
- c.Check(n, Equals, 100)
- c.Check(err, IsNil)
-
- n, err = sr.Read(out)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.ErrShortBuffer)
-}
-
-func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
-
- br1 := tr.MakeStreamReader()
-
- in := make([]byte, 64)
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 64)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 64; i += 1 {
- c.Check(in[i], Equals, buffer[i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 36)
- c.Check(err, Equals, nil)
-
- for i := 0; i < 36; i += 1 {
- c.Check(in[i], Equals, buffer[64+i])
- }
- }
- {
- n, err := br1.Read(in)
-
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }
-}
-
-func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
- // Buffer for reads from 'r'
- buffer := make([]byte, 100)
- for i := 0; i < 100; i += 1 {
- buffer[i] = byte(i)
- }
-
- tr := AsyncStreamFromSlice(buffer)
- defer tr.Close()
-
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- reader, writer := io.Pipe()
-
- go func() {
- p := make([]byte, 100)
- n, err := reader.Read(p)
- c.Check(n, Equals, 100)
- c.Check(err, Equals, nil)
- c.Check(p, DeepEquals, buffer)
- }()
-
- io.Copy(writer, br1)
-}
-
-func (s *StandaloneSuite) TestManyReaders(c *C) {
- reader, writer := io.Pipe()
-
- tr := AsyncStreamFromReader(512, reader)
- defer tr.Close()
-
- sr := tr.MakeStreamReader()
- go func() {
- time.Sleep(100 * time.Millisecond)
- sr.Close()
- }()
-
- for i := 0; i < 200; i += 1 {
- go func() {
- br1 := tr.MakeStreamReader()
- defer br1.Close()
-
- p := make([]byte, 3)
- n, err := br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("foo"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("bar"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 3)
- c.Check(p[0:3], DeepEquals, []byte("baz"))
-
- n, err = br1.Read(p)
- c.Check(n, Equals, 0)
- c.Check(err, Equals, io.EOF)
- }()
- }
-
- writer.Write([]byte("foo"))
- writer.Write([]byte("bar"))
- writer.Write([]byte("baz"))
- writer.Close()
-}
-
-func (s *StandaloneSuite) TestMultipleCloseNoPanic(c *C) {
- buffer := make([]byte, 100)
- tr := AsyncStreamFromSlice(buffer)
- sr := tr.MakeStreamReader()
- c.Check(sr.Close(), IsNil)
- c.Check(sr.Close(), Equals, ErrAlreadyClosed)
- c.Check(tr.Close(), IsNil)
- c.Check(tr.Close(), Equals, ErrAlreadyClosed)
-}
diff --git a/sdk/go/streamer/transfer.go b/sdk/go/streamer/transfer.go
deleted file mode 100644
index bea27f8..0000000
--- a/sdk/go/streamer/transfer.go
+++ /dev/null
@@ -1,310 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-/* Internal implementation of AsyncStream.
-Outline of operation:
-
-The kernel is the transfer() goroutine. It manages concurrent reads and
-appends to the "body" slice. "body" is a slice of "source_buffer" that
-represents the segment of the buffer that is already filled in and available
-for reading.
-
-To fill in the buffer, transfer() starts the readIntoBuffer() goroutine to read
-from the io.Reader source directly into source_buffer. Each read goes into a
-slice of buffer which spans the section immediately following the end of the
-current "body". Each time a Read completes, a slice representing the the
-section just filled in (or any read errors/EOF) is sent over the "slices"
-channel back to the transfer() function.
-
-Meanwhile, the transfer() function selects() on two channels, the "requests"
-channel and the "slices" channel.
-
-When a message is received on the "slices" channel, this means the a new
-section of the buffer has data, or an error is signaled. Since the data has
-been read directly into the source_buffer, it is able to simply increases the
-size of the body slice to encompass the newly filled in section. Then any
-pending reads are serviced with handleReadRequest (described below).
-
-When a message is received on the "requests" channel, it means a StreamReader
-wants access to a slice of the buffer. This is passed to handleReadRequest().
-
-The handleReadRequest() function takes a sliceRequest consisting of a buffer
-offset, maximum size, and channel to send the response. If there was an error
-reported from the source reader, it is returned. If the offset is less than
-the size of the body, the request can proceed, and it sends a body slice
-spanning the segment from offset to min(offset+maxsize, end of the body). If
-source reader status is EOF (done filling the buffer) and the read request
-offset is beyond end of the body, it responds with EOF. Otherwise, the read
-request is for a slice beyond the current size of "body" but we expect the body
-to expand as more data is added, so the request gets added to a wait list.
-
-The transfer() runs until the requests channel is closed by AsyncStream.Close()
-
-To track readers, streamer uses the readersMonitor() goroutine. This goroutine
-chooses which channels to receive from based on the number of outstanding
-readers. When a new reader is created, it sends a message on the add_reader
-channel. If the number of readers is already at MAX_READERS, this blocks the
-sender until an existing reader is closed. When a reader is closed, it sends a
-message on the subtract_reader channel. Finally, when AsyncStream.Close() is
-called, it sends a message on the wait_zero_readers channel, which will block
-the sender unless there are zero readers and it is safe to shut down the
-AsyncStream.
-*/
-
-package streamer
-
-import (
- "io"
-)
-
-const MAX_READERS = 100
-
-// A slice passed from readIntoBuffer() to transfer()
-type nextSlice struct {
- slice []byte
- reader_error error
-}
-
-// A read request to the Transfer() function
-type sliceRequest struct {
- offset int
- maxsize int
- result chan<- sliceResult
-}
-
-// A read result from the Transfer() function
-type sliceResult struct {
- slice []byte
- err error
-}
-
-// Supports writing into a buffer
-type bufferWriter struct {
- buf []byte
- ptr int
-}
-
-// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this *bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[this.ptr:], p)
- this.ptr += n
- return n, nil
-}
-
-// Read repeatedly from the reader and write sequentially into the specified
-// buffer, and report each read to channel 'c'. Completes when Reader 'r'
-// reports on the error channel and closes channel 'c'.
-func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- nextSlice) {
- defer close(slices)
-
- if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
- if err != nil {
- slices <- nextSlice{nil, err}
- } else {
- slices <- nextSlice{buffer[:n], nil}
- slices <- nextSlice{nil, io.EOF}
- }
- return
- } else {
- // Initially entire buffer is available
- ptr := buffer[:]
- for {
- var n int
- var err error
- if len(ptr) > 0 {
- const readblock = 64 * 1024
- // Read 64KiB into the next part of the buffer
- if len(ptr) > readblock {
- n, err = r.Read(ptr[:readblock])
- } else {
- n, err = r.Read(ptr)
- }
- } else {
- // Ran out of buffer space, try reading one more byte
- var b [1]byte
- n, err = r.Read(b[:])
-
- if n > 0 {
- // Reader has more data but we have nowhere to
- // put it, so we're stuffed
- slices <- nextSlice{nil, io.ErrShortBuffer}
- } else {
- // Return some other error (hopefully EOF)
- slices <- nextSlice{nil, err}
- }
- return
- }
-
- // End on error (includes EOF)
- if err != nil {
- slices <- nextSlice{nil, err}
- return
- }
-
- if n > 0 {
- // Make a slice with the contents of the read
- slices <- nextSlice{ptr[:n], nil}
-
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- }
- }
-}
-
-// Handle a read request. Returns true if a response was sent, and false if
-// the request should be queued.
-func handleReadRequest(req sliceRequest, body []byte, reader_status error) bool {
- if (reader_status != nil) && (reader_status != io.EOF) {
- req.result <- sliceResult{nil, reader_status}
- return true
- } else if req.offset < len(body) {
- var end int
- if req.offset+req.maxsize < len(body) {
- end = req.offset + req.maxsize
- } else {
- end = len(body)
- }
- req.result <- sliceResult{body[req.offset:end], nil}
- return true
- } else if (reader_status == io.EOF) && (req.offset >= len(body)) {
- req.result <- sliceResult{nil, io.EOF}
- return true
- } else {
- return false
- }
-}
-
-// Mediates between reads and appends.
-// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
-// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
-// Accepts read requests on the buffer on the 'requests' channel. Completes
-// when 'requests' channel is closed.
-func (this *AsyncStream) transfer(source_reader io.Reader) {
- source_buffer := this.buffer
- requests := this.requests
-
- // currently buffered data
- var body []byte
-
- // for receiving slices from readIntoBuffer
- var slices chan nextSlice = nil
-
- // indicates the status of the underlying reader
- var reader_status error = nil
-
- if source_reader != nil {
- // 'body' is the buffer slice representing the body content read so far
- body = source_buffer[:0]
-
- // used to communicate slices of the buffer as they are
- // readIntoBuffer will close 'slices' when it is done with it
- slices = make(chan nextSlice)
-
- // Spin it off
- go readIntoBuffer(source_buffer, source_reader, slices)
- } else {
- // use the whole buffer
- body = source_buffer[:]
-
- // buffer is complete
- reader_status = io.EOF
- }
-
- pending_requests := make([]sliceRequest, 0)
-
- for {
- select {
- case req, valid := <-requests:
- // Handle a buffer read request
- if valid {
- if !handleReadRequest(req, body, reader_status) {
- pending_requests = append(pending_requests, req)
- }
- } else {
- // closed 'requests' channel indicates we're done
- return
- }
-
- case bk, valid := <-slices:
- // Got a new slice from the reader
- if valid {
- reader_status = bk.reader_error
-
- if bk.slice != nil {
- // adjust body bounds now that another slice has been read
- body = source_buffer[0 : len(body)+len(bk.slice)]
- }
-
- // handle pending reads
- n := 0
- for n < len(pending_requests) {
- if handleReadRequest(pending_requests[n], body, reader_status) {
- // move the element from the back of the slice to
- // position 'n', then shorten the slice by one element
- pending_requests[n] = pending_requests[len(pending_requests)-1]
- pending_requests = pending_requests[0 : len(pending_requests)-1]
- } else {
-
- // Request wasn't handled, so keep it in the request slice
- n += 1
- }
- }
- } else {
- if reader_status == nil {
- // slices channel closed without signaling EOF
- reader_status = io.ErrUnexpectedEOF
- }
- slices = nil
- }
- }
- }
-}
-
-func (this *AsyncStream) readersMonitor() {
- var readers int = 0
-
- for {
- if readers == 0 {
- select {
- case _, ok := <-this.wait_zero_readers:
- if ok {
- // nothing, just implicitly unblock the sender
- } else {
- return
- }
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
- }
- } else if readers > 0 && readers < MAX_READERS {
- select {
- case _, ok := <-this.add_reader:
- if ok {
- readers += 1
- } else {
- return
- }
-
- case _, ok := <-this.subtract_reader:
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- } else if readers == MAX_READERS {
- _, ok := <-this.subtract_reader
- if ok {
- readers -= 1
- } else {
- return
- }
- }
- }
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list