[ARVADOS] updated: fb7f238945e33b07f1c80b0623315c1ecf86bca2
git at public.curoverse.com
git at public.curoverse.com
Tue May 13 22:36:33 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/keepclient/keepclient.go | 48 +++++++++++----
.../src/arvados.org/keepclient/keepclient_test.go | 69 ++++++++++++++++++----
2 files changed, 95 insertions(+), 22 deletions(-)
via fb7f238945e33b07f1c80b0623315c1ecf86bca2 (commit)
from 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit fb7f238945e33b07f1c80b0623315c1ecf86bca2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 13 22:36:19 2014 -0400
2798: Read requests from Transfer() now return a slice. Added BufferReader
WriteTo(), but for some reason http.Request Body isn't using it.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 93fcf4b..ce67503 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -208,15 +208,15 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
// A read request to the Transfer() function
type ReadRequest struct {
- offset int
- p []byte
- result chan<- ReadResult
+ offset int
+ maxsize int
+ result chan<- ReadResult
}
// A read result from the Transfer() function
type ReadResult struct {
- n int
- err error
+ slice []byte
+ err error
}
// Reads from the buffer managed by the Transfer()
@@ -232,16 +232,36 @@ func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
// Reads from the buffer managed by the Transfer()
func (this BufferReader) Read(p []byte) (n int, err error) {
- this.requests <- ReadRequest{*this.offset, p, this.responses}
+ log.Printf("BufferReader Read %d", len(p))
+ this.requests <- ReadRequest{*this.offset, len(p), this.responses}
rr, valid := <-this.responses
if valid {
- *this.offset += rr.n
- return rr.n, rr.err
+ *this.offset += len(rr.slice)
+ return copy(p, rr.slice), rr.err
} else {
return 0, io.ErrUnexpectedEOF
}
}
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+ log.Printf("BufferReader WriteTo")
+ for {
+ this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses}
+ rr, valid := <-this.responses
+ if valid {
+ *this.offset += len(rr.slice)
+ if err != nil {
+ return int64(*this.offset), err
+ } else {
+ dest.Write(rr.slice)
+ }
+
+ } else {
+ return int64(*this.offset), io.ErrUnexpectedEOF
+ }
+ }
+}
+
// Close the responses channel
func (this BufferReader) Close() error {
close(this.responses)
@@ -251,12 +271,18 @@ func (this BufferReader) Close() error {
// Handle a read request. Returns true if a response was sent, and false if
// the request should be queued.
func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
- log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+ log.Printf("HandleReadRequest offset: %d max: %d body: %d %t", req.offset, req.maxsize, len(body), complete)
if req.offset < len(body) {
- req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+ var end int
+ if req.offset+req.maxsize < len(body) {
+ end = req.offset + req.maxsize
+ } else {
+ end = len(body)
+ }
+ req.result <- ReadResult{body[req.offset:end], nil}
return true
} else if complete && req.offset >= len(body) {
- req.result <- ReadResult{0, io.EOF}
+ req.result <- ReadResult{nil, io.EOF}
return true
} else {
return false
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index fd4ba7b..c87b87e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -366,7 +366,7 @@ type StubHandler struct {
}
func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- this.c.Check(req.URL.Path, Equals, this.expectPath)
+ this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
body, err := ioutil.ReadAll(req.Body)
this.c.Check(err, Equals, nil)
@@ -375,19 +375,21 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
this.handled <- true
}
-func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
+ io.ReadCloser, io.WriteCloser, chan UploadError)) {
+
st := StubHandler{
c,
- "/acbd18db4cc2f85cedef654fccc4a4d8",
+ "acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
make(chan bool)}
server := http.Server{Handler: st}
- listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+ listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
defer listener.Close()
- log.Printf("%s", listener.Addr().String())
+ url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
go server.Serve(listener)
kc, _ := MakeKeepClient()
@@ -396,14 +398,59 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
reader, writer := io.Pipe()
upload_status := make(chan UploadError)
- go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+ f(kc, url, st, reader, writer, upload_status)
+}
- writer.Write([]byte("foo"))
- writer.Close()
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+ log.Printf("Started TestUploadToStubKeepServer")
- <-st.handled
- status := <-upload_status
- c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+ UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+ reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+ go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-st.handled
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+ })
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+ log.Printf("Started TestUploadToStubKeepServerBufferReader")
+
+ UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+ reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+ // Buffer for reads from 'r'
+ buffer := make([]byte, 512)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Reporting reader error states
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ br1 := MakeBufferReader(requests)
+
+ go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-reader_status
+ <-st.handled
+
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+
+ //c.Check(true, Equals, false)
+ })
}
type FailHandler struct {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list