[ARVADOS] updated: 3986815ae5e7e61c48f3ed979c32358710ef7e20
git at public.curoverse.com
git at public.curoverse.com
Tue May 20 17:12:34 EDT 2014
Summary of changes:
.../{buffer/buffer.go => streamer/streamer.go} | 50 +++++++++++-----------
.../buffer_test.go => streamer/streamer_test.go} | 18 ++++----
.../arvados.org/{buffer => streamer}/transfer.go | 12 +++---
3 files changed, 39 insertions(+), 41 deletions(-)
rename sdk/go/src/arvados.org/{buffer/buffer.go => streamer/streamer.go} (56%)
rename sdk/go/src/arvados.org/{buffer/buffer_test.go => streamer/streamer_test.go} (95%)
rename sdk/go/src/arvados.org/{buffer => streamer}/transfer.go (96%)
via 3986815ae5e7e61c48f3ed979c32358710ef7e20 (commit)
via a2273675f29a0f85cb80e62b3742e82d63c365e9 (commit)
via 64aac6153e1819738d9d80e156572aeb9bf07f97 (commit)
from 941bcf698f1cfb498510a13f23d3c9d403b0435f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3986815ae5e7e61c48f3ed979c32358710ef7e20
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 20 17:12:10 2014 -0400
mend
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index b886134..0b8755f 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -39,7 +39,7 @@ type StreamReader struct {
responses chan readResult
}
-func AsyncReaderStream(buffersize int, source io.Reader) *AsyncStream {
+func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
buf := make([]byte, buffersize)
t := &AsyncStream{make(chan readRequest), make(chan error)}
@@ -49,7 +49,7 @@ func AsyncReaderStream(buffersize int, source io.Reader) *AsyncStream {
return t
}
-func AsyncSliceStream(buf []byte) *AsyncStream {
+func AsyncStreamFromSlice(buf []byte) *AsyncStream {
t := &AsyncStream{make(chan readRequest), nil}
go transfer(buf, nil, t.requests, nil)
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 3c2d83a..9e24cfb 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -144,7 +144,7 @@ func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
func (s *StandaloneSuite) TestTransfer(c *C) {
reader, writer := io.Pipe()
- tr := StartTransferFromReader(512, reader)
+ tr := AsyncStreamFromReader(512, reader)
br1 := tr.MakeStreamReader()
out := make([]byte, 128)
@@ -288,7 +288,7 @@ func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
buffer[i] = byte(i)
}
- tr := StartTransferFromSlice(buffer)
+ tr := AsyncStreamFromSlice(buffer)
br1 := tr.MakeStreamReader()
@@ -328,7 +328,7 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
buffer[i] = byte(i)
}
- tr := StartTransferFromSlice(buffer)
+ tr := AsyncStreamFromSlice(buffer)
br1 := tr.MakeStreamReader()
commit a2273675f29a0f85cb80e62b3742e82d63c365e9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 20 17:09:06 2014 -0400
2798: Renamed TransferBuffer->AsyncStream, BufferReader->StreamReader,
StartTransferFromReader->AsyncStreamFromReader,
StartTransferFromSlice->AsyncStreamFromSlice. Changed structs to use pointer
receivers instead of embedded pointer fields. Updated tests.
diff --git a/sdk/go/src/arvados.org/streamer/streamer.go b/sdk/go/src/arvados.org/streamer/streamer.go
index 12475aa..b886134 100644
--- a/sdk/go/src/arvados.org/streamer/streamer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -8,7 +8,7 @@ Begin reading into a buffer with maximum size 'buffersize' from 'source':
tr := StartTransferFromReader(buffersize, source)
To create a new reader (this can be called multiple times):
- r := tr.MakeBufferReader()
+ r := tr.MakeStreamReader()
When you're done with the buffer:
tr.Close()
@@ -16,7 +16,7 @@ When you're done with the buffer:
Alternately, if you already have a filled buffer and just want to read out from it:
tr := StartTransferFromSlice(buf)
- r := tr.MakeBufferReader()
+ r := tr.MakeStreamReader()
tr.Close()
*/
@@ -25,87 +25,85 @@ package streamer
import (
"io"
- "log"
)
-type TransferBuffer struct {
+type AsyncStream struct {
requests chan readRequest
Reader_status chan error
}
// Reads from the buffer managed by the Transfer()
-type BufferReader struct {
- offset *int
+type StreamReader struct {
+ offset int
requests chan<- readRequest
responses chan readResult
}
-func StartTransferFromReader(buffersize int, source io.Reader) TransferBuffer {
+func AsyncReaderStream(buffersize int, source io.Reader) *AsyncStream {
buf := make([]byte, buffersize)
- t := TransferBuffer{make(chan readRequest), make(chan error)}
+ t := &AsyncStream{make(chan readRequest), make(chan error)}
go transfer(buf, source, t.requests, t.Reader_status)
return t
}
-func StartTransferFromSlice(buf []byte) TransferBuffer {
- t := TransferBuffer{make(chan readRequest), nil}
+func AsyncSliceStream(buf []byte) *AsyncStream {
+ t := &AsyncStream{make(chan readRequest), nil}
go transfer(buf, nil, t.requests, nil)
return t
}
-func (this TransferBuffer) MakeBufferReader() BufferReader {
- return BufferReader{new(int), this.requests, make(chan readResult)}
+func (this *AsyncStream) MakeStreamReader() *StreamReader {
+ return &StreamReader{0, this.requests, make(chan readResult)}
}
// Reads from the buffer managed by the Transfer()
-func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- readRequest{*this.offset, len(p), this.responses}
+func (this *StreamReader) Read(p []byte) (n int, err error) {
+ this.requests <- readRequest{this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
- *this.offset += len(rr.slice)
+ this.offset += len(rr.slice)
return copy(p, rr.slice), rr.err
} else {
return 0, io.ErrUnexpectedEOF
}
}
-func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Record starting offset in order to correctly report the number of bytes sent
- starting_offset := *this.offset
+ starting_offset := this.offset
for {
- this.requests <- readRequest{*this.offset, 32 * 1024, this.responses}
+ this.requests <- readRequest{this.offset, 32 * 1024, this.responses}
rr, valid := <-this.responses
if valid {
- log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
- *this.offset += len(rr.slice)
+ this.offset += len(rr.slice)
if rr.err != nil {
if rr.err == io.EOF {
// EOF is not an error.
- return int64(*this.offset - starting_offset), nil
+ return int64(this.offset - starting_offset), nil
} else {
- return int64(*this.offset - starting_offset), rr.err
+ return int64(this.offset - starting_offset), rr.err
}
} else {
dest.Write(rr.slice)
}
} else {
- return int64(*this.offset), io.ErrUnexpectedEOF
+ return int64(this.offset), io.ErrUnexpectedEOF
}
}
}
// Close the responses channel
-func (this BufferReader) Close() error {
+func (this *StreamReader) Close() error {
close(this.responses)
return nil
}
-func (this TransferBuffer) Close() {
+func (this *AsyncStream) Close() {
close(this.requests)
if this.Reader_status != nil {
close(this.Reader_status)
diff --git a/sdk/go/src/arvados.org/streamer/streamer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
index 54f1bc7..3c2d83a 100644
--- a/sdk/go/src/arvados.org/streamer/streamer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -146,7 +146,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
tr := StartTransferFromReader(512, reader)
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
out := make([]byte, 128)
{
@@ -210,7 +210,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
}
}
- br2 := tr.MakeBufferReader()
+ br2 := tr.MakeStreamReader()
{
// Test 'catch up' reader
in := make([]byte, 256)
@@ -241,7 +241,7 @@ func (s *StandaloneSuite) TestTransfer(c *C) {
{
// Test 'catch up' reader after closing
- br3 := tr.MakeBufferReader()
+ br3 := tr.MakeStreamReader()
in := make([]byte, 256)
n, err := br3.Read(in)
@@ -290,7 +290,7 @@ func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
tr := StartTransferFromSlice(buffer)
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
in := make([]byte, 64)
{
@@ -330,7 +330,7 @@ func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
tr := StartTransferFromSlice(buffer)
- br1 := tr.MakeBufferReader()
+ br1 := tr.MakeStreamReader()
reader, writer := io.Pipe()
diff --git a/sdk/go/src/arvados.org/streamer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
index b017dbf..ab8f941 100644
--- a/sdk/go/src/arvados.org/streamer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -27,13 +27,13 @@ type readResult struct {
// Supports writing into a buffer
type bufferWriter struct {
buf []byte
- ptr *int
+ ptr int
}
// Copy p into this.buf, increment pointer and return number of bytes read.
-func (this bufferWriter) Write(p []byte) (n int, err error) {
- n = copy(this.buf[*this.ptr:], p)
- *this.ptr += n
+func (this *bufferWriter) Write(p []byte) (n int, err error) {
+ n = copy(this.buf[this.ptr:], p)
+ this.ptr += n
return n, nil
}
@@ -44,7 +44,7 @@ func readIntoBuffer(buffer []byte, r io.Reader, slices chan<- readerSlice) {
defer close(slices)
if writeto, ok := r.(io.WriterTo); ok {
- n, err := writeto.WriteTo(bufferWriter{buffer, new(int)})
+ n, err := writeto.WriteTo(&bufferWriter{buffer, 0})
if err != nil {
slices <- readerSlice{nil, err}
} else {
commit 64aac6153e1819738d9d80e156572aeb9bf07f97
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 20 16:40:11 2014 -0400
2798: Renamed 'buffer' package to 'streamer'
diff --git a/sdk/go/src/arvados.org/buffer/buffer.go b/sdk/go/src/arvados.org/streamer/streamer.go
similarity index 99%
rename from sdk/go/src/arvados.org/buffer/buffer.go
rename to sdk/go/src/arvados.org/streamer/streamer.go
index e2ee1c4..12475aa 100644
--- a/sdk/go/src/arvados.org/buffer/buffer.go
+++ b/sdk/go/src/arvados.org/streamer/streamer.go
@@ -21,7 +21,7 @@ Alternately, if you already have a filled buffer and just want to read out from
*/
-package buffer
+package streamer
import (
"io"
diff --git a/sdk/go/src/arvados.org/buffer/buffer_test.go b/sdk/go/src/arvados.org/streamer/streamer_test.go
similarity index 99%
rename from sdk/go/src/arvados.org/buffer/buffer_test.go
rename to sdk/go/src/arvados.org/streamer/streamer_test.go
index f35b110..54f1bc7 100644
--- a/sdk/go/src/arvados.org/buffer/buffer_test.go
+++ b/sdk/go/src/arvados.org/streamer/streamer_test.go
@@ -1,4 +1,4 @@
-package buffer
+package streamer
import (
. "gopkg.in/check.v1"
diff --git a/sdk/go/src/arvados.org/buffer/transfer.go b/sdk/go/src/arvados.org/streamer/transfer.go
similarity index 99%
rename from sdk/go/src/arvados.org/buffer/transfer.go
rename to sdk/go/src/arvados.org/streamer/transfer.go
index 6dbc921..b017dbf 100644
--- a/sdk/go/src/arvados.org/buffer/transfer.go
+++ b/sdk/go/src/arvados.org/streamer/transfer.go
@@ -1,4 +1,4 @@
-package buffer
+package streamer
import (
"io"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list