[ARVADOS] updated: 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6

git at public.curoverse.com git at public.curoverse.com
Tue May 13 20:47:53 EDT 2014


Summary of changes:
 {services/keep => sdk/go}/build.sh                 |   9 +-
 sdk/go/src/arvados.org/keepclient/keepclient.go    | 432 ++++++++++++--------
 .../src/arvados.org/keepclient/keepclient_test.go  | 445 ++++++++++++++++++++-
 3 files changed, 713 insertions(+), 173 deletions(-)
 copy {services/keep => sdk/go}/build.sh (85%)

       via  09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6 (commit)
       via  99cd188a0cbd9143b690750e21abc8d8d5e6dbad (commit)
       via  2a493a9215f604c63ab7bc6f0e0956d10af8ef10 (commit)
       via  6132d8efbc522b71d0084160abaaa87031678bdc (commit)
      from  66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 13 20:47:49 2014 -0400

    2798: Added uploadToKeepServer() test

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 657b300..93fcf4b 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -372,7 +372,7 @@ type UploadError struct {
 	url string
 }
 
-func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel chan<- ReadRequest, upload_status chan<- UploadError) {
+func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, upload_status chan<- UploadError) {
 	var req *http.Request
 	var err error
 	var url = fmt.Sprintf("%s/%s", host, hash)
@@ -382,7 +382,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel
 	}
 
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
-	req.Body = MakeBufferReader(readChannel)
+	req.Body = body
 
 	var resp *http.Response
 	if resp, err = this.client.Do(req); err != nil {
@@ -421,7 +421,7 @@ func (this KeepClient) putReplicas(
 		for active < want_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, requests, upload_status)
+				go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status)
 				next_server += 1
 				active += 1
 			} else {
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 8f6813d..fd4ba7b 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -2,10 +2,13 @@ package keepclient
 
 import (
 	"flag"
-	//"fmt"
+	"fmt"
 	. "gopkg.in/check.v1"
 	"io"
-	//"log"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
 	"os"
 	"os/exec"
 	"testing"
@@ -353,3 +356,101 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 	status := <-reader_status
 	c.Check(status, Equals, io.ErrShortBuffer)
 }
+
+type StubHandler struct {
+	c              *C
+	expectPath     string
+	expectApiToken string
+	expectBody     string
+	handled        chan bool
+}
+
+func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	this.c.Check(req.URL.Path, Equals, this.expectPath)
+	this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
+	body, err := ioutil.ReadAll(req.Body)
+	this.c.Check(err, Equals, nil)
+	this.c.Check(body, DeepEquals, []byte(this.expectBody))
+	resp.WriteHeader(200)
+	this.handled <- true
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+	st := StubHandler{
+		c,
+		"/acbd18db4cc2f85cedef654fccc4a4d8",
+		"abc123",
+		"foo",
+		make(chan bool)}
+	server := http.Server{Handler: st}
+
+	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+	defer listener.Close()
+
+	log.Printf("%s", listener.Addr().String())
+
+	go server.Serve(listener)
+	kc, _ := MakeKeepClient()
+	kc.ApiToken = "abc123"
+
+	reader, writer := io.Pipe()
+	upload_status := make(chan UploadError)
+
+	go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+
+	writer.Write([]byte("foo"))
+	writer.Close()
+
+	<-st.handled
+	status := <-upload_status
+	c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+}
+
+type FailHandler struct {
+	handled chan bool
+}
+
+func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	resp.WriteHeader(400)
+	this.handled <- true
+}
+
+/*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+	log.Printf("blup")
+
+	c.Check(true, Equals, false)
+
+	log.Printf("blug")
+
+	st := FailHandler{make(chan bool)}
+	server := http.Server{Handler: st}
+
+	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
+	defer listener.Close()
+
+	go server.Serve(listener)
+	kc, _ := MakeKeepClient()
+	kc.ApiToken = "abc123"
+
+	reader, writer := io.Pipe()
+	upload_status := make(chan UploadError)
+
+	go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+
+	log.Printf("Writing 1")
+
+	writer.Write([]byte("foo"))
+
+	log.Printf("Writing 2")
+
+	writer.Close()
+
+	log.Printf("Writing 3")
+
+	<-st.handled
+
+	log.Printf("Handled?!")
+
+	status := <-upload_status
+	c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+}*/

commit 99cd188a0cbd9143b690750e21abc8d8d5e6dbad
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 13 16:07:59 2014 -0400

    2798: Checkpoint commit, tests for ReadIntoBuffer() and Transfer() pass.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index f9dce5f..657b300 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -1,10 +1,14 @@
 package keepclient
 
 import (
+	"crypto/md5"
 	"crypto/tls"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"log"
 	"net/http"
 	"os"
 	"sort"
@@ -16,6 +20,8 @@ type KeepClient struct {
 	ApiToken      string
 	ApiInsecure   bool
 	Service_roots []string
+	Want_replicas int
+	client        *http.Client
 }
 
 type KeepDisk struct {
@@ -30,50 +36,68 @@ func MakeKeepClient() (kc *KeepClient, err error) {
 		ApiToken:    os.Getenv("ARVADOS_API_TOKEN"),
 		ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
 
-	if err := kc.DiscoverKeepDisks(); err != nil {
-		return nil, err
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
 	}
 
-	return kc, nil
+	kc.client = &http.Client{Transport: tr}
+
+	err = kc.DiscoverKeepDisks()
+
+	return kc, err
 }
 
 func (this *KeepClient) DiscoverKeepDisks() error {
-	tr := &http.Transport{
-		TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
-	}
-	client := &http.Client{Transport: tr}
-
+	// Construct request of keep disk list
 	var req *http.Request
 	var err error
-	if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
+	if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
 		return err
 	}
 
-	var resp *http.Response
+	// Add api token header
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
-	if resp, err = client.Do(req); err != nil {
+
+	// Make the request
+	var resp *http.Response
+	if resp, err = this.client.Do(req); err != nil {
 		return err
 	}
 
 	type SvcList struct {
 		Items []KeepDisk `json:"items"`
 	}
+
+	// Decode json reply
 	dec := json.NewDecoder(resp.Body)
 	var m SvcList
 	if err := dec.Decode(&m); err != nil {
 		return err
 	}
 
-	this.Service_roots = make([]string, len(m.Items))
-	for index, element := range m.Items {
+	listed := make(map[string]bool)
+	this.Service_roots = make([]string, 0, len(m.Items))
+
+	for _, element := range m.Items {
 		n := ""
 		if element.SSL {
 			n = "s"
 		}
-		this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
-			n, element.Hostname, element.Port)
+
+		// Construct server URL
+		url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+
+		// Skip duplicates
+		if !listed[url] {
+			listed[url] = true
+			this.Service_roots = append(this.Service_roots, url)
+		}
 	}
+
+	// Must be sorted for ShuffledServiceRoots() to produce consistent
+	// results.
 	sort.Strings(this.Service_roots)
+
 	return nil
 }
 
@@ -130,92 +154,125 @@ type ReaderSlice struct {
 	reader_error error
 }
 
-type Source <-chan ReaderSlice
-type Sink chan<- ReaderSlice
-type Status chan error
-
 // Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'.  Completes when Reader 'r' reports an error and closes
-// channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
-	defer close(c)
+// read to channel 'c'.  Completes when Reader 'r' reports on the error channel
+// and closes channel 'c'.
+func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
+	defer close(slices)
 
 	// Initially use entire buffer as scratch space
 	ptr := buffer[:]
-	for len(ptr) > 0 {
-		// Read into the scratch space
-		n, err := r.Read(ptr)
+	for {
+		log.Printf("ReadIntoBuffer doing read")
+		var n int
+		var err error
+		if len(ptr) > 0 {
+			// Read into the scratch space
+			n, err = r.Read(ptr)
+		} else {
+			// Ran out of scratch space, try reading one more byte
+			var b [1]byte
+			n, err = r.Read(b[:])
+
+			if n > 0 {
+				// Reader has more data but we have nowhere to
+				// put it, so we're stuffed
+				slices <- ReaderSlice{nil, io.ErrShortBuffer}
+			} else {
+				// Return some other error (hopefully EOF)
+				slices <- ReaderSlice{nil, err}
+			}
+			return
+		}
 
 		// End on error (includes EOF)
 		if err != nil {
-			c <- ReaderSlice{nil, err}
+			log.Printf("ReadIntoBuffer sending error %d %s", n, err.Error())
+			slices <- ReaderSlice{nil, err}
 			return
 		}
 
-		// Make a slice with the contents of the read
-		c <- ReaderSlice{ptr[:n], nil}
+		log.Printf("ReadIntoBuffer got %d", n)
 
-		// Adjust the scratch space slice
-		ptr = ptr[n:]
-	}
-	if len(ptr) == 0 {
-		c <- ReaderSlice{nil, io.ErrShortBuffer}
+		if n > 0 {
+			log.Printf("ReadIntoBuffer sending readerslice")
+			// Make a slice with the contents of the read
+			slices <- ReaderSlice{ptr[:n], nil}
+			log.Printf("ReadIntoBuffer sent readerslice")
+
+			// Adjust the scratch space slice
+			ptr = ptr[n:]
+		}
 	}
 }
 
-// Take slices from 'source' channel and write them to Writer 'w'.  Reports read
-// or write errors on 'status'.  Completes when 'source' channel is closed.
-/*func SinkWriter(source Source, w io.Writer, status Status) {
-	can_write = true
+// A read request to the Transfer() function
+type ReadRequest struct {
+	offset int
+	p      []byte
+	result chan<- ReadResult
+}
 
-	for {
-		// Get the next block from the source
-		rs, valid := <-source
-
-		if valid {
-			if rs.error != nil {
-				// propagate reader status (should only be EOF)
-				status <- rs.error
-			} else if can_write {
-				buf := rs.slice[:]
-				for len(buf) > 0 {
-					n, err := w.Write(buf)
-					buf = buf[n:]
-					if err == io.ErrShortWrite {
-						// short write, so go around again
-					} else if err != nil {
-						// some other write error,
-						// propagate error and stop
-						// further writes
-						status <- err
-						can_write = false
-					}
-				}
-			}
-		} else {
-			// source channel closed
-			break
-		}
+// A read result from the Transfer() function
+type ReadResult struct {
+	n   int
+	err error
+}
+
+// Reads from the buffer managed by the Transfer()
+type BufferReader struct {
+	offset    *int
+	requests  chan<- ReadRequest
+	responses chan ReadResult
+}
+
+func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
+	return BufferReader{new(int), requests, make(chan ReadResult)}
+}
+
+// Reads from the buffer managed by the Transfer()
+func (this BufferReader) Read(p []byte) (n int, err error) {
+	this.requests <- ReadRequest{*this.offset, p, this.responses}
+	rr, valid := <-this.responses
+	if valid {
+		*this.offset += rr.n
+		return rr.n, rr.err
+	} else {
+		return 0, io.ErrUnexpectedEOF
 	}
-}*/
+}
 
-func closeSinks(sinks_slice []Sink) {
-	for _, s := range sinks_slice {
-		close(s)
+// Close the responses channel
+func (this BufferReader) Close() error {
+	close(this.responses)
+	return nil
+}
+
+// Handle a read request.  Returns true if a response was sent, and false if
+// the request should be queued.
+func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
+	log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+	if req.offset < len(body) {
+		req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+		return true
+	} else if complete && req.offset >= len(body) {
+		req.result <- ReadResult{0, io.EOF}
+		return true
+	} else {
+		return false
 	}
 }
 
-// Transfer data from a source (either an already-filled buffer, or a reader)
-// into one or more 'sinks'.  If 'source' is valid, it will read from the
-// reader into the buffer and send the data to the sinks.  Otherwise 'buffer'
-// it will just send the contents of the buffer to the sinks.  Completes when
-// the 'sinks' channel is closed.
-func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in the provided buffer.  Otherwise, use the contents of 'buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel.  Completes
+// when 'requests' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
 	// currently buffered data
 	var body []byte
 
 	// for receiving slices from ReadIntoBuffer
-	var slices chan []byte = nil
+	var slices chan ReaderSlice = nil
 
 	// indicates whether the buffered data is complete
 	var complete bool = false
@@ -224,47 +281,45 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink,
 		// 'body' is the buffer slice representing the body content read so far
 		body = source_buffer[:0]
 
-		// used to communicate slices of the buffer as read
-		reader_slices := make(chan []ReaderSlice)
+		// used to communicate slices of the buffer as they are
+		slices = make(chan ReaderSlice)
 
 		// Spin it off
-		go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+		go ReadIntoBuffer(source_buffer, source_reader, slices)
 	} else {
 		// use the whole buffer
 		body = source_buffer[:]
 
-		// that's it
+		// buffer is complete
 		complete = true
 	}
 
-	// list of sinks to send to
-	sinks_slice := make([]Sink, 0)
-	defer closeSinks(sinks_slice)
+	pending_requests := make([]ReadRequest, 0)
 
 	for {
+		log.Printf("Doing select")
 		select {
-		case s, valid := <-sinks:
+		case req, valid := <-requests:
+			log.Printf("Got read request")
+			// Handle a buffer read request
 			if valid {
-				// add to the sinks slice
-				sinks_slice = append(sinks_slice, s)
-
-				// catch up the sink with the current body contents
-				if len(body) > 0 {
-					s <- ReaderSlice{body, nil}
-					if complete {
-						s <- ReaderSlice{nil, io.EOF}
-					}
+				if !HandleReadRequest(req, body, complete) {
+					log.Printf("Queued")
+					pending_requests = append(pending_requests, req)
 				}
 			} else {
-				// closed 'sinks' channel indicates we're done
+				// closed 'requests' channel indicates we're done
 				return
 			}
 
 		case bk, valid := <-slices:
+			// Got a new slice from the reader
 			if valid {
-				if bk.err != nil {
-					reader_error <- bk.err
-					if bk.err == io.EOF {
+				log.Printf("Got readerslice %d", len(bk.slice))
+
+				if bk.reader_error != nil {
+					reader_error <- bk.reader_error
+					if bk.reader_error == io.EOF {
 						// EOF indicates the reader is done
 						// sending, so our buffer is complete.
 						complete = true
@@ -279,86 +334,95 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink,
 					body = source_buffer[0 : len(body)+len(bk.slice)]
 				}
 
-				// send the new slice to the sinks
-				for _, s := range sinks_slice {
-					s <- bk
-				}
-
-				if complete {
-					// got an EOF, so close the sinks
-					closeSinks(sinks_slice)
+				// handle pending reads
+				n := 0
+				for n < len(pending_requests) {
+					if HandleReadRequest(pending_requests[n], body, complete) {
+						log.Printf("ReadRequest handled")
+
+						// move the element from the
+						// back of the slice to
+						// position 'n', then shorten
+						// the slice by one element
+						pending_requests[n] = pending_requests[len(pending_requests)-1]
+						pending_requests = pending_requests[0 : len(pending_requests)-1]
+					} else {
+						log.Printf("ReadRequest re-queued")
 
-					// truncate sinks slice
-					sinks_slice = sinks_slice[:0]
+						// Request wasn't handled, so keep it in the request slice
+						n += 1
+					}
 				}
 			} else {
-				// no more reads
-				slices = nil
+				if complete {
+					// no more reads
+					slices = nil
+				} else {
+					// reader channel closed without signaling EOF
+					reader_error <- io.ErrUnexpectedEOF
+					return
+				}
 			}
 		}
 	}
 }
 
-func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
-	pipereader, pipewriter := io.Pipe()
+type UploadError struct {
+	err error
+	url string
+}
 
+func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel chan<- ReadRequest, upload_status chan<- UploadError) {
 	var req *http.Request
-	if req, err = http.NewRequest("POST", url, nil); err != nil {
-		write_status <- err
+	var err error
+	var url = fmt.Sprintf("%s/%s", host, hash)
+	if req, err = http.NewRequest("PUT", url, nil); err != nil {
+		upload_status <- UploadError{err, url}
+		return
 	}
-	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
-	req.Body = pipereader
 
-	// create channel to transfer slices from reader to writer
-	tr := make(chan ReaderSlice)
-
-	// start the writer goroutine
-	go SinkWriter(tr, pipewriter, write_status)
-
-	// now transfer the channel to the reader goroutine
-	sinks <- tr
+	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+	req.Body = MakeBufferReader(readChannel)
 
 	var resp *http.Response
+	if resp, err = this.client.Do(req); err != nil {
+		upload_status <- UploadError{err, url}
+	}
 
-	if resp, err = client.Do(req); err != nil {
-		return nil, err
+	if resp.StatusCode == http.StatusOK {
+		upload_status <- UploadError{io.EOF, url}
 	}
 }
 
-var KeepWriteError = errors.new("Could not write sufficient replicas")
+var KeepWriteError = errors.New("Could not write sufficient replicas")
+
+func (this KeepClient) putReplicas(
+	hash string,
+	requests chan ReadRequest,
+	reader_status chan error) error {
 
-func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
-	// Calculate the ordering to try writing to servers
+	// Calculate the ordering for uploading to servers
 	sv := this.ShuffledServiceRoots(hash)
 
 	// The next server to try contacting
-	n := 0
+	next_server := 0
 
 	// The number of active writers
 	active := 0
 
-	// Used to buffer reads from 'r'
-	buffer := make([]byte, 64*1024*1024)
+	// Used to communicate status from the upload goroutines
+	upload_status := make(chan UploadError)
+	defer close(upload_status)
 
-	// Used to send writers to the reader goroutine
-	sinks := make(chan Sink)
-	defer close(sinks)
-
-	// Used to communicate status from the reader goroutine
-	reader_status := make(chan error)
-
-	// Start the reader goroutine
-	go Transfer(buffer, r, sinks, reader_status)
-
-	// Used to communicate status from the writer goroutines
-	write_status := make(chan error)
+	// Desired number of replicas
+	want_replicas := this.Want_replicas
 
 	for want_replicas > 0 {
 		for active < want_replicas {
-			// Start some writers
-			if n < len(sv) {
-				go this.ConnectToKeepServer(sv[n], sinks, write_status)
-				n += 1
+			// Start some upload requests
+			if next_server < len(sv) {
+				go this.uploadToKeepServer(sv[next_server], hash, requests, upload_status)
+				next_server += 1
 				active += 1
 			} else {
 				return KeepWriteError
@@ -374,14 +438,58 @@ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) erro
 				// bad news
 				return status
 			}
-		case status := <-write_status:
-			if status == io.EOF {
+		case status := <-upload_status:
+			if status.err == io.EOF {
 				// good news!
 				want_replicas -= 1
 			} else {
-				// writing to keep server failed for some reason.
+				// writing to keep server failed for some reason
+				log.Printf("Got error %s uploading to %s", status.err, status.url)
 			}
 			active -= 1
 		}
 	}
+
+	return nil
+}
+
+func (this KeepClient) PutHR(hash string, r io.Reader) error {
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 64*1024*1024)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	// Start the transfer goroutine
+	go Transfer(buffer, r, requests, reader_status)
+
+	return this.putReplicas(hash, requests, reader_status)
+}
+
+func (this KeepClient) PutHB(hash string, buffer []byte) error {
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Start the transfer goroutine
+	go Transfer(buffer, nil, requests, nil)
+
+	return this.putReplicas(hash, requests, nil)
+}
+
+func (this KeepClient) PutB(buffer []byte) error {
+	return this.PutHB(fmt.Sprintf("%x", md5.Sum(buffer)), buffer)
+}
+
+func (this KeepClient) PutR(r io.Reader) error {
+	if buffer, err := ioutil.ReadAll(r); err != nil {
+		return err
+	} else {
+		return this.PutB(buffer)
+	}
 }
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 13a203d..8f6813d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,62 +1,355 @@
 package keepclient
 
 import (
-	"fmt"
+	"flag"
+	//"fmt"
 	. "gopkg.in/check.v1"
+	"io"
+	//"log"
 	"os"
 	"os/exec"
 	"testing"
+	"time"
 )
 
 // Gocheck boilerplate
 func Test(t *testing.T) { TestingT(t) }
 
 // Gocheck boilerplate
-var _ = Suite(&MySuite{})
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&StandaloneSuite{})
 
-// Our test fixture
-type MySuite struct{}
+var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
 
-func (s *MySuite) SetUpSuite(c *C) {
-	os.Chdir(os.ExpandEnv("$GOPATH../python"))
-	exec.Command("python", "run_test_server.py", "start").Run()
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+
+// Standalone tests
+type StandaloneSuite struct{}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+	if *no_server {
+		c.Skip("Skipping tests that require server")
+	} else {
+		os.Chdir(os.ExpandEnv("$GOPATH../python"))
+		exec.Command("python", "run_test_server.py", "start").Run()
+		exec.Command("python", "run_test_server.py", "start_keep").Run()
+	}
 }
 
-func (s *MySuite) TearDownSuite(c *C) {
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
 	os.Chdir(os.ExpandEnv("$GOPATH../python"))
+	exec.Command("python", "run_test_server.py", "stop_keep").Run()
 	exec.Command("python", "run_test_server.py", "stop").Run()
 }
 
-func (s *MySuite) TestInit(c *C) {
+func (s *ServerRequiredSuite) TestInit(c *C) {
 	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
-	os.Setenv("ARVADOS_API_TOKEN", "12345")
+	os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "")
-	kc := InitKeepClient()
-	c.Assert(kc.apiServer, Equals, "localhost:3001")
-	c.Assert(kc.apiToken, Equals, "12345")
-	c.Assert(kc.apiInsecure, Equals, false)
+
+	kc, err := MakeKeepClient()
+	c.Assert(kc.ApiServer, Equals, "localhost:3001")
+	c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+	c.Assert(kc.ApiInsecure, Equals, false)
 
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-	kc = InitKeepClient()
-	c.Assert(kc.apiServer, Equals, "localhost:3001")
-	c.Assert(kc.apiToken, Equals, "12345")
-	c.Assert(kc.apiInsecure, Equals, true)
-}
 
-func (s *MySuite) TestGetKeepDisks(c *C) {
-	sr, err := KeepDisks()
+	kc, err = MakeKeepClient()
+	c.Assert(kc.ApiServer, Equals, "localhost:3001")
+	c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+	c.Assert(kc.ApiInsecure, Equals, true)
+
 	c.Assert(err, Equals, nil)
-	c.Assert(len(sr), Equals, 2)
-	c.Assert(sr[0], Equals, "http://localhost:25107")
-	c.Assert(sr[1], Equals, "http://localhost:25108")
+	c.Assert(len(kc.Service_roots), Equals, 2)
+	c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
+	c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
+}
 
-	service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
+	kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
 
 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
 	foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
-	c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+	c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
 
 	// "bar" 37b51d194a7513e45b56f6524f2d51f2
 	bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
-	c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+	c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+}
+
+func ReadIntoBufferHelper(c *C, bufsize int) {
+	buffer := make([]byte, bufsize)
+
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 96)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 96; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 96) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+	ReadIntoBufferHelper(c, 512)
+	ReadIntoBufferHelper(c, 225)
+	ReadIntoBufferHelper(c, 224)
+}
+
+func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
+	buffer := make([]byte, 223)
+	reader, writer := io.Pipe()
+	slices := make(chan ReaderSlice)
+
+	go ReadIntoBuffer(buffer, reader, slices)
+
+	{
+		out := make([]byte, 128)
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+		writer.Write(out)
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 128)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 128; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		out := make([]byte, 96)
+		for i := 0; i < 96; i += 1 {
+			out[i] = byte(i / 2)
+		}
+
+		// Write will deadlock because it can't write all the data, so
+		// spin it off to a goroutine
+		go writer.Write(out)
+		s1 := <-slices
+
+		c.Check(len(s1.slice), Equals, 95)
+		c.Check(s1.reader_error, Equals, nil)
+		for i := 0; i < 95; i += 1 {
+			c.Check(s1.slice[i], Equals, byte(i/2))
+		}
+		for i := 0; i < len(buffer); i += 1 {
+			if i < 128 {
+				c.Check(buffer[i], Equals, byte(i))
+			} else if i < (128 + 95) {
+				c.Check(buffer[i], Equals, byte((i-128)/2))
+			} else {
+				c.Check(buffer[i], Equals, byte(0))
+			}
+		}
+	}
+	{
+		writer.Close()
+		s1 := <-slices
+		c.Check(len(s1.slice), Equals, 0)
+		c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+	}
+
+}
+
+func (s *StandaloneSuite) TestTransfer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 512)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	br1 := MakeBufferReader(requests)
+	out := make([]byte, 128)
+
+	{
+		// Write some data, and read into a buffer shorter than
+		// available data
+		for i := 0; i < 128; i += 1 {
+			out[i] = byte(i)
+		}
+
+		writer.Write(out[:100])
+
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Write some more data, and read into buffer longer than
+		// available data
+		in := make([]byte, 64)
+		n, err := br1.Read(in)
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, out[64+i])
+		}
+
+	}
+
+	{
+		// Test read before write
+		type Rd struct {
+			n   int
+			err error
+		}
+		rd := make(chan Rd)
+		in := make([]byte, 64)
+
+		go func() {
+			n, err := br1.Read(in)
+			rd <- Rd{n, err}
+		}()
+
+		time.Sleep(100 * time.Millisecond)
+		writer.Write(out[100:])
+
+		got := <-rd
+
+		c.Check(got.n, Equals, 28)
+		c.Check(got.err, Equals, nil)
+
+		for i := 0; i < 28; i += 1 {
+			c.Check(in[i], Equals, out[100+i])
+		}
+	}
+
+	br2 := MakeBufferReader(requests)
+	{
+		// Test 'catch up' reader
+		in := make([]byte, 256)
+		n, err := br2.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+	}
+
+	{
+		// Test closing the reader
+		writer.Close()
+		status := <-reader_status
+		c.Check(status, Equals, io.EOF)
+
+		in := make([]byte, 256)
+		n1, err1 := br1.Read(in)
+		n2, err2 := br2.Read(in)
+		c.Check(n1, Equals, 0)
+		c.Check(err1, Equals, io.EOF)
+		c.Check(n2, Equals, 0)
+		c.Check(err2, Equals, io.EOF)
+	}
+
+	{
+		// Test 'catch up' reader after closing
+		br3 := MakeBufferReader(requests)
+		in := make([]byte, 256)
+		n, err := br3.Read(in)
+
+		c.Check(n, Equals, 128)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 128; i += 1 {
+			c.Check(in[i], Equals, out[i])
+		}
+
+		n, err = br3.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
+	reader, writer := io.Pipe()
+
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	// Reporting reader error states
+	reader_status := make(chan error)
+
+	go Transfer(buffer, reader, requests, reader_status)
+
+	out := make([]byte, 101)
+	go writer.Write(out)
+
+	status := <-reader_status
+	c.Check(status, Equals, io.ErrShortBuffer)
 }

commit 2a493a9215f604c63ab7bc6f0e0956d10af8ef10
Merge: 6132d8e 66d5cdb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 13 09:07:05 2014 -0400

    2798: Merged branch with code to read environment variables with branch working on Put support.
    
    Merge remote-tracking branch 'origin/2798-go-keep-client' into 2798-go-keep-client
    
    Conflicts:
    	sdk/go/src/arvados.org/keepclient/keepclient.go
    	sdk/go/src/arvados.org/keepclient/keepclient_test.go

diff --cc sdk/go/src/arvados.org/keepclient/keepclient.go
index 6d75425,073a76e..f9dce5f
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@@ -1,20 -1,379 +1,387 @@@
  package keepclient
  
  import (
- 	//"net/http"
+ 	"crypto/tls"
+ 	"encoding/json"
+ 	"fmt"
+ 	"io"
+ 	"net/http"
 +	"os"
+ 	"sort"
+ 	"strconv"
  )
  
  type KeepClient struct {
- 	apiServer   string
- 	apiToken    string
- 	apiInsecure bool
 -	Service_roots []string
++	ApiServer     string
+ 	ApiToken      string
++	ApiInsecure   bool
++	Service_roots []string
  }
  
- func InitKeepClient() *KeepClient {
- 	kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
- 		os.Getenv("ARVADOS_API_TOKEN"),
- 		os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+ type KeepDisk struct {
+ 	Hostname string `json:"service_host"`
+ 	Port     int    `json:"service_port"`
+ 	SSL      bool   `json:"service_ssl_flag"`
+ }
+ 
+ func MakeKeepClient() (kc *KeepClient, err error) {
 -	kc := KeepClient{}
 -	err := kc.DiscoverKeepDisks()
 -	if err != nil {
++	kc = &KeepClient{
++		ApiServer:   os.Getenv("ARVADOS_API_HOST"),
++		ApiToken:    os.Getenv("ARVADOS_API_TOKEN"),
++		ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
++
++	if err := kc.DiscoverKeepDisks(); err != nil {
+ 		return nil, err
+ 	}
 -	return &kc, nil
++
++	return kc, nil
+ }
+ 
+ func (this *KeepClient) DiscoverKeepDisks() error {
+ 	tr := &http.Transport{
 -		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
++		TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
+ 	}
+ 	client := &http.Client{Transport: tr}
+ 
+ 	var req *http.Request
++	var err error
+ 	if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
 -		return nil, err
++		return err
+ 	}
+ 
+ 	var resp *http.Response
+ 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ 	if resp, err = client.Do(req); err != nil {
 -		return nil, err
++		return err
+ 	}
+ 
+ 	type SvcList struct {
+ 		Items []KeepDisk `json:"items"`
+ 	}
+ 	dec := json.NewDecoder(resp.Body)
+ 	var m SvcList
+ 	if err := dec.Decode(&m); err != nil {
 -		return nil, err
++		return err
+ 	}
+ 
 -	this.service_roots = make([]string, len(m.Items))
++	this.Service_roots = make([]string, len(m.Items))
+ 	for index, element := range m.Items {
+ 		n := ""
+ 		if element.SSL {
+ 			n = "s"
+ 		}
 -		this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
++		this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+ 			n, element.Hostname, element.Port)
+ 	}
 -	sort.Strings(this.service_roots)
++	sort.Strings(this.Service_roots)
+ 	return nil
+ }
+ 
+ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+ 	// Build an ordering with which to query the Keep servers based on the
+ 	// contents of the hash.  "hash" is a hex-encoded number at least 8
+ 	// digits (32 bits) long
+ 
+ 	// seed used to calculate the next keep server from 'pool' to be added
+ 	// to 'pseq'
+ 	seed := hash
+ 
+ 	// Keep servers still to be added to the ordering
+ 	pool := make([]string, len(this.Service_roots))
+ 	copy(pool, this.Service_roots)
+ 
+ 	// output probe sequence
+ 	pseq = make([]string, 0, len(this.Service_roots))
+ 
+ 	// iterate while there are servers left to be assigned
+ 	for len(pool) > 0 {
+ 
+ 		if len(seed) < 8 {
+ 			// ran out of digits in the seed
+ 			if len(pseq) < (len(hash) / 4) {
+ 				// the number of servers added to the probe
+ 				// sequence is less than the number of 4-digit
+ 				// slices in 'hash' so refill the seed with the
+ 				// last 4 digits.
+ 				seed = hash[len(hash)-4:]
+ 			}
+ 			seed += hash
+ 		}
+ 
+ 		// Take the next 8 digits (32 bytes) and interpret as an integer,
+ 		// then modulus with the size of the remaining pool to get the next
+ 		// selected server.
+ 		probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+ 		probe %= uint64(len(pool))
+ 
+ 		// Append the selected server to the probe sequence and remove it
+ 		// from the pool.
+ 		pseq = append(pseq, pool[probe])
+ 		pool = append(pool[:probe], pool[probe+1:]...)
+ 
+ 		// Remove the digits just used from the seed
+ 		seed = seed[8:]
+ 	}
+ 	return pseq
+ }
+ 
+ type ReaderSlice struct {
+ 	slice        []byte
+ 	reader_error error
+ }
+ 
+ type Source <-chan ReaderSlice
+ type Sink chan<- ReaderSlice
+ type Status chan error
+ 
+ // Read repeatedly from the reader into the specified buffer, and report each
+ // read to channel 'c'.  Completes when Reader 'r' reports an error and closes
+ // channel 'c'.
+ func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+ 	defer close(c)
+ 
+ 	// Initially use entire buffer as scratch space
+ 	ptr := buffer[:]
+ 	for len(ptr) > 0 {
 -		v // Read into the scratch space
++		// Read into the scratch space
+ 		n, err := r.Read(ptr)
+ 
+ 		// End on error (includes EOF)
+ 		if err != nil {
+ 			c <- ReaderSlice{nil, err}
+ 			return
+ 		}
+ 
+ 		// Make a slice with the contents of the read
+ 		c <- ReaderSlice{ptr[:n], nil}
+ 
+ 		// Adjust the scratch space slice
+ 		ptr = ptr[n:]
+ 	}
+ 	if len(ptr) == 0 {
+ 		c <- ReaderSlice{nil, io.ErrShortBuffer}
+ 	}
+ }
+ 
+ // Take slices from 'source' channel and write them to Writer 'w'.  Reports read
+ // or write errors on 'status'.  Completes when 'source' channel is closed.
 -func SinkWriter(source Source, w io.Writer, status Status) {
++/*func SinkWriter(source Source, w io.Writer, status Status) {
+ 	can_write = true
+ 
+ 	for {
+ 		// Get the next block from the source
+ 		rs, valid := <-source
+ 
+ 		if valid {
+ 			if rs.error != nil {
+ 				// propagate reader status (should only be EOF)
+ 				status <- rs.error
+ 			} else if can_write {
+ 				buf := rs.slice[:]
+ 				for len(buf) > 0 {
+ 					n, err := w.Write(buf)
+ 					buf = buf[n:]
+ 					if err == io.ErrShortWrite {
+ 						// short write, so go around again
+ 					} else if err != nil {
+ 						// some other write error,
+ 						// propagate error and stop
+ 						// further writes
+ 						status <- err
+ 						can_write = false
+ 					}
+ 				}
+ 			}
+ 		} else {
+ 			// source channel closed
+ 			break
+ 		}
+ 	}
 -}
++}*/
+ 
+ func closeSinks(sinks_slice []Sink) {
+ 	for _, s := range sinks_slice {
+ 		close(s)
+ 	}
+ }
+ 
+ // Transfer data from a source (either an already-filled buffer, or a reader)
+ // into one or more 'sinks'.  If 'source' is valid, it will read from the
+ // reader into the buffer and send the data to the sinks.  Otherwise 'buffer'
+ // it will just send the contents of the buffer to the sinks.  Completes when
+ // the 'sinks' channel is closed.
+ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+ 	// currently buffered data
+ 	var body []byte
+ 
+ 	// for receiving slices from ReadIntoBuffer
+ 	var slices chan []byte = nil
+ 
+ 	// indicates whether the buffered data is complete
+ 	var complete bool = false
+ 
 -	if source != nil {
++	if source_reader != nil {
+ 		// 'body' is the buffer slice representing the body content read so far
+ 		body = source_buffer[:0]
+ 
+ 		// used to communicate slices of the buffer as read
+ 		reader_slices := make(chan []ReaderSlice)
+ 
+ 		// Spin it off
+ 		go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+ 	} else {
+ 		// use the whole buffer
+ 		body = source_buffer[:]
+ 
+ 		// that's it
+ 		complete = true
+ 	}
+ 
+ 	// list of sinks to send to
+ 	sinks_slice := make([]Sink, 0)
+ 	defer closeSinks(sinks_slice)
+ 
+ 	for {
+ 		select {
+ 		case s, valid := <-sinks:
+ 			if valid {
+ 				// add to the sinks slice
+ 				sinks_slice = append(sinks_slice, s)
+ 
+ 				// catch up the sink with the current body contents
+ 				if len(body) > 0 {
+ 					s <- ReaderSlice{body, nil}
+ 					if complete {
+ 						s <- ReaderSlice{nil, io.EOF}
+ 					}
+ 				}
+ 			} else {
+ 				// closed 'sinks' channel indicates we're done
+ 				return
+ 			}
+ 
+ 		case bk, valid := <-slices:
+ 			if valid {
+ 				if bk.err != nil {
+ 					reader_error <- bk.err
+ 					if bk.err == io.EOF {
+ 						// EOF indicates the reader is done
+ 						// sending, so our buffer is complete.
+ 						complete = true
+ 					} else {
+ 						// some other reader error
+ 						return
+ 					}
+ 				}
+ 
+ 				if bk.slice != nil {
+ 					// adjust body bounds now that another slice has been read
+ 					body = source_buffer[0 : len(body)+len(bk.slice)]
+ 				}
+ 
+ 				// send the new slice to the sinks
+ 				for _, s := range sinks_slice {
+ 					s <- bk
+ 				}
+ 
+ 				if complete {
+ 					// got an EOF, so close the sinks
+ 					closeSinks(sinks_slice)
+ 
+ 					// truncate sinks slice
+ 					sinks_slice = sinks_slice[:0]
+ 				}
+ 			} else {
+ 				// no more reads
+ 				slices = nil
+ 			}
+ 		}
+ 	}
+ }
+ 
+ func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+ 	pipereader, pipewriter := io.Pipe()
+ 
+ 	var req *http.Request
+ 	if req, err = http.NewRequest("POST", url, nil); err != nil {
+ 		write_status <- err
+ 	}
+ 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ 	req.Body = pipereader
+ 
+ 	// create channel to transfer slices from reader to writer
+ 	tr := make(chan ReaderSlice)
+ 
+ 	// start the writer goroutine
+ 	go SinkWriter(tr, pipewriter, write_status)
+ 
+ 	// now transfer the channel to the reader goroutine
+ 	sinks <- tr
+ 
+ 	var resp *http.Response
+ 
+ 	if resp, err = client.Do(req); err != nil {
+ 		return nil, err
+ 	}
+ }
+ 
+ var KeepWriteError = errors.new("Could not write sufficient replicas")
+ 
+ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+ 	// Calculate the ordering to try writing to servers
+ 	sv := this.ShuffledServiceRoots(hash)
+ 
+ 	// The next server to try contacting
+ 	n := 0
+ 
+ 	// The number of active writers
+ 	active := 0
+ 
+ 	// Used to buffer reads from 'r'
+ 	buffer := make([]byte, 64*1024*1024)
+ 
+ 	// Used to send writers to the reader goroutine
+ 	sinks := make(chan Sink)
+ 	defer close(sinks)
+ 
+ 	// Used to communicate status from the reader goroutine
+ 	reader_status := make(chan error)
+ 
+ 	// Start the reader goroutine
+ 	go Transfer(buffer, r, sinks, reader_status)
+ 
+ 	// Used to communicate status from the writer goroutines
+ 	write_status := make(chan error)
+ 
+ 	for want_replicas > 0 {
+ 		for active < want_replicas {
+ 			// Start some writers
+ 			if n < len(sv) {
+ 				go this.ConnectToKeepServer(sv[n], sinks, write_status)
+ 				n += 1
+ 				active += 1
+ 			} else {
+ 				return KeepWriteError
+ 			}
+ 		}
  
- 	return &kc
+ 		// Now wait for something to happen.
+ 		select {
+ 		case status := <-reader_status:
+ 			if status == io.EOF {
+ 				// good news!
+ 			} else {
+ 				// bad news
+ 				return status
+ 			}
+ 		case status := <-write_status:
+ 			if status == io.EOF {
+ 				// good news!
+ 				want_replicas -= 1
+ 			} else {
+ 				// writing to keep server failed for some reason.
+ 			}
+ 			active -= 1
+ 		}
+ 	}
  }
diff --cc sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 733cc26,bc719c0..13a203d
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@@ -1,43 -1,33 +1,62 @@@
  package keepclient
  
  import (
+ 	"fmt"
  	. "gopkg.in/check.v1"
 +	"os"
 +	"os/exec"
  	"testing"
  )
  
 -// Hook up gocheck into the "go test" runner.
 +// Gocheck boilerplate
  func Test(t *testing.T) { TestingT(t) }
  
 +// Gocheck boilerplate
 +var _ = Suite(&MySuite{})
 +
 +// Our test fixture
  type MySuite struct{}
  
 -var _ = Suite(&MySuite{})
 +func (s *MySuite) SetUpSuite(c *C) {
 +	os.Chdir(os.ExpandEnv("$GOPATH../python"))
 +	exec.Command("python", "run_test_server.py", "start").Run()
 +}
 +
 +func (s *MySuite) TearDownSuite(c *C) {
 +	os.Chdir(os.ExpandEnv("$GOPATH../python"))
 +	exec.Command("python", "run_test_server.py", "stop").Run()
 +}
 +
 +func (s *MySuite) TestInit(c *C) {
 +	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
 +	os.Setenv("ARVADOS_API_TOKEN", "12345")
 +	os.Setenv("ARVADOS_API_HOST_INSECURE", "")
 +	kc := InitKeepClient()
 +	c.Assert(kc.apiServer, Equals, "localhost:3001")
 +	c.Assert(kc.apiToken, Equals, "12345")
 +	c.Assert(kc.apiInsecure, Equals, false)
 +
 +	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
 +	kc = InitKeepClient()
 +	c.Assert(kc.apiServer, Equals, "localhost:3001")
 +	c.Assert(kc.apiToken, Equals, "12345")
 +	c.Assert(kc.apiInsecure, Equals, true)
 +}
+ 
+ func (s *MySuite) TestGetKeepDisks(c *C) {
+ 	sr, err := KeepDisks()
+ 	c.Assert(err, Equals, nil)
+ 	c.Assert(len(sr), Equals, 2)
+ 	c.Assert(sr[0], Equals, "http://localhost:25107")
+ 	c.Assert(sr[1], Equals, "http://localhost:25108")
+ 
+ 	service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+ 
+ 	// "foo" acbd18db4cc2f85cedef654fccc4a4d8
+ 	foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+ 	c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+ 
+ 	// "bar" 37b51d194a7513e45b56f6524f2d51f2
+ 	bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ 	c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
 -
+ }

commit 6132d8efbc522b71d0084160abaaa87031678bdc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 8 11:23:51 2014 -0400

    2798: Initial commit of Go Keep client library.

diff --git a/sdk/go/build.sh b/sdk/go/build.sh
new file mode 100755
index 0000000..ed95228
--- /dev/null
+++ b/sdk/go/build.sh
@@ -0,0 +1,37 @@
+#! /bin/sh
+
+# This script builds a Keep executable and installs it in
+# ./bin/keep.
+#
+# In idiomatic Go style, a user would install Keep with something
+# like:
+#
+#     go get arvados.org/keep
+#     go install arvados.org/keep
+#
+# which would download both the Keep source and any third-party
+# packages it depends on.
+#
+# Since the Keep source is bundled within the overall Arvados source,
+# "go get" is not the primary tool for delivering Keep source and this
+# process doesn't work.  Instead, this script sets the environment
+# properly and fetches any necessary dependencies by hand.
+
+if [ -z "$GOPATH" ]
+then
+    GOPATH=$(pwd)
+else
+    GOPATH=$(pwd):${GOPATH}
+fi
+
+export GOPATH
+
+set -o errexit   # fail if any command returns an error
+
+mkdir -p pkg
+mkdir -p bin
+go get gopkg.in/check.v1
+go install arvados.org/keepclient
+if ls -l pkg/*/arvados.org/keepclient.a ; then
+    echo "success!"
+fi
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
new file mode 100644
index 0000000..6d75425
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -0,0 +1,20 @@
+package keepclient
+
+import (
+	//"net/http"
+	"os"
+)
+
+type KeepClient struct {
+	apiServer   string
+	apiToken    string
+	apiInsecure bool
+}
+
+func InitKeepClient() *KeepClient {
+	kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
+		os.Getenv("ARVADOS_API_TOKEN"),
+		os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+
+	return &kc
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
new file mode 100644
index 0000000..733cc26
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -0,0 +1,43 @@
+package keepclient
+
+import (
+	. "gopkg.in/check.v1"
+	"os"
+	"os/exec"
+	"testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) { TestingT(t) }
+
+// Gocheck boilerplate
+var _ = Suite(&MySuite{})
+
+// Our test fixture
+type MySuite struct{}
+
+func (s *MySuite) SetUpSuite(c *C) {
+	os.Chdir(os.ExpandEnv("$GOPATH../python"))
+	exec.Command("python", "run_test_server.py", "start").Run()
+}
+
+func (s *MySuite) TearDownSuite(c *C) {
+	os.Chdir(os.ExpandEnv("$GOPATH../python"))
+	exec.Command("python", "run_test_server.py", "stop").Run()
+}
+
+func (s *MySuite) TestInit(c *C) {
+	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
+	os.Setenv("ARVADOS_API_TOKEN", "12345")
+	os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+	kc := InitKeepClient()
+	c.Assert(kc.apiServer, Equals, "localhost:3001")
+	c.Assert(kc.apiToken, Equals, "12345")
+	c.Assert(kc.apiInsecure, Equals, false)
+
+	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+	kc = InitKeepClient()
+	c.Assert(kc.apiServer, Equals, "localhost:3001")
+	c.Assert(kc.apiToken, Equals, "12345")
+	c.Assert(kc.apiInsecure, Equals, true)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list