[ARVADOS] created: 08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2
git at public.curoverse.com
git at public.curoverse.com
Sun Oct 18 23:22:49 EDT 2015
at 08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2 (commit)
commit 08a15a1e6b8e6f2e44f18328c1f6dd25343cffc2
Author: radhika <radhika at curoverse.com>
Date: Sun Oct 18 23:21:58 2015 -0400
7546: enhance putReplicas method to retry.
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index b5bc5ce..6e7f331 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -1186,3 +1186,53 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
c.Check(err2, Equals, nil)
c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}
+
+type FailThenSucceedPutHandler struct {
+ handled chan string
+ count int
+ successhandler StubPutHandler
+}
+
+func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if h.count == 0 {
+ resp.WriteHeader(500)
+ h.count += 1
+ h.handled <- fmt.Sprintf("http://%s", req.Host)
+ } else {
+ h.successhandler.ServeHTTP(resp, req)
+ }
+}
+
+func (s *StandaloneSuite) TestPutBRetry(c *C) {
+ st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
+ StubPutHandler{
+ c,
+ Md5String("foo"),
+ "abc123",
+ "foo",
+ make(chan string, 5)}}
+
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+
+ kc.Want_replicas = 2
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
+
+ ks := RunSomeFakeKeepServers(st, 2)
+
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
+ }
+
+ kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+ hash, replicas, err := kc.PutB([]byte("foo"))
+
+ c.Check(err, Equals, nil)
+ c.Check(hash, Equals, "")
+ c.Check(replicas, Equals, 2)
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 0791d3c..47d3e3a 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -269,34 +269,51 @@ func (this KeepClient) putReplicas(
replicasPerThread = remaining_replicas
}
- for remaining_replicas > 0 {
- for active*replicasPerThread < remaining_replicas {
- // Start some upload requests
- if next_server < len(sv) {
- log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
- next_server += 1
- active += 1
- } else {
- if active == 0 {
- return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ retriesRemaining := 1 + this.Retries
+ var retryServers []string
+
+ for retriesRemaining > 0 {
+ retriesRemaining -= 1
+ next_server = 0
+ retryServers = []string{}
+ for remaining_replicas > 0 {
+ for active*replicasPerThread < remaining_replicas {
+ // Start some upload requests
+ if next_server < len(sv) {
+ log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+ next_server += 1
+ active += 1
} else {
- break
+ if active == 0 && retriesRemaining == 0 {
+ return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ } else {
+ break
+ }
+ }
+ }
+ log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+ requestId, remaining_replicas, active)
+
+ // Now wait for something to happen.
+ if active > 0 {
+ status := <-upload_status
+ active -= 1
+
+ if status.statusCode == 200 {
+ // good news!
+ remaining_replicas -= status.replicas_stored
+ locator = status.response
+ } else if status.statusCode == 408 || status.statusCode == 429 || status.statusCode >= 500 {
+ // Timeout, too many requests, or other server side failure
+ retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
+ } else {
+ break
}
}
- log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
- requestId, remaining_replicas, active)
-
- // Now wait for something to happen.
- status := <-upload_status
- active -= 1
- if status.statusCode == 200 {
- // good news!
- remaining_replicas -= status.replicas_stored
- locator = status.response
- }
+ sv = retryServers
}
return locator, this.Want_replicas, nil
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list