[arvados] created: 2.7.0-6017-g0473be4a7f
git repository hosting
git at public.arvados.org
Wed Feb 14 16:54:53 UTC 2024
at 0473be4a7fc9dee99124b268789fa8d54fdc3ac0 (commit)
commit 0473be4a7fc9dee99124b268789fa8d54fdc3ac0
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 14 11:54:30 2024 -0500
21023: Add exponential-backoff delay between keepclient retries.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2bd7996b59..e7d6d24632 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "math/rand"
"net"
"net/http"
"os"
@@ -44,6 +45,8 @@ var (
DefaultProxyTLSHandshakeTimeout = 10 * time.Second
DefaultProxyKeepAlive = 120 * time.Second
+ DefaultRetryDelay = 2 * time.Second
+
rootCacheDir = "/var/cache/arvados/keep"
userCacheDir = ".cache/arvados/keep" // relative to HOME
)
@@ -113,6 +116,7 @@ type KeepClient struct {
lock sync.RWMutex
HTTPClient HTTPClient
Retries int
+ RetryDelay time.Duration // Delay after first attempt (increases on each attempt); if 0, use DefaultRetryDelay
RequestID string
StorageClasses []string
DefaultStorageClasses []string // Set by cluster's exported config
@@ -141,6 +145,7 @@ func (kc *KeepClient) Clone() *KeepClient {
gatewayRoots: kc.gatewayRoots,
HTTPClient: kc.HTTPClient,
Retries: kc.Retries,
+ RetryDelay: kc.RetryDelay,
RequestID: kc.RequestID,
StorageClasses: kc.StorageClasses,
DefaultStorageClasses: kc.DefaultStorageClasses,
@@ -190,6 +195,7 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
Arvados: arv,
Want_replicas: defaultReplicationLevel,
Retries: 2,
+ RetryDelay: DefaultRetryDelay,
}
err = kc.loadDefaultClasses()
if err != nil {
@@ -269,6 +275,10 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
var errs []string
+ retryDelay := kc.RetryDelay
+ if retryDelay < 1 {
+ retryDelay = DefaultRetryDelay
+ }
triesRemaining := 1 + kc.Retries
serversToTry := kc.getSortedRoots(locator)
@@ -348,6 +358,12 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
return nil, expectLength, url, resp.Header, nil
}
serversToTry = retryList
+ if len(serversToTry) > 0 && triesRemaining > 0 {
+ time.Sleep(retryDelay)
+ // Increase delay by a random factor between
+ // 1.75x and 2x
+ retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+ }
}
DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 94591bd064..d9150e2c53 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -17,6 +17,7 @@ import (
"os"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -26,8 +27,8 @@ import (
. "gopkg.in/check.v1"
)
-// Gocheck boilerplate
func Test(t *testing.T) {
+ DefaultRetryDelay = 50 * time.Millisecond
TestingT(t)
}
@@ -421,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
type FailThenSucceedHandler struct {
+ morefails int // fail 1 + this many times before succeeding
handled chan string
- count int
+ count atomic.Int64
successhandler http.Handler
reqIDs []string
}
func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
- if fh.count == 0 {
+ if int(fh.count.Add(1)) <= fh.morefails+1 {
resp.WriteHeader(500)
- fh.count++
fh.handled <- fmt.Sprintf("http://%s", req.Host)
} else {
fh.successhandler.ServeHTTP(resp, req)
@@ -560,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- reader, writer := io.Pipe()
-
- go func() {
- writer.Write([]byte("foo"))
- writer.Close()
- }()
-
- kc.PutHR(hash, reader, 3)
+ kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
@@ -804,40 +798,64 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
}
func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+ defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+ DefaultRetryDelay = time.Second / 8
+
hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
- st := &FailThenSucceedHandler{
- handled: make(chan string, 1),
- successhandler: StubGetHandler{
- c,
- hash,
- "abc123",
- http.StatusOK,
- []byte("foo")}}
+ for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
+ c.Logf("=== initial delay %v", delay)
- ks := RunFakeKeepServer(st)
- defer ks.listener.Close()
+ st := &FailThenSucceedHandler{
+ morefails: 2,
+ handled: make(chan string, 4),
+ successhandler: StubGetHandler{
+ c,
+ hash,
+ "abc123",
+ http.StatusOK,
+ []byte("foo")}}
- arv, err := arvadosclient.MakeArvadosClient()
- c.Check(err, IsNil)
- kc, _ := MakeKeepClient(arv)
- arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
- r, n, _, err := kc.Get(hash)
- c.Assert(err, IsNil)
- c.Check(n, Equals, int64(3))
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Check(err, IsNil)
+ kc, _ := MakeKeepClient(arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ kc.Retries = 3
+ kc.RetryDelay = delay
+ kc.DiskCacheSize = DiskCacheDisabled
- content, err := ioutil.ReadAll(r)
- c.Check(err, IsNil)
- c.Check(content, DeepEquals, []byte("foo"))
- c.Check(r.Close(), IsNil)
+ t0 := time.Now()
+ r, n, _, err := kc.Get(hash)
+ c.Assert(err, IsNil)
+ c.Check(n, Equals, int64(3))
+ elapsed := time.Since(t0)
+
+ nonsleeptime := time.Second / 10
+ expect := kc.RetryDelay
+ if expect == 0 {
+ expect = DefaultRetryDelay
+ }
+ min := expect + expect*7/4 + expect*7/4*7/4
+ max := expect + expect*2 + expect*2*2 + nonsleeptime
+ c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+ c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
- c.Logf("%q", st.reqIDs)
- c.Assert(len(st.reqIDs) > 1, Equals, true)
- for _, reqid := range st.reqIDs {
- c.Check(reqid, Not(Equals), "")
- c.Check(reqid, Equals, st.reqIDs[0])
+ content, err := ioutil.ReadAll(r)
+ c.Check(err, IsNil)
+ c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
+
+ c.Logf("%q", st.reqIDs)
+ if c.Check(st.reqIDs, Not(HasLen), 0) {
+ for _, reqid := range st.reqIDs {
+ c.Check(reqid, Not(Equals), "")
+ c.Check(reqid, Equals, st.reqIDs[0])
+ }
+ }
}
}
@@ -1484,6 +1502,9 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
}
func (s *StandaloneSuite) TestPutBRetry(c *C) {
+ defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+ DefaultRetryDelay = time.Second
+
st := &FailThenSucceedHandler{
handled: make(chan string, 1),
successhandler: &StubPutHandler{
@@ -1515,11 +1536,14 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+ t0 := time.Now()
hash, replicas, err := kc.PutB([]byte("foo"))
c.Check(err, IsNil)
c.Check(hash, Equals, "")
c.Check(replicas, Equals, 2)
+ elapsed := time.Since(t0)
+ c.Check(elapsed > DefaultRetryDelay, Equals, true, Commentf("elapsed %v <= DefaultRetryDelay %v", elapsed, DefaultRetryDelay))
}
func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 6acaf64baa..9dc57267c0 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -17,6 +17,7 @@ import (
"os"
"strconv"
"strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -218,6 +219,10 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
replicasPerThread = req.Replicas
}
+ retryDelay := kc.RetryDelay
+ if retryDelay < 1 {
+ retryDelay = DefaultRetryDelay
+ }
retriesRemaining := req.Attempts
var retryServers []string
@@ -306,14 +311,18 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
}
if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
- (status.statusCode >= 500 && status.statusCode != 503) {
+ (status.statusCode >= 500 && status.statusCode != http.StatusInsufficientStorage) {
// Timeout, too many requests, or other server side failure
- // Do not retry when status code is 503, which means the keep server is full
+ // (do not auto-retry status 507 "full")
retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
}
sv = retryServers
+ if len(sv) > 0 {
+ time.Sleep(retryDelay)
+ retryDelay = 2 * retryDelay
+ }
}
return resp, nil
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 39ffd45cbe..8afe20f911 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -320,6 +320,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
kc := h.makeKeepClient(req)
kc.DiskCacheSize = keepclient.DiskCacheDisabled
+ kc.RetryDelay = time.Second
var pass bool
var tok string
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 7efba2348b..473be4d3a6 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -32,8 +32,8 @@ import (
. "gopkg.in/check.v1"
)
-// Gocheck boilerplate
func Test(t *testing.T) {
+ keepclient.DefaultRetryDelay = time.Millisecond
TestingT(t)
}
diff --git a/services/keepstore/command.go b/services/keepstore/command.go
index 48c8256a3c..7de9fa833e 100644
--- a/services/keepstore/command.go
+++ b/services/keepstore/command.go
@@ -206,6 +206,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
Arvados: ac,
Want_replicas: 1,
DiskCacheSize: keepclient.DiskCacheDisabled,
+ Retries: 0,
}
h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())
commit c300d42d1b07d74a054be8bb561b3152c537376c
Author: Tom Clegg <tom at curii.com>
Date: Thu Jan 18 14:04:26 2024 -0500
21023: Avoid overloading 503 status.
When all volumes are full, return 507 Insufficient Storage (cf. WebDAV),
indicating that the client should not auto-retry without additional user interaction.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5bdafb77c2..4352d9a54a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -198,7 +198,7 @@ func (s *HandlerSuite) TestGetHandler(c *check.C) {
// should make the service return a 503 so that clients can retry.
ExpectStatusCode(c,
"Volume backend busy",
- 503, response)
+ http.StatusServiceUnavailable, response)
}
// Test PutBlockHandler on the following situations:
@@ -1270,7 +1270,7 @@ func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
c.Logf("failure case %#v", trial)
rt.storageClasses = trial.ask
resp := IssueRequest(s.handler, &rt)
- c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ c.Check(resp.Code, check.Equals, http.StatusInsufficientStorage)
}
}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index abeb20fe86..089ebb46da 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -853,7 +853,7 @@ func newPutProgress(classes []string) putProgress {
//
// The MD5 hash of the BLOCK does not match the argument HASH.
//
-// 503 Full
+// 507 Full
//
// There was not enough space left in any Keep volume to store
// the object.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 953aa047cb..46b00339b5 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@
package keepstore
import (
+ "net/http"
"time"
)
@@ -23,22 +24,22 @@ type KeepError struct {
}
var (
- BadRequestError = &KeepError{400, "Bad Request"}
- UnauthorizedError = &KeepError{401, "Unauthorized"}
- CollisionError = &KeepError{500, "Collision"}
- RequestHashError = &KeepError{422, "Hash mismatch in request"}
- PermissionError = &KeepError{403, "Forbidden"}
- DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
- ExpiredError = &KeepError{401, "Expired permission signature"}
- NotFoundError = &KeepError{404, "Not Found"}
- VolumeBusyError = &KeepError{503, "Volume backend busy"}
- GenericError = &KeepError{500, "Fail"}
- FullError = &KeepError{503, "Full"}
- SizeRequiredError = &KeepError{411, "Missing Content-Length"}
- TooLongError = &KeepError{413, "Block is too large"}
- MethodDisabledError = &KeepError{405, "Method disabled"}
- ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
- ErrClientDisconnect = &KeepError{503, "Client disconnected"}
+ BadRequestError = &KeepError{http.StatusBadRequest, "Bad Request"}
+ UnauthorizedError = &KeepError{http.StatusUnauthorized, "Unauthorized"}
+ CollisionError = &KeepError{http.StatusInternalServerError, "Collision"}
+ RequestHashError = &KeepError{http.StatusUnprocessableEntity, "Hash mismatch in request"}
+ PermissionError = &KeepError{http.StatusForbidden, "Forbidden"}
+ DiskHashError = &KeepError{http.StatusInternalServerError, "Hash mismatch in stored data"}
+ ExpiredError = &KeepError{http.StatusUnauthorized, "Expired permission signature"}
+ NotFoundError = &KeepError{http.StatusNotFound, "Not Found"}
+ VolumeBusyError = &KeepError{http.StatusServiceUnavailable, "Volume backend busy"}
+ GenericError = &KeepError{http.StatusInternalServerError, "Fail"}
+ FullError = &KeepError{http.StatusInsufficientStorage, "Full"}
+ SizeRequiredError = &KeepError{http.StatusLengthRequired, "Missing Content-Length"}
+ TooLongError = &KeepError{http.StatusRequestEntityTooLarge, "Block is too large"}
+ MethodDisabledError = &KeepError{http.StatusMethodNotAllowed, "Method disabled"}
+ ErrNotImplemented = &KeepError{http.StatusInternalServerError, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{499, "Client disconnected"} // non-RFC Nginx status code
)
func (e *KeepError) Error() string {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list