[ARVADOS] updated: 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6
git at public.curoverse.com
git at public.curoverse.com
Tue May 13 20:47:53 EDT 2014
Summary of changes:
{services/keep => sdk/go}/build.sh | 9 +-
sdk/go/src/arvados.org/keepclient/keepclient.go | 432 ++++++++++++--------
.../src/arvados.org/keepclient/keepclient_test.go | 445 ++++++++++++++++++++-
3 files changed, 713 insertions(+), 173 deletions(-)
copy {services/keep => sdk/go}/build.sh (85%)
via 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6 (commit)
via 99cd188a0cbd9143b690750e21abc8d8d5e6dbad (commit)
via 2a493a9215f604c63ab7bc6f0e0956d10af8ef10 (commit)
via 6132d8efbc522b71d0084160abaaa87031678bdc (commit)
from 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 09559ebbb9f52c8b8a47e21d0a9c5720f3a2b7c6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 13 20:47:49 2014 -0400
2798: Added uploadToKeepServer() test
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index 657b300..93fcf4b 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -372,7 +372,7 @@ type UploadError struct {
url string
}
-func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel chan<- ReadRequest, upload_status chan<- UploadError) {
+func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, upload_status chan<- UploadError) {
var req *http.Request
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
@@ -382,7 +382,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
- req.Body = MakeBufferReader(readChannel)
+ req.Body = body
var resp *http.Response
if resp, err = this.client.Do(req); err != nil {
@@ -421,7 +421,7 @@ func (this KeepClient) putReplicas(
for active < want_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, requests, upload_status)
+ go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status)
next_server += 1
active += 1
} else {
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 8f6813d..fd4ba7b 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -2,10 +2,13 @@ package keepclient
import (
"flag"
- //"fmt"
+ "fmt"
. "gopkg.in/check.v1"
"io"
- //"log"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
"os"
"os/exec"
"testing"
@@ -353,3 +356,101 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
status := <-reader_status
c.Check(status, Equals, io.ErrShortBuffer)
}
+
+type StubHandler struct {
+ c *C
+ expectPath string
+ expectApiToken string
+ expectBody string
+ handled chan bool
+}
+
+func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ this.c.Check(req.URL.Path, Equals, this.expectPath)
+ this.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", this.expectApiToken))
+ body, err := ioutil.ReadAll(req.Body)
+ this.c.Check(err, Equals, nil)
+ this.c.Check(body, DeepEquals, []byte(this.expectBody))
+ resp.WriteHeader(200)
+ this.handled <- true
+}
+
+func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
+ st := StubHandler{
+ c,
+ "/acbd18db4cc2f85cedef654fccc4a4d8",
+ "abc123",
+ "foo",
+ make(chan bool)}
+ server := http.Server{Handler: st}
+
+ listener, _ := net.ListenTCP("tcp", &net.TCPAddr{Port: 2999})
+ defer listener.Close()
+
+ log.Printf("%s", listener.Addr().String())
+
+ go server.Serve(listener)
+ kc, _ := MakeKeepClient()
+ kc.ApiToken = "abc123"
+
+ reader, writer := io.Pipe()
+ upload_status := make(chan UploadError)
+
+ go kc.uploadToKeepServer("http://localhost:2999", "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-st.handled
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+}
+
+type FailHandler struct {
+ handled chan bool
+}
+
+func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ resp.WriteHeader(400)
+ this.handled <- true
+}
+
+/*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+ log.Printf("blup")
+
+ c.Check(true, Equals, false)
+
+ log.Printf("blug")
+
+ st := FailHandler{make(chan bool)}
+ server := http.Server{Handler: st}
+
+ listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
+ defer listener.Close()
+
+ go server.Serve(listener)
+ kc, _ := MakeKeepClient()
+ kc.ApiToken = "abc123"
+
+ reader, writer := io.Pipe()
+ upload_status := make(chan UploadError)
+
+ go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+
+ log.Printf("Writing 1")
+
+ writer.Write([]byte("foo"))
+
+ log.Printf("Writing 2")
+
+ writer.Close()
+
+ log.Printf("Writing 3")
+
+ <-st.handled
+
+ log.Printf("Handled?!")
+
+ status := <-upload_status
+ c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
+}*/
commit 99cd188a0cbd9143b690750e21abc8d8d5e6dbad
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 13 16:07:59 2014 -0400
2798: Checkpoint commit, tests for ReadIntoBuffer() and Transfer() pass.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index f9dce5f..657b300 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -1,10 +1,14 @@
package keepclient
import (
+ "crypto/md5"
"crypto/tls"
"encoding/json"
+ "errors"
"fmt"
"io"
+ "io/ioutil"
+ "log"
"net/http"
"os"
"sort"
@@ -16,6 +20,8 @@ type KeepClient struct {
ApiToken string
ApiInsecure bool
Service_roots []string
+ Want_replicas int
+ client *http.Client
}
type KeepDisk struct {
@@ -30,50 +36,68 @@ func MakeKeepClient() (kc *KeepClient, err error) {
ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
- if err := kc.DiscoverKeepDisks(); err != nil {
- return nil, err
+ tr := &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
}
- return kc, nil
+ kc.client = &http.Client{Transport: tr}
+
+ err = kc.DiscoverKeepDisks()
+
+ return kc, err
}
func (this *KeepClient) DiscoverKeepDisks() error {
- tr := &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
- }
- client := &http.Client{Transport: tr}
-
+ // Construct request of keep disk list
var req *http.Request
var err error
- if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
+ if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
return err
}
- var resp *http.Response
+ // Add api token header
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
- if resp, err = client.Do(req); err != nil {
+
+ // Make the request
+ var resp *http.Response
+ if resp, err = this.client.Do(req); err != nil {
return err
}
type SvcList struct {
Items []KeepDisk `json:"items"`
}
+
+ // Decode json reply
dec := json.NewDecoder(resp.Body)
var m SvcList
if err := dec.Decode(&m); err != nil {
return err
}
- this.Service_roots = make([]string, len(m.Items))
- for index, element := range m.Items {
+ listed := make(map[string]bool)
+ this.Service_roots = make([]string, 0, len(m.Items))
+
+ for _, element := range m.Items {
n := ""
if element.SSL {
n = "s"
}
- this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
- n, element.Hostname, element.Port)
+
+ // Construct server URL
+ url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+
+ // Skip duplicates
+ if !listed[url] {
+ listed[url] = true
+ this.Service_roots = append(this.Service_roots, url)
+ }
}
+
+ // Must be sorted for ShuffledServiceRoots() to produce consistent
+ // results.
sort.Strings(this.Service_roots)
+
return nil
}
@@ -130,92 +154,125 @@ type ReaderSlice struct {
reader_error error
}
-type Source <-chan ReaderSlice
-type Sink chan<- ReaderSlice
-type Status chan error
-
// Read repeatedly from the reader into the specified buffer, and report each
-// read to channel 'c'. Completes when Reader 'r' reports an error and closes
-// channel 'c'.
-func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
- defer close(c)
+// read to channel 'c'. Completes when Reader 'r' reports on the error channel
+// and closes channel 'c'.
+func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
+ defer close(slices)
// Initially use entire buffer as scratch space
ptr := buffer[:]
- for len(ptr) > 0 {
- // Read into the scratch space
- n, err := r.Read(ptr)
+ for {
+ log.Printf("ReadIntoBuffer doing read")
+ var n int
+ var err error
+ if len(ptr) > 0 {
+ // Read into the scratch space
+ n, err = r.Read(ptr)
+ } else {
+ // Ran out of scratch space, try reading one more byte
+ var b [1]byte
+ n, err = r.Read(b[:])
+
+ if n > 0 {
+ // Reader has more data but we have nowhere to
+ // put it, so we're stuffed
+ slices <- ReaderSlice{nil, io.ErrShortBuffer}
+ } else {
+ // Return some other error (hopefully EOF)
+ slices <- ReaderSlice{nil, err}
+ }
+ return
+ }
// End on error (includes EOF)
if err != nil {
- c <- ReaderSlice{nil, err}
+ log.Printf("ReadIntoBuffer sending error %d %s", n, err.Error())
+ slices <- ReaderSlice{nil, err}
return
}
- // Make a slice with the contents of the read
- c <- ReaderSlice{ptr[:n], nil}
+ log.Printf("ReadIntoBuffer got %d", n)
- // Adjust the scratch space slice
- ptr = ptr[n:]
- }
- if len(ptr) == 0 {
- c <- ReaderSlice{nil, io.ErrShortBuffer}
+ if n > 0 {
+ log.Printf("ReadIntoBuffer sending readerslice")
+ // Make a slice with the contents of the read
+ slices <- ReaderSlice{ptr[:n], nil}
+ log.Printf("ReadIntoBuffer sent readerslice")
+
+ // Adjust the scratch space slice
+ ptr = ptr[n:]
+ }
}
}
-// Take slices from 'source' channel and write them to Writer 'w'. Reports read
-// or write errors on 'status'. Completes when 'source' channel is closed.
-/*func SinkWriter(source Source, w io.Writer, status Status) {
- can_write = true
+// A read request to the Transfer() function
+type ReadRequest struct {
+ offset int
+ p []byte
+ result chan<- ReadResult
+}
- for {
- // Get the next block from the source
- rs, valid := <-source
-
- if valid {
- if rs.error != nil {
- // propagate reader status (should only be EOF)
- status <- rs.error
- } else if can_write {
- buf := rs.slice[:]
- for len(buf) > 0 {
- n, err := w.Write(buf)
- buf = buf[n:]
- if err == io.ErrShortWrite {
- // short write, so go around again
- } else if err != nil {
- // some other write error,
- // propagate error and stop
- // further writes
- status <- err
- can_write = false
- }
- }
- }
- } else {
- // source channel closed
- break
- }
+// A read result from the Transfer() function
+type ReadResult struct {
+ n int
+ err error
+}
+
+// Reads from the buffer managed by the Transfer()
+type BufferReader struct {
+ offset *int
+ requests chan<- ReadRequest
+ responses chan ReadResult
+}
+
+func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
+ return BufferReader{new(int), requests, make(chan ReadResult)}
+}
+
+// Reads from the buffer managed by the Transfer()
+func (this BufferReader) Read(p []byte) (n int, err error) {
+ this.requests <- ReadRequest{*this.offset, p, this.responses}
+ rr, valid := <-this.responses
+ if valid {
+ *this.offset += rr.n
+ return rr.n, rr.err
+ } else {
+ return 0, io.ErrUnexpectedEOF
}
-}*/
+}
-func closeSinks(sinks_slice []Sink) {
- for _, s := range sinks_slice {
- close(s)
+// Close the responses channel
+func (this BufferReader) Close() error {
+ close(this.responses)
+ return nil
+}
+
+// Handle a read request. Returns true if a response was sent, and false if
+// the request should be queued.
+func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
+ log.Printf("HandleReadRequest %d %d %t", req.offset, len(body), complete)
+ if req.offset < len(body) {
+ req.result <- ReadResult{copy(req.p, body[req.offset:]), nil}
+ return true
+ } else if complete && req.offset >= len(body) {
+ req.result <- ReadResult{0, io.EOF}
+ return true
+ } else {
+ return false
}
}
-// Transfer data from a source (either an already-filled buffer, or a reader)
-// into one or more 'sinks'. If 'source' is valid, it will read from the
-// reader into the buffer and send the data to the sinks. Otherwise 'buffer'
-// it will just send the contents of the buffer to the sinks. Completes when
-// the 'sinks' channel is closed.
-func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+// If 'source_reader' is not nil, reads data from 'source_reader' and stores it
+// in the provided buffer. Otherwise, use the contents of 'buffer' as is.
+// Accepts read requests on the buffer on the 'requests' channel. Completes
+// when 'requests' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan ReadRequest, reader_error chan error) {
// currently buffered data
var body []byte
// for receiving slices from ReadIntoBuffer
- var slices chan []byte = nil
+ var slices chan ReaderSlice = nil
// indicates whether the buffered data is complete
var complete bool = false
@@ -224,47 +281,45 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink,
// 'body' is the buffer slice representing the body content read so far
body = source_buffer[:0]
- // used to communicate slices of the buffer as read
- reader_slices := make(chan []ReaderSlice)
+ // used to communicate slices of the buffer as they are
+ slices = make(chan ReaderSlice)
// Spin it off
- go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+ go ReadIntoBuffer(source_buffer, source_reader, slices)
} else {
// use the whole buffer
body = source_buffer[:]
- // that's it
+ // buffer is complete
complete = true
}
- // list of sinks to send to
- sinks_slice := make([]Sink, 0)
- defer closeSinks(sinks_slice)
+ pending_requests := make([]ReadRequest, 0)
for {
+ log.Printf("Doing select")
select {
- case s, valid := <-sinks:
+ case req, valid := <-requests:
+ log.Printf("Got read request")
+ // Handle a buffer read request
if valid {
- // add to the sinks slice
- sinks_slice = append(sinks_slice, s)
-
- // catch up the sink with the current body contents
- if len(body) > 0 {
- s <- ReaderSlice{body, nil}
- if complete {
- s <- ReaderSlice{nil, io.EOF}
- }
+ if !HandleReadRequest(req, body, complete) {
+ log.Printf("Queued")
+ pending_requests = append(pending_requests, req)
}
} else {
- // closed 'sinks' channel indicates we're done
+ // closed 'requests' channel indicates we're done
return
}
case bk, valid := <-slices:
+ // Got a new slice from the reader
if valid {
- if bk.err != nil {
- reader_error <- bk.err
- if bk.err == io.EOF {
+ log.Printf("Got readerslice %d", len(bk.slice))
+
+ if bk.reader_error != nil {
+ reader_error <- bk.reader_error
+ if bk.reader_error == io.EOF {
// EOF indicates the reader is done
// sending, so our buffer is complete.
complete = true
@@ -279,86 +334,95 @@ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink,
body = source_buffer[0 : len(body)+len(bk.slice)]
}
- // send the new slice to the sinks
- for _, s := range sinks_slice {
- s <- bk
- }
-
- if complete {
- // got an EOF, so close the sinks
- closeSinks(sinks_slice)
+ // handle pending reads
+ n := 0
+ for n < len(pending_requests) {
+ if HandleReadRequest(pending_requests[n], body, complete) {
+ log.Printf("ReadRequest handled")
+
+ // move the element from the
+ // back of the slice to
+ // position 'n', then shorten
+ // the slice by one element
+ pending_requests[n] = pending_requests[len(pending_requests)-1]
+ pending_requests = pending_requests[0 : len(pending_requests)-1]
+ } else {
+ log.Printf("ReadRequest re-queued")
- // truncate sinks slice
- sinks_slice = sinks_slice[:0]
+ // Request wasn't handled, so keep it in the request slice
+ n += 1
+ }
}
} else {
- // no more reads
- slices = nil
+ if complete {
+ // no more reads
+ slices = nil
+ } else {
+ // reader channel closed without signaling EOF
+ reader_error <- io.ErrUnexpectedEOF
+ return
+ }
}
}
}
}
-func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
- pipereader, pipewriter := io.Pipe()
+type UploadError struct {
+ err error
+ url string
+}
+func (this KeepClient) uploadToKeepServer(host string, hash string, readChannel chan<- ReadRequest, upload_status chan<- UploadError) {
var req *http.Request
- if req, err = http.NewRequest("POST", url, nil); err != nil {
- write_status <- err
+ var err error
+ var url = fmt.Sprintf("%s/%s", host, hash)
+ if req, err = http.NewRequest("PUT", url, nil); err != nil {
+ upload_status <- UploadError{err, url}
+ return
}
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
- req.Body = pipereader
- // create channel to transfer slices from reader to writer
- tr := make(chan ReaderSlice)
-
- // start the writer goroutine
- go SinkWriter(tr, pipewriter, write_status)
-
- // now transfer the channel to the reader goroutine
- sinks <- tr
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ req.Body = MakeBufferReader(readChannel)
var resp *http.Response
+ if resp, err = this.client.Do(req); err != nil {
+ upload_status <- UploadError{err, url}
+ }
- if resp, err = client.Do(req); err != nil {
- return nil, err
+ if resp.StatusCode == http.StatusOK {
+ upload_status <- UploadError{io.EOF, url}
}
}
-var KeepWriteError = errors.new("Could not write sufficient replicas")
+var KeepWriteError = errors.New("Could not write sufficient replicas")
+
+func (this KeepClient) putReplicas(
+ hash string,
+ requests chan ReadRequest,
+ reader_status chan error) error {
-func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
- // Calculate the ordering to try writing to servers
+ // Calculate the ordering for uploading to servers
sv := this.ShuffledServiceRoots(hash)
// The next server to try contacting
- n := 0
+ next_server := 0
// The number of active writers
active := 0
- // Used to buffer reads from 'r'
- buffer := make([]byte, 64*1024*1024)
+ // Used to communicate status from the upload goroutines
+ upload_status := make(chan UploadError)
+ defer close(upload_status)
- // Used to send writers to the reader goroutine
- sinks := make(chan Sink)
- defer close(sinks)
-
- // Used to communicate status from the reader goroutine
- reader_status := make(chan error)
-
- // Start the reader goroutine
- go Transfer(buffer, r, sinks, reader_status)
-
- // Used to communicate status from the writer goroutines
- write_status := make(chan error)
+ // Desired number of replicas
+ want_replicas := this.Want_replicas
for want_replicas > 0 {
for active < want_replicas {
- // Start some writers
- if n < len(sv) {
- go this.ConnectToKeepServer(sv[n], sinks, write_status)
- n += 1
+ // Start some upload requests
+ if next_server < len(sv) {
+ go this.uploadToKeepServer(sv[next_server], hash, requests, upload_status)
+ next_server += 1
active += 1
} else {
return KeepWriteError
@@ -374,14 +438,58 @@ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) erro
// bad news
return status
}
- case status := <-write_status:
- if status == io.EOF {
+ case status := <-upload_status:
+ if status.err == io.EOF {
// good news!
want_replicas -= 1
} else {
- // writing to keep server failed for some reason.
+ // writing to keep server failed for some reason
+ log.Printf("Got error %s uploading to %s", status.err, status.url)
}
active -= 1
}
}
+
+ return nil
+}
+
+func (this KeepClient) PutHR(hash string, r io.Reader) error {
+
+ // Buffer for reads from 'r'
+ buffer := make([]byte, 64*1024*1024)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Reporting reader error states
+ reader_status := make(chan error)
+
+ // Start the transfer goroutine
+ go Transfer(buffer, r, requests, reader_status)
+
+ return this.putReplicas(hash, requests, reader_status)
+}
+
+func (this KeepClient) PutHB(hash string, buffer []byte) error {
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Start the transfer goroutine
+ go Transfer(buffer, nil, requests, nil)
+
+ return this.putReplicas(hash, requests, nil)
+}
+
+func (this KeepClient) PutB(buffer []byte) error {
+ return this.PutHB(fmt.Sprintf("%x", md5.Sum(buffer)), buffer)
+}
+
+func (this KeepClient) PutR(r io.Reader) error {
+ if buffer, err := ioutil.ReadAll(r); err != nil {
+ return err
+ } else {
+ return this.PutB(buffer)
+ }
}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 13a203d..8f6813d 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,62 +1,355 @@
package keepclient
import (
- "fmt"
+ "flag"
+ //"fmt"
. "gopkg.in/check.v1"
+ "io"
+ //"log"
"os"
"os/exec"
"testing"
+ "time"
)
// Gocheck boilerplate
func Test(t *testing.T) { TestingT(t) }
// Gocheck boilerplate
-var _ = Suite(&MySuite{})
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&StandaloneSuite{})
-// Our test fixture
-type MySuite struct{}
+var no_server = flag.Bool("no-server", false, "Skip 'ServerRequireSuite'")
-func (s *MySuite) SetUpSuite(c *C) {
- os.Chdir(os.ExpandEnv("$GOPATH../python"))
- exec.Command("python", "run_test_server.py", "start").Run()
+// Tests that require the Keep server running
+type ServerRequiredSuite struct{}
+
+// Standalone tests
+type StandaloneSuite struct{}
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ if *no_server {
+ c.Skip("Skipping tests that require server")
+ } else {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "start").Run()
+ exec.Command("python", "run_test_server.py", "start_keep").Run()
+ }
}
-func (s *MySuite) TearDownSuite(c *C) {
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "stop_keep").Run()
exec.Command("python", "run_test_server.py", "stop").Run()
}
-func (s *MySuite) TestInit(c *C) {
+func (s *ServerRequiredSuite) TestInit(c *C) {
os.Setenv("ARVADOS_API_HOST", "localhost:3001")
- os.Setenv("ARVADOS_API_TOKEN", "12345")
+ os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- kc := InitKeepClient()
- c.Assert(kc.apiServer, Equals, "localhost:3001")
- c.Assert(kc.apiToken, Equals, "12345")
- c.Assert(kc.apiInsecure, Equals, false)
+
+ kc, err := MakeKeepClient()
+ c.Assert(kc.ApiServer, Equals, "localhost:3001")
+ c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ c.Assert(kc.ApiInsecure, Equals, false)
os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
- kc = InitKeepClient()
- c.Assert(kc.apiServer, Equals, "localhost:3001")
- c.Assert(kc.apiToken, Equals, "12345")
- c.Assert(kc.apiInsecure, Equals, true)
-}
-func (s *MySuite) TestGetKeepDisks(c *C) {
- sr, err := KeepDisks()
+ kc, err = MakeKeepClient()
+ c.Assert(kc.ApiServer, Equals, "localhost:3001")
+ c.Assert(kc.ApiToken, Equals, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ c.Assert(kc.ApiInsecure, Equals, true)
+
c.Assert(err, Equals, nil)
- c.Assert(len(sr), Equals, 2)
- c.Assert(sr[0], Equals, "http://localhost:25107")
- c.Assert(sr[1], Equals, "http://localhost:25108")
+ c.Assert(len(kc.Service_roots), Equals, 2)
+ c.Assert(kc.Service_roots[0], Equals, "http://localhost:25107")
+ c.Assert(kc.Service_roots[1], Equals, "http://localhost:25108")
+}
- service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+func (s *StandaloneSuite) TestShuffleServiceRoots(c *C) {
+ kc := KeepClient{Service_roots: []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}}
// "foo" acbd18db4cc2f85cedef654fccc4a4d8
foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
- c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+ c.Check(kc.ShuffledServiceRoots("acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
// "bar" 37b51d194a7513e45b56f6524f2d51f2
bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
- c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+ c.Check(kc.ShuffledServiceRoots("37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
+}
+
+func ReadIntoBufferHelper(c *C, bufsize int) {
+ buffer := make([]byte, bufsize)
+
+ reader, writer := io.Pipe()
+ slices := make(chan ReaderSlice)
+
+ go ReadIntoBuffer(buffer, reader, slices)
+
+ {
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ {
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 96)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 96; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 96) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ {
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.EOF)
+ }
+}
+
+func (s *StandaloneSuite) TestReadIntoBuffer(c *C) {
+ ReadIntoBufferHelper(c, 512)
+ ReadIntoBufferHelper(c, 225)
+ ReadIntoBufferHelper(c, 224)
+}
+
+func (s *StandaloneSuite) TestReadIntoShortBuffer(c *C) {
+ buffer := make([]byte, 223)
+ reader, writer := io.Pipe()
+ slices := make(chan ReaderSlice)
+
+ go ReadIntoBuffer(buffer, reader, slices)
+
+ {
+ out := make([]byte, 128)
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+ writer.Write(out)
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 128)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 128; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ {
+ out := make([]byte, 96)
+ for i := 0; i < 96; i += 1 {
+ out[i] = byte(i / 2)
+ }
+
+ // Write will deadlock because it can't write all the data, so
+ // spin it off to a goroutine
+ go writer.Write(out)
+ s1 := <-slices
+
+ c.Check(len(s1.slice), Equals, 95)
+ c.Check(s1.reader_error, Equals, nil)
+ for i := 0; i < 95; i += 1 {
+ c.Check(s1.slice[i], Equals, byte(i/2))
+ }
+ for i := 0; i < len(buffer); i += 1 {
+ if i < 128 {
+ c.Check(buffer[i], Equals, byte(i))
+ } else if i < (128 + 95) {
+ c.Check(buffer[i], Equals, byte((i-128)/2))
+ } else {
+ c.Check(buffer[i], Equals, byte(0))
+ }
+ }
+ }
+ {
+ writer.Close()
+ s1 := <-slices
+ c.Check(len(s1.slice), Equals, 0)
+ c.Check(s1.reader_error, Equals, io.ErrShortBuffer)
+ }
+
+}
+
+func (s *StandaloneSuite) TestTransfer(c *C) {
+ reader, writer := io.Pipe()
+
+ // Buffer for reads from 'r'
+ buffer := make([]byte, 512)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Reporting reader error states
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ br1 := MakeBufferReader(requests)
+ out := make([]byte, 128)
+
+ {
+ // Write some data, and read into a buffer shorter than
+ // available data
+ for i := 0; i < 128; i += 1 {
+ out[i] = byte(i)
+ }
+
+ writer.Write(out[:100])
+
+ in := make([]byte, 64)
+ n, err := br1.Read(in)
+
+ c.Check(n, Equals, 64)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 64; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+ }
+
+ {
+ // Write some more data, and read into buffer longer than
+ // available data
+ in := make([]byte, 64)
+ n, err := br1.Read(in)
+ c.Check(n, Equals, 36)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 36; i += 1 {
+ c.Check(in[i], Equals, out[64+i])
+ }
+
+ }
+
+ {
+ // Test read before write
+ type Rd struct {
+ n int
+ err error
+ }
+ rd := make(chan Rd)
+ in := make([]byte, 64)
+
+ go func() {
+ n, err := br1.Read(in)
+ rd <- Rd{n, err}
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ writer.Write(out[100:])
+
+ got := <-rd
+
+ c.Check(got.n, Equals, 28)
+ c.Check(got.err, Equals, nil)
+
+ for i := 0; i < 28; i += 1 {
+ c.Check(in[i], Equals, out[100+i])
+ }
+ }
+
+ br2 := MakeBufferReader(requests)
+ {
+ // Test 'catch up' reader
+ in := make([]byte, 256)
+ n, err := br2.Read(in)
+
+ c.Check(n, Equals, 128)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 128; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+ }
+
+ {
+ // Test closing the reader
+ writer.Close()
+ status := <-reader_status
+ c.Check(status, Equals, io.EOF)
+
+ in := make([]byte, 256)
+ n1, err1 := br1.Read(in)
+ n2, err2 := br2.Read(in)
+ c.Check(n1, Equals, 0)
+ c.Check(err1, Equals, io.EOF)
+ c.Check(n2, Equals, 0)
+ c.Check(err2, Equals, io.EOF)
+ }
+
+ {
+ // Test 'catch up' reader after closing
+ br3 := MakeBufferReader(requests)
+ in := make([]byte, 256)
+ n, err := br3.Read(in)
+
+ c.Check(n, Equals, 128)
+ c.Check(err, Equals, nil)
+
+ for i := 0; i < 128; i += 1 {
+ c.Check(in[i], Equals, out[i])
+ }
+
+ n, err = br3.Read(in)
+
+ c.Check(n, Equals, 0)
+ c.Check(err, Equals, io.EOF)
+ }
+}
+
+func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
+ reader, writer := io.Pipe()
+
+ // Buffer for reads from 'r'
+ buffer := make([]byte, 100)
+
+ // Read requests on Transfer() buffer
+ requests := make(chan ReadRequest)
+ defer close(requests)
+
+ // Reporting reader error states
+ reader_status := make(chan error)
+
+ go Transfer(buffer, reader, requests, reader_status)
+
+ out := make([]byte, 101)
+ go writer.Write(out)
+
+ status := <-reader_status
+ c.Check(status, Equals, io.ErrShortBuffer)
}
commit 2a493a9215f604c63ab7bc6f0e0956d10af8ef10
Merge: 6132d8e 66d5cdb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 13 09:07:05 2014 -0400
2798: Merged branch with code to read environment variables with branch working on Put support.
Merge remote-tracking branch 'origin/2798-go-keep-client' into 2798-go-keep-client
Conflicts:
sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
diff --cc sdk/go/src/arvados.org/keepclient/keepclient.go
index 6d75425,073a76e..f9dce5f
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@@ -1,20 -1,379 +1,387 @@@
package keepclient
import (
- //"net/http"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "sort"
+ "strconv"
)
type KeepClient struct {
- apiServer string
- apiToken string
- apiInsecure bool
- Service_roots []string
++ ApiServer string
+ ApiToken string
++ ApiInsecure bool
++ Service_roots []string
}
- func InitKeepClient() *KeepClient {
- kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
- os.Getenv("ARVADOS_API_TOKEN"),
- os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+ type KeepDisk struct {
+ Hostname string `json:"service_host"`
+ Port int `json:"service_port"`
+ SSL bool `json:"service_ssl_flag"`
+ }
+
+ func MakeKeepClient() (kc *KeepClient, err error) {
- kc := KeepClient{}
- err := kc.DiscoverKeepDisks()
- if err != nil {
++ kc = &KeepClient{
++ ApiServer: os.Getenv("ARVADOS_API_HOST"),
++ ApiToken: os.Getenv("ARVADOS_API_TOKEN"),
++ ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")}
++
++ if err := kc.DiscoverKeepDisks(); err != nil {
+ return nil, err
+ }
- return &kc, nil
++
++ return kc, nil
+ }
+
+ func (this *KeepClient) DiscoverKeepDisks() error {
+ tr := &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
++ TLSClientConfig: &tls.Config{InsecureSkipVerify: this.ApiInsecure},
+ }
+ client := &http.Client{Transport: tr}
+
+ var req *http.Request
++ var err error
+ if req, err = http.NewRequest("GET", "https://localhost:3001/arvados/v1/keep_disks", nil); err != nil {
- return nil, err
++ return err
+ }
+
+ var resp *http.Response
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ if resp, err = client.Do(req); err != nil {
- return nil, err
++ return err
+ }
+
+ type SvcList struct {
+ Items []KeepDisk `json:"items"`
+ }
+ dec := json.NewDecoder(resp.Body)
+ var m SvcList
+ if err := dec.Decode(&m); err != nil {
- return nil, err
++ return err
+ }
+
- this.service_roots = make([]string, len(m.Items))
++ this.Service_roots = make([]string, len(m.Items))
+ for index, element := range m.Items {
+ n := ""
+ if element.SSL {
+ n = "s"
+ }
- this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
++ this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+ n, element.Hostname, element.Port)
+ }
- sort.Strings(this.service_roots)
++ sort.Strings(this.Service_roots)
+ return nil
+ }
+
+ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
+ // Build an ordering with which to query the Keep servers based on the
+ // contents of the hash. "hash" is a hex-encoded number at least 8
+ // digits (32 bits) long
+
+ // seed used to calculate the next keep server from 'pool' to be added
+ // to 'pseq'
+ seed := hash
+
+ // Keep servers still to be added to the ordering
+ pool := make([]string, len(this.Service_roots))
+ copy(pool, this.Service_roots)
+
+ // output probe sequence
+ pseq = make([]string, 0, len(this.Service_roots))
+
+ // iterate while there are servers left to be assigned
+ for len(pool) > 0 {
+
+ if len(seed) < 8 {
+ // ran out of digits in the seed
+ if len(pseq) < (len(hash) / 4) {
+ // the number of servers added to the probe
+ // sequence is less than the number of 4-digit
+ // slices in 'hash' so refill the seed with the
+ // last 4 digits.
+ seed = hash[len(hash)-4:]
+ }
+ seed += hash
+ }
+
+ // Take the next 8 digits (32 bytes) and interpret as an integer,
+ // then modulus with the size of the remaining pool to get the next
+ // selected server.
+ probe, _ := strconv.ParseUint(seed[0:8], 16, 32)
+ probe %= uint64(len(pool))
+
+ // Append the selected server to the probe sequence and remove it
+ // from the pool.
+ pseq = append(pseq, pool[probe])
+ pool = append(pool[:probe], pool[probe+1:]...)
+
+ // Remove the digits just used from the seed
+ seed = seed[8:]
+ }
+ return pseq
+ }
+
+ type ReaderSlice struct {
+ slice []byte
+ reader_error error
+ }
+
+ type Source <-chan ReaderSlice
+ type Sink chan<- ReaderSlice
+ type Status chan error
+
+ // Read repeatedly from the reader into the specified buffer, and report each
+ // read to channel 'c'. Completes when Reader 'r' reports an error and closes
+ // channel 'c'.
+ func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+ defer close(c)
+
+ // Initially use entire buffer as scratch space
+ ptr := buffer[:]
+ for len(ptr) > 0 {
- v // Read into the scratch space
++ // Read into the scratch space
+ n, err := r.Read(ptr)
+
+ // End on error (includes EOF)
+ if err != nil {
+ c <- ReaderSlice{nil, err}
+ return
+ }
+
+ // Make a slice with the contents of the read
+ c <- ReaderSlice{ptr[:n], nil}
+
+ // Adjust the scratch space slice
+ ptr = ptr[n:]
+ }
+ if len(ptr) == 0 {
+ c <- ReaderSlice{nil, io.ErrShortBuffer}
+ }
+ }
+
+ // Take slices from 'source' channel and write them to Writer 'w'. Reports read
+ // or write errors on 'status'. Completes when 'source' channel is closed.
-func SinkWriter(source Source, w io.Writer, status Status) {
++/*func SinkWriter(source Source, w io.Writer, status Status) {
+ can_write = true
+
+ for {
+ // Get the next block from the source
+ rs, valid := <-source
+
+ if valid {
+ if rs.error != nil {
+ // propagate reader status (should only be EOF)
+ status <- rs.error
+ } else if can_write {
+ buf := rs.slice[:]
+ for len(buf) > 0 {
+ n, err := w.Write(buf)
+ buf = buf[n:]
+ if err == io.ErrShortWrite {
+ // short write, so go around again
+ } else if err != nil {
+ // some other write error,
+ // propagate error and stop
+ // further writes
+ status <- err
+ can_write = false
+ }
+ }
+ }
+ } else {
+ // source channel closed
+ break
+ }
+ }
-}
++}*/
+
+ func closeSinks(sinks_slice []Sink) {
+ for _, s := range sinks_slice {
+ close(s)
+ }
+ }
+
+ // Transfer data from a source (either an already-filled buffer, or a reader)
+ // into one or more 'sinks'. If 'source' is valid, it will read from the
+ // reader into the buffer and send the data to the sinks. Otherwise 'buffer'
+ // it will just send the contents of the buffer to the sinks. Completes when
+ // the 'sinks' channel is closed.
+ func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
+ // currently buffered data
+ var body []byte
+
+ // for receiving slices from ReadIntoBuffer
+ var slices chan []byte = nil
+
+ // indicates whether the buffered data is complete
+ var complete bool = false
+
- if source != nil {
++ if source_reader != nil {
+ // 'body' is the buffer slice representing the body content read so far
+ body = source_buffer[:0]
+
+ // used to communicate slices of the buffer as read
+ reader_slices := make(chan []ReaderSlice)
+
+ // Spin it off
+ go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
+ } else {
+ // use the whole buffer
+ body = source_buffer[:]
+
+ // that's it
+ complete = true
+ }
+
+ // list of sinks to send to
+ sinks_slice := make([]Sink, 0)
+ defer closeSinks(sinks_slice)
+
+ for {
+ select {
+ case s, valid := <-sinks:
+ if valid {
+ // add to the sinks slice
+ sinks_slice = append(sinks_slice, s)
+
+ // catch up the sink with the current body contents
+ if len(body) > 0 {
+ s <- ReaderSlice{body, nil}
+ if complete {
+ s <- ReaderSlice{nil, io.EOF}
+ }
+ }
+ } else {
+ // closed 'sinks' channel indicates we're done
+ return
+ }
+
+ case bk, valid := <-slices:
+ if valid {
+ if bk.err != nil {
+ reader_error <- bk.err
+ if bk.err == io.EOF {
+ // EOF indicates the reader is done
+ // sending, so our buffer is complete.
+ complete = true
+ } else {
+ // some other reader error
+ return
+ }
+ }
+
+ if bk.slice != nil {
+ // adjust body bounds now that another slice has been read
+ body = source_buffer[0 : len(body)+len(bk.slice)]
+ }
+
+ // send the new slice to the sinks
+ for _, s := range sinks_slice {
+ s <- bk
+ }
+
+ if complete {
+ // got an EOF, so close the sinks
+ closeSinks(sinks_slice)
+
+ // truncate sinks slice
+ sinks_slice = sinks_slice[:0]
+ }
+ } else {
+ // no more reads
+ slices = nil
+ }
+ }
+ }
+ }
+
+ func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+ pipereader, pipewriter := io.Pipe()
+
+ var req *http.Request
+ if req, err = http.NewRequest("POST", url, nil); err != nil {
+ write_status <- err
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ req.Body = pipereader
+
+ // create channel to transfer slices from reader to writer
+ tr := make(chan ReaderSlice)
+
+ // start the writer goroutine
+ go SinkWriter(tr, pipewriter, write_status)
+
+ // now transfer the channel to the reader goroutine
+ sinks <- tr
+
+ var resp *http.Response
+
+ if resp, err = client.Do(req); err != nil {
+ return nil, err
+ }
+ }
+
+ var KeepWriteError = errors.new("Could not write sufficient replicas")
+
+ func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+ // Calculate the ordering to try writing to servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ // The next server to try contacting
+ n := 0
+
+ // The number of active writers
+ active := 0
+
+ // Used to buffer reads from 'r'
+ buffer := make([]byte, 64*1024*1024)
+
+ // Used to send writers to the reader goroutine
+ sinks := make(chan Sink)
+ defer close(sinks)
+
+ // Used to communicate status from the reader goroutine
+ reader_status := make(chan error)
+
+ // Start the reader goroutine
+ go Transfer(buffer, r, sinks, reader_status)
+
+ // Used to communicate status from the writer goroutines
+ write_status := make(chan error)
+
+ for want_replicas > 0 {
+ for active < want_replicas {
+ // Start some writers
+ if n < len(sv) {
+ go this.ConnectToKeepServer(sv[n], sinks, write_status)
+ n += 1
+ active += 1
+ } else {
+ return KeepWriteError
+ }
+ }
- return &kc
+ // Now wait for something to happen.
+ select {
+ case status := <-reader_status:
+ if status == io.EOF {
+ // good news!
+ } else {
+ // bad news
+ return status
+ }
+ case status := <-write_status:
+ if status == io.EOF {
+ // good news!
+ want_replicas -= 1
+ } else {
+ // writing to keep server failed for some reason.
+ }
+ active -= 1
+ }
+ }
}
diff --cc sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 733cc26,bc719c0..13a203d
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@@ -1,43 -1,33 +1,62 @@@
package keepclient
import (
+ "fmt"
. "gopkg.in/check.v1"
+ "os"
+ "os/exec"
"testing"
)
-// Hook up gocheck into the "go test" runner.
+// Gocheck boilerplate
func Test(t *testing.T) { TestingT(t) }
+// Gocheck boilerplate
+var _ = Suite(&MySuite{})
+
+// Our test fixture
type MySuite struct{}
-var _ = Suite(&MySuite{})
+func (s *MySuite) SetUpSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "start").Run()
+}
+
+func (s *MySuite) TearDownSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "stop").Run()
+}
+
+func (s *MySuite) TestInit(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "localhost:3001")
+ os.Setenv("ARVADOS_API_TOKEN", "12345")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ kc := InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, false)
+
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ kc = InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, true)
+}
+
+ func (s *MySuite) TestGetKeepDisks(c *C) {
+ sr, err := KeepDisks()
+ c.Assert(err, Equals, nil)
+ c.Assert(len(sr), Equals, 2)
+ c.Assert(sr[0], Equals, "http://localhost:25107")
+ c.Assert(sr[1], Equals, "http://localhost:25108")
+
+ service_roots := []string{"http://localhost:25107", "http://localhost:25108", "http://localhost:25109", "http://localhost:25110", "http://localhost:25111", "http://localhost:25112", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25116", "http://localhost:25117", "http://localhost:25118", "http://localhost:25119", "http://localhost:25120", "http://localhost:25121", "http://localhost:25122", "http://localhost:25123"}
+
+ // "foo" acbd18db4cc2f85cedef654fccc4a4d8
+ foo_shuffle := []string{"http://localhost:25116", "http://localhost:25120", "http://localhost:25119", "http://localhost:25122", "http://localhost:25108", "http://localhost:25114", "http://localhost:25112", "http://localhost:25107", "http://localhost:25118", "http://localhost:25111", "http://localhost:25113", "http://localhost:25121", "http://localhost:25110", "http://localhost:25117", "http://localhost:25109", "http://localhost:25115", "http://localhost:25123"}
+ c.Check(ShuffledServiceRoots(service_roots, "acbd18db4cc2f85cedef654fccc4a4d8"), DeepEquals, foo_shuffle)
+
+ // "bar" 37b51d194a7513e45b56f6524f2d51f2
+ bar_shuffle := []string{"http://localhost:25108", "http://localhost:25112", "http://localhost:25119", "http://localhost:25107", "http://localhost:25110", "http://localhost:25116", "http://localhost:25122", "http://localhost:25120", "http://localhost:25121", "http://localhost:25117", "http://localhost:25111", "http://localhost:25123", "http://localhost:25118", "http://localhost:25113", "http://localhost:25114", "http://localhost:25115", "http://localhost:25109"}
+ c.Check(ShuffledServiceRoots(service_roots, "37b51d194a7513e45b56f6524f2d51f2"), DeepEquals, bar_shuffle)
-
+ }
commit 6132d8efbc522b71d0084160abaaa87031678bdc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 8 11:23:51 2014 -0400
2798: Initial commit of Go Keep client library.
diff --git a/sdk/go/build.sh b/sdk/go/build.sh
new file mode 100755
index 0000000..ed95228
--- /dev/null
+++ b/sdk/go/build.sh
@@ -0,0 +1,37 @@
+#! /bin/sh
+
+# This script builds a Keep executable and installs it in
+# ./bin/keep.
+#
+# In idiomatic Go style, a user would install Keep with something
+# like:
+#
+# go get arvados.org/keep
+# go install arvados.org/keep
+#
+# which would download both the Keep source and any third-party
+# packages it depends on.
+#
+# Since the Keep source is bundled within the overall Arvados source,
+# "go get" is not the primary tool for delivering Keep source and this
+# process doesn't work. Instead, this script sets the environment
+# properly and fetches any necessary dependencies by hand.
+
+if [ -z "$GOPATH" ]
+then
+ GOPATH=$(pwd)
+else
+ GOPATH=$(pwd):${GOPATH}
+fi
+
+export GOPATH
+
+set -o errexit # fail if any command returns an error
+
+mkdir -p pkg
+mkdir -p bin
+go get gopkg.in/check.v1
+go install arvados.org/keepclient
+if ls -l pkg/*/arvados.org/keepclient.a ; then
+ echo "success!"
+fi
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
new file mode 100644
index 0000000..6d75425
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -0,0 +1,20 @@
+package keepclient
+
+import (
+ //"net/http"
+ "os"
+)
+
+type KeepClient struct {
+ apiServer string
+ apiToken string
+ apiInsecure bool
+}
+
+func InitKeepClient() *KeepClient {
+ kc := KeepClient{os.Getenv("ARVADOS_API_HOST"),
+ os.Getenv("ARVADOS_API_TOKEN"),
+ os.Getenv("ARVADOS_API_HOST_INSECURE") != ""}
+
+ return &kc
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
new file mode 100644
index 0000000..733cc26
--- /dev/null
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -0,0 +1,43 @@
+package keepclient
+
+import (
+ . "gopkg.in/check.v1"
+ "os"
+ "os/exec"
+ "testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) { TestingT(t) }
+
+// Gocheck boilerplate
+var _ = Suite(&MySuite{})
+
+// Our test fixture
+type MySuite struct{}
+
+func (s *MySuite) SetUpSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "start").Run()
+}
+
+func (s *MySuite) TearDownSuite(c *C) {
+ os.Chdir(os.ExpandEnv("$GOPATH../python"))
+ exec.Command("python", "run_test_server.py", "stop").Run()
+}
+
+func (s *MySuite) TestInit(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "localhost:3001")
+ os.Setenv("ARVADOS_API_TOKEN", "12345")
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ kc := InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, false)
+
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+ kc = InitKeepClient()
+ c.Assert(kc.apiServer, Equals, "localhost:3001")
+ c.Assert(kc.apiToken, Equals, "12345")
+ c.Assert(kc.apiInsecure, Equals, true)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list