[ARVADOS] updated: fb7f238945e33b07f1c80b0623315c1ecf86bca2

git at public.curoverse.com git at public.curoverse.com
Tue May 13 22:36:33 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go    | 48 +++++++++++----
 .../src/arvados.org/keepclient/keepclient_test.go  | 69 ++++++++++++++++++----
 2 files changed, 95 insertions(+), 22 deletions(-)

       via  fb7f238945e33b07f1c80b0623315c1ecf86bca2 (commit)
      from  09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit fb7f238945e33b07f1c80b0623315c1ecf86bca2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 13 22:36:19 2014 -0400

    2798: Read requests from Transfer() now return a slice.  Added BufferReader
    WriteTo() but for some reason http.Request Body isn't using it.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 93fcf4b..ce67503 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -208,15 +208,15 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
 
 // A read request to the Transfer() function
 type ReadRequest struct {
-	offset int
-	p      []byte
-	result chan<- ReadResult
+	offset  int
+	maxsize int
+	result  chan<- ReadResult
 }
 
 // A read result from the Transfer() function
 type ReadResult struct {
-	n   int
-	err error
+	slice []byte
+	err   error
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -232,16 +232,36 @@ func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
 
 // Reads from the buffer managed by the Transfer()
 func (this BufferReader) Read(p []byte) (n int, err error) {
-	this.requests <- ReadRequest{*this.offset, p, this.responses}
+	log.Printf("BufferReader Read %d", len(p))
+	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
 	rr, valid := <-this.responses
 	if valid {
-		*this.offset += rr.n
-		return rr.n, rr.err
+		*this.offset += len(rr.slice)
+		return copy(p, rr.slice), rr.err
 	} else {
 		return 0, io.ErrUnexpectedEOF
 	}
 }
 
+func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
+	log.Printf("BufferReader WriteTo")
+	for {
+		this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses}
+		rr, valid := <-this.responses
+		if valid {
+			*this.offset += len(rr.slice)
+			if err != nil {
+				return int64(*this.offset), err
+			} else {
+				dest.Write(rr.slice)
+			}
+
+		} else {
+			return int64(*this.offset), io.ErrUnexpectedEOF
+		}
+	}
+}
+
 // Close the responses channel
 func (this BufferReader) Close() error {
 	close(this.responses)
@@ -251,12 +271,18 @@ func (this BufferReader) Close() error {
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
 func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
-	log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+	log.Printf("HandleReadRequest offset: %d  max: %d body: %d %t", req.offset, req.maxsize, len(body), complete)
 	if req.offset < len(body) {
-		req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+		var end int
+		if req.offset+req.maxsize < len(body) {
+			end = req.offset + req.maxsize
+		} else {
+			end = len(body)
+		}
+		req.result <- ReadResult{body[req.offset:end], nil}
 		return true
 	} else if complete && req.offset >= len(body) {
-		req.result <- ReadResult{0, io.EOF}
+		req.result <- ReadResult{nil, io.EOF}
 		return true
 	} else {
 		return false
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index fd4ba7b..c87b87e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -366,7 +366,7 @@ type StubHandler struct {
 }
 
 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	this.c.Check(req.URL.Path, Equals, this.expectPath)
+	this.c.Check(req.URL.Path, Equals, "/"+this.expectPath)
 	this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
 	body, err := ioutil.ReadAll(req.Body)
 	this.c.Check(err, Equals, nil)
@@ -375,19 +375,21 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	this.handled <- true
 }
 
-func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
+	io.ReadCloser, io.WriteCloser, chan UploadError)) {
+
 	st := StubHandler{
 		c,
-		"/acbd18db4cc2f85cedef654fccc4a4d8",
+		"acbd18db4cc2f85cedef654fccc4a4d8",
 		"abc123",
 		"foo",
 		make(chan bool)}
 	server := http.Server{Handler: st}
 
-	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
 	defer listener.Close()
 
-	log.Printf("%s", listener.Addr().String())
+	url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
 
 	go server.Serve(listener)
 	kc, _ := MakeKeepClient()
@@ -396,14 +398,59 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 	reader, writer := io.Pipe()
 	upload_status := make(chan UploadError)
 
-	go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+	f(kc, url, st, reader, writer, upload_status)
+}
 
-	writer.Write([]byte("foo"))
-	writer.Close()
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+	log.Printf("Started TestUploadToStubKeepServer")
 
-	<-st.handled
-	status := <-upload_status
-	c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+	UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+		reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+		go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
+
+		writer.Write([]byte("foo"))
+		writer.Close()
+
+		<-st.handled
+		status := <-upload_status
+		c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+	})
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
+	log.Printf("Started TestUploadToStubKeepServerBufferReader")
+
+	UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
+		reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+
+		// Buffer for reads from 'r'
+		buffer := make([]byte, 512)
+
+		// Read requests on Transfer() buffer
+		requests := make(chan ReadRequest)
+		defer close(requests)
+
+		// Reporting reader error states
+		reader_status := make(chan error)
+
+		go Transfer(buffer, reader, requests, reader_status)
+
+		br1 := MakeBufferReader(requests)
+
+		go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
+
+		writer.Write([]byte("foo"))
+		writer.Close()
+
+		<-reader_status
+		<-st.handled
+
+		status := <-upload_status
+		c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+
+		//c.Check(true, Equals, false)
+	})
 }
 
 type FailHandler struct {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list