[ARVADOS] created: 08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2

git at public.curoverse.com git at public.curoverse.com
Sun Oct 18 23:22:49 EDT 2015


        at  08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2 (commit)


commit 08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2
Author: radhika <radhika at curoverse.com>
Date:   Sun Oct 18 23:21:58 2015 -0400

    7546: enhance putReplicas method to retry.

diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index b5bc5ce..6e7f331 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -1186,3 +1186,53 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
 	c.Check(err2, Equals, nil)
 	c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
 }
+// FailThenSucceedPutHandler is a fake Keep server handler: requests served while count is 0 get HTTP 500, all later ones go to successhandler.
+type FailThenSucceedPutHandler struct {
+	handled        chan string    // receives "http://<req.Host>" for each request answered with 500
+	count          int            // requests answered with 500 so far; not guarded by a lock
+	successhandler StubPutHandler // serves every request once count is non-zero
+}
+// ServeHTTP answers the first request with HTTP 500 and reports its server URL on h.handled; later requests are passed to h.successhandler.
+func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	if h.count != 0 {
+		h.successhandler.ServeHTTP(resp, req)
+		return
+	}
+	resp.WriteHeader(http.StatusInternalServerError)
+	h.count++
+	h.handled <- fmt.Sprintf("http://%s", req.Host) // NOTE(review): blocking send and unsynchronized count; only safe if requests arrive one at a time
+}
+// TestPutBRetry checks that PutB still reports 2 replicas when the first PUT it sends is answered with HTTP 500.
+func (s *StandaloneSuite) TestPutBRetry(c *C) {
+	st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
+		StubPutHandler{
+			c,
+			Md5String("foo"),
+			"abc123",
+			"foo",
+			make(chan string, 5)}}
+
+	arv, _ := arvadosclient.MakeArvadosClient() // NOTE(review): errors ignored, presumably deliberate in a standalone test — confirm
+	kc, _ := MakeKeepClient(&arv)
+
+	kc.Want_replicas = 2
+	arv.ApiToken = "abc123"
+	localRoots := make(map[string]string)
+	writableLocalRoots := make(map[string]string)
+
+	// Presumably both fake servers are backed by the same st, so only the very first PUT fails.
+	ks := RunSomeFakeKeepServers(st, 2)
+	for i, k := range ks {
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		defer k.listener.Close()
+	}
+
+	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+	hash, replicas, err := kc.PutB([]byte("foo"))
+
+	c.Check(err, Equals, nil)
+	c.Check(hash, Equals, "") // presumably StubPutHandler replies with an empty body, so no locator comes back
+	c.Check(replicas, Equals, 2)
+	c.Check(len(st.handled), Equals, 1) // the failure path really ran: exactly one PUT was answered with 500
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 0791d3c..47d3e3a 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -269,34 +269,51 @@ func (this KeepClient) putReplicas(
 		replicasPerThread = remaining_replicas
 	}
 
-	for remaining_replicas > 0 {
-		for active*replicasPerThread < remaining_replicas {
-			// Start some upload requests
-			if next_server < len(sv) {
-				log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
-				next_server += 1
-				active += 1
-			} else {
-				if active == 0 {
-					return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+	retriesRemaining := 1 + this.Retries
+	var retryServers []string
+
+	for retriesRemaining > 0 {
+		retriesRemaining -= 1
+		next_server = 0
+		retryServers = []string{}
+		for remaining_replicas > 0 {
+			for active*replicasPerThread < remaining_replicas {
+				// Start some upload requests
+				if next_server < len(sv) {
+					log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+					go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+					next_server += 1
+					active += 1
 				} else {
-					break
+					if active == 0 && retriesRemaining == 0 {
+						return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+					} else {
+						break
+					}
+				}
+			}
+			log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+				requestId, remaining_replicas, active)
+
+			// Now wait for something to happen.
+			if active > 0 {
+				status := <-upload_status
+				active -= 1
+
+				if status.statusCode == 200 {
+					// good news!
+					remaining_replicas -= status.replicas_stored
+					locator = status.response
+				} else if status.statusCode == 408 || status.statusCode == 429 || status.statusCode >= 500 {
+					// Timeout, too many requests, or other server side failure
+					retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
 				}
+			} else {
+				break
 			}
 		}
-		log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-			requestId, remaining_replicas, active)
-
-		// Now wait for something to happen.
-		status := <-upload_status
-		active -= 1
 
-		if status.statusCode == 200 {
-			// good news!
-			remaining_replicas -= status.replicas_stored
-			locator = status.response
-		}
+		sv = retryServers
 	}
 
 	return locator, this.Want_replicas, nil

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list