[ARVADOS] updated: 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab

git at public.curoverse.com
Mon May 12 16:54:29 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go | 308 ++++++++++++++++++------
 1 file changed, 238 insertions(+), 70 deletions(-)

       via  66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab (commit)
      from  90bf8cae5f1ba931c7cc3be7864cad8ecca1ca5e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 66d5cdb1f34d614e5ecf1da5ef6efcad3a5a51ab
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon May 12 16:54:23 2014 -0400

    2798: Work in progress connecting data read from input Reader to POST requests.

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index ba73523..073a76e 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -12,6 +12,7 @@ import (
 
 type KeepClient struct {
 	Service_roots []string
+	ApiToken      string
 }
 
 type KeepDisk struct {
@@ -20,7 +21,16 @@ type KeepDisk struct {
 	SSL      bool   `json:"service_ssl_flag"`
 }
 
-func KeepDisks() (service_roots []string, err error) {
+func MakeKeepClient() (kc *KeepClient, err error) {
+	kc = &KeepClient{}
+	err = kc.DiscoverKeepDisks()
+	if err != nil {
+		return nil, err
+	}
+	return kc, nil
+}
+
+func (this *KeepClient) DiscoverKeepDisks() error {
 	tr := &http.Transport{
 		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
 	}
@@ -32,7 +42,7 @@ func KeepDisks() (service_roots []string, err error) {
 	}
 
 	var resp *http.Response
-	req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 	if resp, err = client.Do(req); err != nil {
-		return nil, err
+		return err
 	}
@@ -46,25 +56,17 @@ func KeepDisks() (service_roots []string, err error) {
-		return nil, err
+		return err
 	}
 
-	service_roots = make([]string, len(m.Items))
+	this.Service_roots = make([]string, len(m.Items))
 	for index, element := range m.Items {
 		n := ""
 		if element.SSL {
 			n = "s"
 		}
-		service_roots[index] = fmt.Sprintf("http%s://%s:%d",
+		this.Service_roots[index] = fmt.Sprintf("http%s://%s:%d",
 			n, element.Hostname, element.Port)
 	}
-	sort.Strings(service_roots)
-	return service_roots, nil
-}
-
-func MakeKeepClient() (kc *KeepClient, err error) {
-	sv, err := KeepDisks()
-	if err != nil {
-		return nil, err
-	}
-	return &KeepClient{sv}, nil
+	sort.Strings(this.Service_roots)
+	return nil
 }
 
 func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
@@ -115,97 +117,263 @@ func (this KeepClient) ShuffledServiceRoots(hash string) (pseq []string) {
 	return pseq
 }
 
-func ReadIntoBuffer(buffer []byte, r io.Reader, c chan []byte, reader_error chan error) {
+type ReaderSlice struct {
+	slice        []byte
+	reader_error error
+}
+
+type Source <-chan ReaderSlice
+type Sink chan<- ReaderSlice
+type Status chan<- error
+
+// Read repeatedly from the reader 'r' into the specified buffer and report
+// each read on channel 'c'.  Completes, and closes channel 'c', when the
+// reader returns an error (including EOF) or the buffer has been filled.
+func ReadIntoBuffer(buffer []byte, r io.Reader, c Sink) {
+	defer close(c)
+
+	// Initially use entire buffer as scratch space
 	ptr := buffer[:]
-	for {
+	for len(ptr) > 0 {
+		// Read into the scratch space
 		n, err := r.Read(ptr)
+
+		// End on error (includes EOF)
 		if err != nil {
-			reader_error <- err
+			c <- ReaderSlice{nil, err}
 			return
 		}
-		c <- ptr[:n]
+
+		// Make a slice with the contents of the read
+		c <- ReaderSlice{ptr[:n], nil}
+
+		// Adjust the scratch space slice
 		ptr = ptr[n:]
 	}
+	if len(ptr) == 0 {
+		c <- ReaderSlice{nil, io.ErrShortBuffer}
+	}
 }
 
-type Sink struct {
-	out io.Writer
-	err chan error
+// Take slices from 'source' channel and write them to Writer 'w'.  Reports read
+// or write errors on 'status'.  Completes when 'source' channel is closed.
+func SinkWriter(source Source, w io.Writer, status Status) {
+	can_write := true
+
+	for {
+		// Get the next block from the source
+		rs, valid := <-source
+
+		if valid {
+			if rs.reader_error != nil {
+				// propagate reader status (should only be EOF)
+				status <- rs.reader_error
+			} else if can_write {
+				buf := rs.slice[:]
+				for len(buf) > 0 {
+					n, err := w.Write(buf)
+					buf = buf[n:]
+					if err == io.ErrShortWrite {
+						// short write, so go around again
+					} else if err != nil {
+						// some other write error;
+						// propagate the error and stop
+						// further writes
+						status <- err
+						can_write = false
+						break
+					}
+				}
+			}
+		} else {
+			// source channel closed
+			break
+		}
+	}
 }
 
-// Transfer data from a buffer into one or more 'sinks'.
-//
-// Forwards all data read to the writers in "Sinks", including any previous
-// reads into the buffer.  Either one of buffer or io.Reader must be valid, and
-// the other must be nil.  If 'source' is valid, it will read from the reader,
-// store the data in the buffer, and send the data to the sinks.  Otherwise
-// 'buffer' must be valid, and it will send the contents of the buffer to the
-// sinks.
-func Transfer(buffer []byte, source io.Reader, sinks chan Sink, errorchan chan error) {
+func closeSinks(sinks_slice []Sink) {
+	for _, s := range sinks_slice {
+		close(s)
+	}
+}
+
+// Transfer data from a source (either an already-filled buffer, or a reader)
+// into one or more 'sinks'.  If 'source_reader' is valid, it will read from
+// the reader into the buffer and send the data to the sinks.  Otherwise it
+// will just send the existing contents of 'source_buffer' to the sinks.
+// Completes when the 'sinks' channel is closed.
+func Transfer(source_buffer []byte, source_reader io.Reader, sinks <-chan Sink, reader_error chan error) {
 	// currently buffered data
-	var ptr []byte
+	var body []byte
 
 	// for receiving slices from ReadIntoBuffer
-	var slices chan []byte
+	var slices chan ReaderSlice = nil
 
 	// indicates whether the buffered data is complete
 	var complete bool = false
 
-	// for receiving errors from ReadIntoBuffer
-	var reader_error chan error = nil
-
-	if source != nil {
+	if source_reader != nil {
-		// allocate the scratch buffer at 64 MiB
-		buffer = make([]byte, 1024*1024*64)
-
-		// 'ptr' is a slice indicating the buffer slice that has been
-		// read so far
-		ptr = buffer[0:0]
+		// 'body' is the buffer slice representing the body content read so far
+		body = source_buffer[:0]
 
 		// used to communicate slices of the buffer as read
-		slices := make(chan []byte)
-
-		// communicate read errors
-		reader_error = make(chan error)
+		slices = make(chan ReaderSlice)
 
 		// Spin it off
-		go ReadIntoBuffer(buffer, source, slices, reader_error)
+		go ReadIntoBuffer(source_buffer, source_reader, slices)
 	} else {
 		// use the whole buffer
-		ptr = buffer[:]
+		body = source_buffer[:]
 
 		// that's it
 		complete = true
 	}
 
 	// list of sinks to send to
-	sinks_slice := make([]io.Writer, 0)
-
-	select {
-	case e := <-reader_error:
-		// barf
-	case s, valid := <-sinks:
-		if !valid {
-			// sinks channel closed
-			return
-		}
-		sinks_slice = append(sinks_slice, s)
-		go s.Write(ptr)
-	case bk := <-slices:
-		ptr = buffer[0 : len(ptr)+len(bk)]
-		for _, s := range sinks_slice {
-			go s.Write(bk)
+	sinks_slice := make([]Sink, 0)
+	defer func() { closeSinks(sinks_slice) }()
+
+	for {
+		select {
+		case s, valid := <-sinks:
+			if valid {
+				// add to the sinks slice
+				sinks_slice = append(sinks_slice, s)
+
+				// catch up the sink with the current body contents
+				if len(body) > 0 {
+					s <- ReaderSlice{body, nil}
+					if complete {
+						s <- ReaderSlice{nil, io.EOF}
+					}
+				}
+			} else {
+				// closed 'sinks' channel indicates we're done
+				return
+			}
+
+		case bk, valid := <-slices:
+			if valid {
+				if bk.reader_error != nil {
+					reader_error <- bk.reader_error
+					if bk.reader_error == io.EOF {
+						// EOF indicates the reader is done
+						// sending, so our buffer is complete.
+						complete = true
+					} else {
+						// some other reader error
+						return
+					}
+				}
+
+				if bk.slice != nil {
+					// adjust body bounds now that another slice has been read
+					body = source_buffer[0 : len(body)+len(bk.slice)]
+				}
+
+				// send the new slice to the sinks
+				for _, s := range sinks_slice {
+					s <- bk
+				}
+
+				if complete {
+					// got an EOF, so close the sinks
+					closeSinks(sinks_slice)
+
+					// truncate sinks slice
+					sinks_slice = sinks_slice[:0]
+				}
+			} else {
+				// no more reads
+				slices = nil
+			}
 		}
 	}
 }
 
-func (this KeepClient) KeepPut(hash string, r io.Reader) {
-	//sv := this.ShuffledServiceRoots(hash)
-	//n := 0
+func (this KeepClient) ConnectToKeepServer(url string, sinks chan<- Sink, write_status chan<- error) {
+	pipereader, pipewriter := io.Pipe()
+
+	var req *http.Request
+	var err error
+	if req, err = http.NewRequest("POST", url, nil); err != nil {
+		write_status <- err
+		return
+	}
+	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+	req.Body = pipereader
+
+	// create channel to transfer slices from reader to writer
+	tr := make(chan ReaderSlice)
 
-	//success := make(chan int)
-	sinks := make(chan []io.Writer)
-	errorchan := make(chan error)
+	// start the writer goroutine
+	go SinkWriter(tr, pipewriter, write_status)
+
+	// now transfer the channel to the reader goroutine
+	sinks <- tr
+
+	// Send the request; the request body is streamed from the pipe as
+	// SinkWriter writes into it.
+	client := &http.Client{}
+	var resp *http.Response
+	if resp, err = client.Do(req); err != nil {
+		write_status <- err
+		return
+	}
+	resp.Body.Close()
+}
 
-	go Transfer(nil, r, reads, errorchan)
+var KeepWriteError = errors.New("Could not write sufficient replicas")
+
+func (this KeepClient) KeepPut(hash string, r io.Reader, want_replicas int) error {
+	// Calculate the ordering to try writing to servers
+	sv := this.ShuffledServiceRoots(hash)
+
+	// The next server to try contacting
+	n := 0
+
+	// The number of active writers
+	active := 0
+
+	// Used to buffer reads from 'r'
+	buffer := make([]byte, 64*1024*1024)
+
+	// Used to send writers to the reader goroutine
+	sinks := make(chan Sink)
+	defer close(sinks)
+
+	// Used to communicate status from the reader goroutine
+	reader_status := make(chan error)
+
+	// Start the reader goroutine
+	go Transfer(buffer, r, sinks, reader_status)
+
+	// Used to communicate status from the writer goroutines
+	write_status := make(chan error)
+
+	for want_replicas > 0 {
+		for active < want_replicas {
+			// Start some writers
+			if n < len(sv) {
+				go this.ConnectToKeepServer(sv[n], sinks, write_status)
+				n += 1
+				active += 1
+			} else {
+				return KeepWriteError
+			}
+		}
+
+		// Now wait for something to happen.
+		select {
+		case status := <-reader_status:
+			if status == io.EOF {
+				// good news!
+			} else {
+				// bad news
+				return status
+			}
+		case status := <-write_status:
+			if status == io.EOF {
+				// good news!
+				want_replicas -= 1
+			} else {
+				// writing to keep server failed for some reason.
+			}
+			active -= 1
+		}
+	}
+
+	return nil
 }
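
For context, here is a minimal sketch of how the client added above might be
driven once the work in progress settles.  It is illustrative only: the import
path is inferred from the sdk/go tree layout, the token source, input file,
block locator, and replica count are placeholders, and because MakeKeepClient
does not yet accept a token the sketch fills in ApiToken by hand before
discovering the Keep disks.

package main

import (
	"log"
	"os"

	"arvados.org/keepclient" // import path assumed from the sdk/go/src layout
)

func main() {
	// Hypothetical setup: take the API token from the environment.
	kc := &keepclient.KeepClient{ApiToken: os.Getenv("ARVADOS_API_TOKEN")}
	if err := kc.DiscoverKeepDisks(); err != nil {
		log.Fatal(err)
	}

	// Hypothetical input block; KeepPut streams the reader's contents to
	// the shuffled Keep servers until the requested replicas succeed.
	f, err := os.Open("block.dat")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	hash := "acbd18db4cc2f85cedef654fccc4a4d8" // placeholder block locator
	if err := kc.KeepPut(hash, f, 2); err != nil {
		log.Fatal(err)
	}
}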

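The commit message is about connecting a Reader to POST requests; the
standard-library technique ConnectToKeepServer builds on is io.Pipe, where the
read end becomes the request body and a goroutine feeds the write end.  Below
is a self-contained sketch of just that pattern, with a placeholder URL, token,
and payload rather than anything from the Keep client itself.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
)

func main() {
	pipereader, pipewriter := io.Pipe()

	// Feed the write end from a goroutine; closing the writer signals EOF
	// to whatever is reading the request body.
	go func() {
		_, err := io.Copy(pipewriter, strings.NewReader("block data")) // placeholder payload
		if err != nil {
			pipewriter.CloseWithError(err)
		} else {
			pipewriter.Close()
		}
	}()

	// The read end is the request body, so the upload streams instead of
	// being buffered in memory first.  URL and token are placeholders.
	req, err := http.NewRequest("POST", "http://keep.example/placeholder-hash", pipereader)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Add("Authorization", "OAuth2 placeholder-token")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("response status:", resp.Status)
}

The work-in-progress code routes the same kind of pipe through SinkWriter and
the Transfer goroutine so that a single buffered read of the input can feed
several such requests at once.
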
-----------------------------------------------------------------------


hooks/post-receive
-- 