[ARVADOS] updated: 2b19cf9f9522dd0e8774031a54ce695e73fb72fe

git at public.curoverse.com git at public.curoverse.com
Wed May 14 14:07:15 EDT 2014


Summary of changes:
 sdk/go/src/arvados.org/keepclient/keepclient.go    |  95 +++--
 .../src/arvados.org/keepclient/keepclient_test.go  | 396 +++++++++++++++++----
 2 files changed, 383 insertions(+), 108 deletions(-)

       via  2b19cf9f9522dd0e8774031a54ce695e73fb72fe (commit)
      from  fb7f238945e33b07f1c80b0623315c1ecf86bca2 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 2b19cf9f9522dd0e8774031a54ce695e73fb72fe
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed May 14 14:07:06 2014 -0400

    2798: All PUT tests against stub server pass.  Added explicit setting of
    Content-Length to avoid unnecessary chunked transfer encoding; this is also
    necessary so that slices can be written directly to the socket via WriteTo().

diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index ce67503..4222124 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -163,7 +163,6 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
 	// Initially use entire buffer as scratch space
 	ptr := buffer[:]
 	for {
-		log.Printf("ReadIntoBuffer doing read")
 		var n int
 		var err error
 		if len(ptr) > 0 {
@@ -187,18 +186,13 @@ func ReadIntoBuffer(buffer []byte, r io.Reader, slices chan<- ReaderSlice) {
 
 		// End on error (includes EOF)
 		if err != nil {
-			log.Printf("ReadIntoBuffer sending error %d %s", n, err.Error())
 			slices <- ReaderSlice{nil, err}
 			return
 		}
 
-		log.Printf("ReadIntoBuffer got %d", n)
-
 		if n > 0 {
-			log.Printf("ReadIntoBuffer sending readerslice")
 			// Make a slice with the contents of the read
 			slices <- ReaderSlice{ptr[:n], nil}
-			log.Printf("ReadIntoBuffer sent readerslice")
 
 			// Adjust the scratch space slice
 			ptr = ptr[n:]
@@ -232,7 +226,6 @@ func MakeBufferReader(requests chan<- ReadRequest) BufferReader {
 
 // Reads from the buffer managed by the Transfer()
 func (this BufferReader) Read(p []byte) (n int, err error) {
-	log.Printf("BufferReader Read %d", len(p))
 	this.requests <- ReadRequest{*this.offset, len(p), this.responses}
 	rr, valid := <-this.responses
 	if valid {
@@ -244,18 +237,24 @@ func (this BufferReader) Read(p []byte) (n int, err error) {
 }
 
 func (this BufferReader) WriteTo(dest io.Writer) (written int64, err error) {
-	log.Printf("BufferReader WriteTo")
+	// Record starting offset in order to correctly report the number of bytes sent
+	starting_offset := *this.offset
 	for {
-		this.requests <- ReadRequest{*this.offset, 64 * 1024, this.responses}
+		this.requests <- ReadRequest{*this.offset, 32 * 1024, this.responses}
 		rr, valid := <-this.responses
 		if valid {
+			log.Printf("WriteTo slice %v %d %v", *this.offset, len(rr.slice), rr.err)
 			*this.offset += len(rr.slice)
-			if err != nil {
-				return int64(*this.offset), err
+			if rr.err != nil {
+				if rr.err == io.EOF {
+					// EOF is not an error.
+					return int64(*this.offset - starting_offset), nil
+				} else {
+					return int64(*this.offset - starting_offset), rr.err
+				}
 			} else {
 				dest.Write(rr.slice)
 			}
-
 		} else {
 			return int64(*this.offset), io.ErrUnexpectedEOF
 		}
@@ -271,7 +270,7 @@ func (this BufferReader) Close() error {
 // Handle a read request.  Returns true if a response was sent, and false if
 // the request should be queued.
 func HandleReadRequest(req ReadRequest, body []byte, complete bool) bool {
-	log.Printf("HandleReadRequest offset: %d  max: %d body: %d %t", req.offset, req.maxsize, len(body), complete)
+	log.Printf("HandleReadRequest %d %d %d", req.offset, req.maxsize, len(body))
 	if req.offset < len(body) {
 		var end int
 		if req.offset+req.maxsize < len(body) {
@@ -308,6 +307,7 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 		body = source_buffer[:0]
 
 		// used to communicate slices of the buffer as they are
+		// ReadIntoBuffer will close 'slices' when it is done with it
 		slices = make(chan ReaderSlice)
 
 		// Spin it off
@@ -323,14 +323,11 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 	pending_requests := make([]ReadRequest, 0)
 
 	for {
-		log.Printf("Doing select")
 		select {
 		case req, valid := <-requests:
-			log.Printf("Got read request")
 			// Handle a buffer read request
 			if valid {
 				if !HandleReadRequest(req, body, complete) {
-					log.Printf("Queued")
 					pending_requests = append(pending_requests, req)
 				}
 			} else {
@@ -341,8 +338,6 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 		case bk, valid := <-slices:
 			// Got a new slice from the reader
 			if valid {
-				log.Printf("Got readerslice %d", len(bk.slice))
-
 				if bk.reader_error != nil {
 					reader_error <- bk.reader_error
 					if bk.reader_error == io.EOF {
@@ -364,7 +359,6 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 				n := 0
 				for n < len(pending_requests) {
 					if HandleReadRequest(pending_requests[n], body, complete) {
-						log.Printf("ReadRequest handled")
 
 						// move the element from the
 						// back of the slice to
@@ -373,7 +367,6 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 						pending_requests[n] = pending_requests[len(pending_requests)-1]
 						pending_requests = pending_requests[0 : len(pending_requests)-1]
 					} else {
-						log.Printf("ReadRequest re-queued")
 
 						// Request wasn't handled, so keep it in the request slice
 						n += 1
@@ -393,39 +386,52 @@ func Transfer(source_buffer []byte, source_reader io.Reader, requests <-chan Rea
 	}
 }
 
-type UploadError struct {
-	err error
-	url string
+type UploadStatus struct {
+	Err        error
+	Url        string
+	StatusCode int
 }
 
-func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, upload_status chan<- UploadError) {
+func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
+	upload_status chan<- UploadStatus, expectedLength int64) {
+
+	log.Printf("Uploading to %s", host)
+
 	var req *http.Request
 	var err error
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
-		upload_status <- UploadError{err, url}
+		upload_status <- UploadStatus{err, url, 0}
 		return
 	}
 
+	if expectedLength > 0 {
+		req.ContentLength = expectedLength
+	}
+
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 	req.Body = body
 
 	var resp *http.Response
 	if resp, err = this.client.Do(req); err != nil {
-		upload_status <- UploadError{err, url}
+		upload_status <- UploadStatus{err, url, 0}
+		return
 	}
 
 	if resp.StatusCode == http.StatusOK {
-		upload_status <- UploadError{io.EOF, url}
+		upload_status <- UploadStatus{nil, url, resp.StatusCode}
+	} else {
+		upload_status <- UploadStatus{errors.New(resp.Status), url, resp.StatusCode}
 	}
 }
 
-var KeepWriteError = errors.New("Could not write sufficient replicas")
+var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
 
 func (this KeepClient) putReplicas(
 	hash string,
 	requests chan ReadRequest,
-	reader_status chan error) error {
+	reader_status chan error,
+	expectedLength int64) error {
 
 	// Calculate the ordering for uploading to servers
 	sv := this.ShuffledServiceRoots(hash)
@@ -437,7 +443,7 @@ func (this KeepClient) putReplicas(
 	active := 0
 
 	// Used to communicate status from the upload goroutines
-	upload_status := make(chan UploadError)
+	upload_status := make(chan UploadStatus)
 	defer close(upload_status)
 
 	// Desired number of replicas
@@ -447,11 +453,11 @@ func (this KeepClient) putReplicas(
 		for active < want_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status)
+				go this.uploadToKeepServer(sv[next_server], hash, MakeBufferReader(requests), upload_status, expectedLength)
 				next_server += 1
 				active += 1
 			} else {
-				return KeepWriteError
+				return InsufficientReplicasError
 			}
 		}
 
@@ -465,24 +471,36 @@ func (this KeepClient) putReplicas(
 				return status
 			}
 		case status := <-upload_status:
-			if status.err == io.EOF {
+			if status.StatusCode == 200 {
 				// good news!
 				want_replicas -= 1
 			} else {
 				// writing to keep server failed for some reason
-				log.Printf("Got error %s uploading to %s", status.err, status.url)
+				log.Printf("Keep server put to %v failed with '%v'",
+					status.Url, status.Err)
 			}
 			active -= 1
+			log.Printf("Upload status %v %v %v", status.StatusCode, want_replicas, active)
 		}
 	}
 
 	return nil
 }
 
-func (this KeepClient) PutHR(hash string, r io.Reader) error {
+var OversizeBlockError = errors.New("Block too big")
+
+func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) error {
 
 	// Buffer for reads from 'r'
-	buffer := make([]byte, 64*1024*1024)
+	var buffer []byte
+	if expectedLength > 0 {
+		if expectedLength > 64*1024*1024 {
+			return OversizeBlockError
+		}
+		buffer = make([]byte, expectedLength)
+	} else {
+		buffer = make([]byte, 64*1024*1024)
+	}
 
 	// Read requests on Transfer() buffer
 	requests := make(chan ReadRequest)
@@ -490,11 +508,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader) error {
 
 	// Reporting reader error states
 	reader_status := make(chan error)
+	defer close(reader_status)
 
 	// Start the transfer goroutine
 	go Transfer(buffer, r, requests, reader_status)
 
-	return this.putReplicas(hash, requests, reader_status)
+	return this.putReplicas(hash, requests, reader_status, expectedLength)
 }
 
 func (this KeepClient) PutHB(hash string, buffer []byte) error {
@@ -505,7 +524,7 @@ func (this KeepClient) PutHB(hash string, buffer []byte) error {
 	// Start the transfer goroutine
 	go Transfer(buffer, nil, requests, nil)
 
-	return this.putReplicas(hash, requests, nil)
+	return this.putReplicas(hash, requests, nil, int64(len(buffer)))
 }
 
 func (this KeepClient) PutB(buffer []byte) error {
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index c87b87e..f8144f1 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -1,6 +1,7 @@
 package keepclient
 
 import (
+	"crypto/md5"
 	"flag"
 	"fmt"
 	. "gopkg.in/check.v1"
@@ -11,6 +12,7 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
+	"sort"
 	"testing"
 	"time"
 )
@@ -357,12 +359,84 @@ func (s *StandaloneSuite) TestTransferShortBuffer(c *C) {
 	c.Check(status, Equals, io.ErrShortBuffer)
 }
 
+func (s *StandaloneSuite) TestTransferFromBuffer(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	in := make([]byte, 64)
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 64)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 64; i += 1 {
+			c.Check(in[i], Equals, buffer[i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 36)
+		c.Check(err, Equals, nil)
+
+		for i := 0; i < 36; i += 1 {
+			c.Check(in[i], Equals, buffer[64+i])
+		}
+	}
+	{
+		n, err := br1.Read(in)
+
+		c.Check(n, Equals, 0)
+		c.Check(err, Equals, io.EOF)
+	}
+}
+
+func (s *StandaloneSuite) TestTransferIoCopy(c *C) {
+	// Buffer for reads from 'r'
+	buffer := make([]byte, 100)
+	for i := 0; i < 100; i += 1 {
+		buffer[i] = byte(i)
+	}
+
+	// Read requests on Transfer() buffer
+	requests := make(chan ReadRequest)
+	defer close(requests)
+
+	go Transfer(buffer, nil, requests, nil)
+
+	br1 := MakeBufferReader(requests)
+
+	reader, writer := io.Pipe()
+
+	go func() {
+		p := make([]byte, 100)
+		n, err := reader.Read(p)
+		c.Check(n, Equals, 100)
+		c.Check(err, Equals, nil)
+		c.Check(p, DeepEquals, buffer)
+	}()
+
+	io.Copy(writer, br1)
+}
+
 type StubHandler struct {
 	c              *C
 	expectPath     string
 	expectApiToken string
 	expectBody     string
-	handled        chan bool
+	handled        chan string
 }
 
 func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -372,132 +446,314 @@ func (this StubHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	this.c.Check(err, Equals, nil)
 	this.c.Check(body, DeepEquals, []byte(this.expectBody))
 	resp.WriteHeader(200)
-	this.handled <- true
+	this.handled <- fmt.Sprintf("http://%s", req.Host)
 }
 
-func UploadToStubHelper(c *C, f func(*KeepClient, string, StubHandler,
-	io.ReadCloser, io.WriteCloser, chan UploadError)) {
-
-	st := StubHandler{
-		c,
-		"acbd18db4cc2f85cedef654fccc4a4d8",
-		"abc123",
-		"foo",
-		make(chan bool)}
+func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url string) {
 	server := http.Server{Handler: st}
 
-	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
-	defer listener.Close()
+	var err error
+	listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: port})
+	if err != nil {
+		panic(fmt.Sprintf("Could not listen on tcp port %v", port))
+	}
 
-	url := fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
+	url = fmt.Sprintf("http://localhost:%d", listener.Addr().(*net.TCPAddr).Port)
 
 	go server.Serve(listener)
+	return listener, url
+}
+
+func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
+	io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
+
+	listener, url := RunBogusKeepServer(st, 2990)
+	defer listener.Close()
+
 	kc, _ := MakeKeepClient()
 	kc.ApiToken = "abc123"
 
 	reader, writer := io.Pipe()
-	upload_status := make(chan UploadError)
+	upload_status := make(chan UploadStatus)
 
-	f(kc, url, st, reader, writer, upload_status)
+	f(kc, url, reader, writer, upload_status)
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
-	log.Printf("Started TestUploadToStubKeepServer")
+	st := StubHandler{
+		c,
+		"acbd18db4cc2f85cedef654fccc4a4d8",
+		"abc123",
+		"foo",
+		make(chan string)}
 
-	UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
-		reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+	UploadToStubHelper(c, st,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
+			writer io.WriteCloser, upload_status chan UploadStatus) {
 
-		go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status)
+			go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
 
-		writer.Write([]byte("foo"))
-		writer.Close()
+			writer.Write([]byte("foo"))
+			writer.Close()
 
-		<-st.handled
-		status := <-upload_status
-		c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
-	})
+			<-st.handled
+			status := <-upload_status
+			c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+		})
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
-	log.Printf("Started TestUploadToStubKeepServerBufferReader")
+	st := StubHandler{
+		c,
+		"acbd18db4cc2f85cedef654fccc4a4d8",
+		"abc123",
+		"foo",
+		make(chan string)}
 
-	UploadToStubHelper(c, func(kc *KeepClient, url string, st StubHandler,
-		reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadError) {
+	UploadToStubHelper(c, st,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
+			writer io.WriteCloser, upload_status chan UploadStatus) {
 
-		// Buffer for reads from 'r'
-		buffer := make([]byte, 512)
+			// Buffer for reads from 'r'
+			buffer := make([]byte, 512)
 
-		// Read requests on Transfer() buffer
-		requests := make(chan ReadRequest)
-		defer close(requests)
+			// Read requests on Transfer() buffer
+			requests := make(chan ReadRequest)
+			defer close(requests)
 
-		// Reporting reader error states
-		reader_status := make(chan error)
+			// Reporting reader error states
+			reader_status := make(chan error)
 
-		go Transfer(buffer, reader, requests, reader_status)
+			go Transfer(buffer, reader, requests, reader_status)
 
-		br1 := MakeBufferReader(requests)
+			br1 := MakeBufferReader(requests)
 
-		go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status)
+			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
 
-		writer.Write([]byte("foo"))
-		writer.Close()
+			writer.Write([]byte("foo"))
+			writer.Close()
 
-		<-reader_status
-		<-st.handled
+			<-reader_status
+			<-st.handled
 
-		status := <-upload_status
-		c.Check(status, DeepEquals, UploadError{io.EOF, fmt.Sprintf("%s/%s", url, st.expectPath)})
+			status := <-upload_status
+			c.Check(status, DeepEquals, UploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
 
-		//c.Check(true, Equals, false)
-	})
+			//c.Check(true, Equals, false)
+		})
 }
 
 type FailHandler struct {
-	handled chan bool
+	handled chan string
 }
 
 func (this FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	resp.WriteHeader(400)
-	this.handled <- true
+	this.handled <- fmt.Sprintf("http://%s", req.Host)
 }
 
-/*func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
-	log.Printf("blup")
+func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
+	st := FailHandler{
+		make(chan string)}
 
-	c.Check(true, Equals, false)
+	hash := "acbd18db4cc2f85cedef654fccc4a4d8"
 
-	log.Printf("blug")
+	UploadToStubHelper(c, st,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
+			writer io.WriteCloser, upload_status chan UploadStatus) {
 
-	st := FailHandler{make(chan bool)}
-	server := http.Server{Handler: st}
+			go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
 
-	listener, _ := net.ListenTCP("tcp", &net.TCPAddr{})
-	defer listener.Close()
+			writer.Write([]byte("foo"))
+			writer.Close()
+
+			<-st.handled
+
+			status := <-upload_status
+			c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
+			c.Check(status.StatusCode, Equals, 400)
+		})
+
+}
+
+type KeepServer struct {
+	listener net.Listener
+	url      string
+}
+
+func RunSomeFakeKeepServers(st http.Handler, n int, port int) (ks []KeepServer) {
+	ks = make([]KeepServer, n)
+
+	for i := 0; i < n; i += 1 {
+		boguslistener, bogusurl := RunBogusKeepServer(st, port+i)
+		ks[i] = KeepServer{boguslistener, bogusurl}
+	}
+
+	return ks
+}
+
+func (s *StandaloneSuite) TestPutB(c *C) {
+	log.Printf("TestPutB")
+
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := StubHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 2)}
+
+	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 2
+	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 5)
+
+	ks := RunSomeFakeKeepServers(st, 5, 2990)
+
+	for i := 0; i < len(ks); i += 1 {
+		kc.Service_roots[i] = ks[i].url
+		defer ks[i].listener.Close()
+	}
+
+	sort.Strings(kc.Service_roots)
+
+	kc.PutB([]byte("foo"))
+
+	shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+
+	c.Check(<-st.handled, Equals, shuff[0])
+	c.Check(<-st.handled, Equals, shuff[1])
+}
+
+func (s *StandaloneSuite) TestPutHR(c *C) {
+	log.Printf("TestPutHR")
+
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := StubHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 2)}
 
-	go server.Serve(listener)
 	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 2
 	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 5)
+
+	ks := RunSomeFakeKeepServers(st, 5, 2990)
+
+	for i := 0; i < len(ks); i += 1 {
+		kc.Service_roots[i] = ks[i].url
+		defer ks[i].listener.Close()
+	}
+
+	sort.Strings(kc.Service_roots)
 
 	reader, writer := io.Pipe()
-	upload_status := make(chan UploadError)
 
-	go kc.uploadToKeepServer(fmt.Sprintf("http://localhost:%s", listener.Addr().String()), "acbd18db4cc2f85cedef654fccc4a4d8", reader, upload_status)
+	go func() {
+		writer.Write([]byte("foo"))
+		writer.Close()
+	}()
 
-	log.Printf("Writing 1")
+	kc.PutHR(hash, reader, 3)
 
-	writer.Write([]byte("foo"))
+	shuff := kc.ShuffledServiceRoots(hash)
 
-	log.Printf("Writing 2")
+	c.Check(<-st.handled, Equals, shuff[0])
+	c.Check(<-st.handled, Equals, shuff[1])
+}
 
-	writer.Close()
+func (s *StandaloneSuite) TestPutWithFail(c *C) {
+	log.Printf("TestPutWithFail")
 
-	log.Printf("Writing 3")
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-	<-st.handled
+	st := StubHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 2)}
+
+	fh := FailHandler{
+		make(chan string, 1)}
 
-	log.Printf("Handled?!")
+	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 2
+	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 5)
+
+	ks1 := RunSomeFakeKeepServers(st, 4, 2990)
+	ks2 := RunSomeFakeKeepServers(fh, 1, 2995)
+
+	for i, k := range ks1 {
+		kc.Service_roots[i] = k.url
+		defer k.listener.Close()
+	}
+	for i, k := range ks2 {
+		kc.Service_roots[len(ks1)+i] = k.url
+		defer k.listener.Close()
+	}
+
+	sort.Strings(kc.Service_roots)
+
+	shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+
+	err := kc.PutB([]byte("foo"))
+
+	<-fh.handled
+
+	c.Check(err, Equals, nil)
+	c.Check(<-st.handled, Equals, shuff[1])
+	c.Check(<-st.handled, Equals, shuff[2])
+}
 
-	status := <-upload_status
-	c.Check(status, DeepEquals, UploadError{io.EOF, "http://localhost:2999/acbd18db4cc2f85cedef654fccc4a4d8"})
-}*/
+func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
+	log.Printf("TestPutWithTooManyFail")
+
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := StubHandler{
+		c,
+		hash,
+		"abc123",
+		"foo",
+		make(chan string, 1)}
+
+	fh := FailHandler{
+		make(chan string, 4)}
+
+	kc, _ := MakeKeepClient()
+
+	kc.Want_replicas = 2
+	kc.ApiToken = "abc123"
+	kc.Service_roots = make([]string, 5)
+
+	ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+	ks2 := RunSomeFakeKeepServers(fh, 4, 2991)
+
+	for i, k := range ks1 {
+		kc.Service_roots[i] = k.url
+		defer k.listener.Close()
+	}
+	for i, k := range ks2 {
+		kc.Service_roots[len(ks1)+i] = k.url
+		defer k.listener.Close()
+	}
+
+	sort.Strings(kc.Service_roots)
+
+	shuff := kc.ShuffledServiceRoots(fmt.Sprintf("%x", md5.Sum([]byte("foo"))))
+
+	err := kc.PutB([]byte("foo"))
+
+	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(<-st.handled, Equals, shuff[1])
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list