[ARVADOS] updated: 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab
git at public.curoverse.com
git at public.curoverse.com
Mon May 12 16:54:29 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/keepclient/keepclient.go | 308 ++++++++++++++++++------
1 file changed, 238 insertions(+), 70 deletions(-)
via 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab (commit)
from 90bf8cae5f1ba931c7cc3be7864cad8ecca1ca5e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon May 12 16:54:23 2014 -0400
2798: Work in progress connecting data read from input Reader to POST requests.
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index ba73523..073a76e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -12,6 +12,7 @@ import (
type KeepClient struct {
Service_roots []string
+ ApiToken string
}
type KeepDisk struct {
@@ -20,7 +21,16 @@ type KeepDisk struct {
SSL bool `json:"service_ssl_flag"`
}
-func KeepDisks() (service_roots []string, err error) {
+func MakeKeepClient() (kc *KeepClient, err error) {
+ kc := KeepClient{}
+ err := kc.DiscoverKeepDisks()
+ if err != nil {
+ return nil, err
+ }
+ return &kc, nil
+}
+
+func (this *KeepClient) DiscoverKeepDisks() error {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
@@ -32,7 +42,7 @@ func KeepDisks() (service_roots []string, err error) {
}
var resp *http.Response
- req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
if resp, err = client.Do(req); err != nil {
return nil, err
}
@@ -46,25 +56,17 @@ func KeepDisks() (service_roots []string, err error) {
return nil, err
}
- service_roots = make([]string, len(m.Items))
+ this.service_roots = make([]string, len(m.Items))
for index, element := range m.Items {
n := ""
if element.SSL {
n = "s"
}
- service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+ this.service_roots[index] = fmt.Sprintf("http%s://%s:%d",
n, element.Hostname, element.Port)
}
- sort.Strings(service_roots)
- return service_roots, nil
-}
-
-func MakeKeepClient() (kc *KeepClient, err error) {
- sv, err := KeepDisks()
- if err != nil {
- return nil, err
- }
- return &KeepClient{sv}, nil
+ sort.Strings(this.service_roots)
+ return nil
}
func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
@@ -115,97 +117,263 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
return pseq
}
-func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
+type ReaderSlice struct {
+ slice []byte
+ reader_error error
+}
+
+type Source <-chan ReaderSlice
+type Sink chan<- ReaderSlice
+type Status chan error
+
+// Read repeatedly from Reader 'r' into the specified buffer, and report each
+// read to channel 'c'. Completes when 'r' reports an error (including EOF) or
+// the buffer is full, and closes channel 'c' on return.
+func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+ defer close(c)
+
+ // Initially use entire buffer as scratch space
ptr := buffer[:]
- for {
+ for len(ptr) > 0 {
+ // Read into the scratch space
n, err := r.Read(ptr)
+
+ // End on error (includes EOF)
if err != nil {
- reader_error <- err
+ c <- ReaderSlice{nil, err}
return
}
- c <- ptr[:n]
+
+ // Make a slice with the contents of the read
+ c <- ReaderSlice{ptr[:n], nil}
+
+ // Adjust the scratch space slice
ptr = ptr[n:]
}
+ if len(ptr) == 0 {
+ c <- ReaderSlice{nil, io.ErrShortBuffer}
+ }
}
-type Sink struct {
- out io.Writer
- err chan error
+// Take slices from 'source' channel and write them to Writer 'w'. Reports read
+// or write errors on 'status'. Completes when 'source' channel is closed.
+func SinkWriter(source Source, w io.Writer, status Status) {
+ can_write = true
+
+ for {
+ // Get the next block from the source
+ rs, valid := <-source
+
+ if valid {
+ if rs.error != nil {
+ // propagate reader status (should only be EOF)
+ status <- rs.error
+ } else if can_write {
+ buf := rs.slice[:]
+ for len(buf) > 0 {
+ n, err := w.Write(buf)
+ buf = buf[n:]
+ if err == io.ErrShortWrite {
+ // short write, so go around again
+ } else if err != nil {
+ // some other write error,
+ // propagate error and stop
+ // further writes
+ status <- err
+ can_write = false
+ }
+ }
+ }
+ } else {
+ // source channel closed
+ break
+ }
+ }
}
-// Transfer data from a buffer into one or more 'sinks'.
-//
-// Forwards all data read to the writers in "Sinks", including any previous
-// reads into the buffer. Either one of buffer or io.Reader must be valid, and
-// the other must be nil. If 'source' is valid, it will read from the reader,
-// store the data in the buffer, and send the data to the sinks. Otherwise
-// 'buffer' must be valid, and it will send the contents of the buffer to the
-// sinks.
-func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+func closeSinks(sinks_slice []Sink) {
+ for _, s := range sinks_slice {
+ close(s)
+ }
+}
+
+// Transfer data from a source (either an already-filled buffer, or a reader)
+// into one or more 'sinks'. If 'source_reader' is valid, it will read from the
+// reader into 'source_buffer' and send the data to the sinks. Otherwise it
+// will just send the contents of 'source_buffer' to the sinks. Completes when
+// the 'sinks' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
// currently buffered data
- var ptr []byte
+ var body []byte
// for receiving slices from ReadIntoBuffer
- var slices chan []byte
+ var slices chan []byte = nil
// indicates whether the buffered data is complete
var complete bool = false
- // for receiving errors from ReadIntoBuffer
- var reader_error chan error = nil
-
if source != nil {
- // allocate the scratch buffer at 64 MiB
- buffer = make([]byte, 1024*1024*64)
-
- // 'ptr' is a slice indicating the buffer slice that has been
- // read so far
- ptr = buffer[0:0]
+ // 'body' is the buffer slice representing the body content read so far
+ body = source_buffer[:0]
// used to communicate slices of the buffer as read
- slices := make(chan []byte)
-
- // communicate read errors
- reader_error = make(chan error)
+ reader_slices := make(chan []ReaderSlice)
// Spin it off
- go ReadIntoBuffer(buffer, source, slices, reader_error)
+ go ReadIntoBuffer(source_buffer, source_reader, reader_slices)
} else {
// use the whole buffer
- ptr = buffer[:]
+ body = source_buffer[:]
// that's it
complete = true
}
// list of sinks to send to
- sinks_slice := make([]io.Writer, 0)
-
- select {
- case e := <-reader_error:
- // barf
- case s, valid := <-sinks:
- if !valid {
- // sinks channel closed
- return
- }
- sinks_slice = append(sinks_slice, s)
- go s.Write(ptr)
- case bk := <-slices:
- ptr = buffer[0 : len(ptr)+len(bk)]
- for _, s := range sinks_slice {
- go s.Write(bk)
+ sinks_slice := make([]Sink, 0)
+ defer closeSinks(sinks_slice)
+
+ for {
+ select {
+ case s, valid := <-sinks:
+ if valid {
+ // add to the sinks slice
+ sinks_slice = append(sinks_slice, s)
+
+ // catch up the sink with the current body contents
+ if len(body) > 0 {
+ s <- ReaderSlice{body, nil}
+ if complete {
+ s <- ReaderSlice{nil, io.EOF}
+ }
+ }
+ } else {
+ // closed 'sinks' channel indicates we're done
+ return
+ }
+
+ case bk, valid := <-slices:
+ if valid {
+ if bk.err != nil {
+ reader_error <- bk.err
+ if bk.err == io.EOF {
+ // EOF indicates the reader is done
+ // sending, so our buffer is complete.
+ complete = true
+ } else {
+ // some other reader error
+ return
+ }
+ }
+
+ if bk.slice != nil {
+ // adjust body bounds now that another slice has been read
+ body = source_buffer[0 : len(body)+len(bk.slice)]
+ }
+
+ // send the new slice to the sinks
+ for _, s := range sinks_slice {
+ s <- bk
+ }
+
+ if complete {
+ // got an EOF, so close the sinks
+ closeSinks(sinks_slice)
+
+ // truncate sinks slice
+ sinks_slice = sinks_slice[:0]
+ }
+ } else {
+ // no more reads
+ slices = nil
+ }
}
}
}
-func (this KeepClient) KeepPut(hash string, r io.Reader) {
- //sv := this.ShuffledServiceRoots(hash)
- //n := 0
+func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+ pipereader, pipewriter := io.Pipe()
+
+ var req *http.Request
+ if req, err = http.NewRequest("POST", url, nil); err != nil {
+ write_status <- err
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+ req.Body = pipereader
+
+ // create channel to transfer slices from reader to writer
+ tr := make(chan ReaderSlice)
- //success := make(chan int)
- sinks := make(chan []io.Writer)
- errorchan := make(chan error)
+ // start the writer goroutine
+ go SinkWriter(tr, pipewriter, write_status)
+
+ // now transfer the channel to the reader goroutine
+ sinks <- tr
+
+ var resp *http.Response
+
+ if resp, err = client.Do(req); err != nil {
+ return nil, err
+ }
+}
- go Transfer(nil, r, reads, errorchan)
+var KeepWriteError = errors.new("Could not write sufficient replicas")
+
+func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+ // Calculate the ordering to try writing to servers
+ sv := this.ShuffledServiceRoots(hash)
+
+ // The next server to try contacting
+ n := 0
+
+ // The number of active writers
+ active := 0
+
+ // Used to buffer reads from 'r'
+ buffer := make([]byte, 64*1024*1024)
+
+ // Used to send writers to the reader goroutine
+ sinks := make(chan Sink)
+ defer close(sinks)
+
+ // Used to communicate status from the reader goroutine
+ reader_status := make(chan error)
+
+ // Start the reader goroutine
+ go Transfer(buffer, r, sinks, reader_status)
+
+ // Used to communicate status from the writer goroutines
+ write_status := make(chan error)
+
+ for want_replicas > 0 {
+ for active < want_replicas {
+ // Start some writers
+ if n < len(sv) {
+ go this.ConnectToKeepServer(sv[n], sinks, write_status)
+ n += 1
+ active += 1
+ } else {
+ return KeepWriteError
+ }
+ }
+
+ // Now wait for something to happen.
+ select {
+ case status := <-reader_status:
+ if status == io.EOF {
+ // good news!
+ } else {
+ // bad news
+ return status
+ }
+ case status := <-write_status:
+ if status == io.EOF {
+ // good news!
+ want_replicas -= 1
+ } else {
+ // writing to keep server failed for some reason.
+ }
+ active -= 1
+ }
+ }
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list