[arvados] created: 2.7.0-6017-g39cb7e209f

git repository hosting git at public.arvados.org
Wed Feb 14 19:28:45 UTC 2024


        at  39cb7e209f415766650a67cbe377e3054ec02453 (commit)


commit 39cb7e209f415766650a67cbe377e3054ec02453
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 14 14:28:27 2024 -0500

    21023: Add exponential-backoff delay between keepclient retries.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2bd7996b59..e7d6d24632 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"net"
 	"net/http"
 	"os"
@@ -44,6 +45,8 @@ var (
 	DefaultProxyTLSHandshakeTimeout = 10 * time.Second
 	DefaultProxyKeepAlive           = 120 * time.Second
 
+	DefaultRetryDelay = 2 * time.Second
+
 	rootCacheDir = "/var/cache/arvados/keep"
 	userCacheDir = ".cache/arvados/keep" // relative to HOME
 )
@@ -113,6 +116,7 @@ type KeepClient struct {
 	lock                  sync.RWMutex
 	HTTPClient            HTTPClient
 	Retries               int
+	RetryDelay            time.Duration // Delay after first attempt (increases on each attempt); if 0, use DefaultRetryDelay
 	RequestID             string
 	StorageClasses        []string
 	DefaultStorageClasses []string                  // Set by cluster's exported config
@@ -141,6 +145,7 @@ func (kc *KeepClient) Clone() *KeepClient {
 		gatewayRoots:          kc.gatewayRoots,
 		HTTPClient:            kc.HTTPClient,
 		Retries:               kc.Retries,
+		RetryDelay:            kc.RetryDelay,
 		RequestID:             kc.RequestID,
 		StorageClasses:        kc.StorageClasses,
 		DefaultStorageClasses: kc.DefaultStorageClasses,
@@ -190,6 +195,7 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
 		Arvados:       arv,
 		Want_replicas: defaultReplicationLevel,
 		Retries:       2,
+		RetryDelay:    DefaultRetryDelay,
 	}
 	err = kc.loadDefaultClasses()
 	if err != nil {
@@ -269,6 +275,10 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 
 	var errs []string
 
+	retryDelay := kc.RetryDelay
+	if retryDelay < 1 {
+		retryDelay = DefaultRetryDelay
+	}
 	triesRemaining := 1 + kc.Retries
 
 	serversToTry := kc.getSortedRoots(locator)
@@ -348,6 +358,12 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 			return nil, expectLength, url, resp.Header, nil
 		}
 		serversToTry = retryList
+		if len(serversToTry) > 0 && triesRemaining > 0 {
+			time.Sleep(retryDelay)
+			// Increase delay by a random factor between
+			// 1.75x and 2x
+			retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+		}
 	}
 	DebugPrintf("DEBUG: %s %s failed: %v", method, locator, errs)
 
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 94591bd064..19a6d4e03b 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -17,6 +17,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -26,8 +27,8 @@ import (
 	. "gopkg.in/check.v1"
 )
 
-// Gocheck boilerplate
 func Test(t *testing.T) {
+	DefaultRetryDelay = 50 * time.Millisecond
 	TestingT(t)
 }
 
@@ -421,17 +422,17 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 }
 
 type FailThenSucceedHandler struct {
+	morefails      int // fail 1 + this many times before succeeding
 	handled        chan string
-	count          int
+	count          atomic.Int64
 	successhandler http.Handler
 	reqIDs         []string
 }
 
 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
-	if fh.count == 0 {
+	if int(fh.count.Add(1)) <= fh.morefails+1 {
 		resp.WriteHeader(500)
-		fh.count++
 		fh.handled <- fmt.Sprintf("http://%s", req.Host)
 	} else {
 		fh.successhandler.ServeHTTP(resp, req)
@@ -560,14 +561,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
 	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
 
-	reader, writer := io.Pipe()
-
-	go func() {
-		writer.Write([]byte("foo"))
-		writer.Close()
-	}()
-
-	kc.PutHR(hash, reader, 3)
+	kc.PutHR(hash, bytes.NewBuffer([]byte("foo")), 3)
 
 	shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
 
@@ -804,40 +798,64 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 }
 
 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+	defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+	DefaultRetryDelay = time.Second / 8
+
 	hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
 
-	st := &FailThenSucceedHandler{
-		handled: make(chan string, 1),
-		successhandler: StubGetHandler{
-			c,
-			hash,
-			"abc123",
-			http.StatusOK,
-			[]byte("foo")}}
+	for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
+		c.Logf("=== initial delay %v", delay)
 
-	ks := RunFakeKeepServer(st)
-	defer ks.listener.Close()
+		st := &FailThenSucceedHandler{
+			morefails: 2,
+			handled:   make(chan string, 4),
+			successhandler: StubGetHandler{
+				c,
+				hash,
+				"abc123",
+				http.StatusOK,
+				[]byte("foo")}}
 
-	arv, err := arvadosclient.MakeArvadosClient()
-	c.Check(err, IsNil)
-	kc, _ := MakeKeepClient(arv)
-	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+		ks := RunFakeKeepServer(st)
+		defer ks.listener.Close()
 
-	r, n, _, err := kc.Get(hash)
-	c.Assert(err, IsNil)
-	c.Check(n, Equals, int64(3))
+		arv, err := arvadosclient.MakeArvadosClient()
+		c.Check(err, IsNil)
+		kc, _ := MakeKeepClient(arv)
+		arv.ApiToken = "abc123"
+		kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+		kc.Retries = 3
+		kc.RetryDelay = delay
+		kc.DiskCacheSize = DiskCacheDisabled
 
-	content, err := ioutil.ReadAll(r)
-	c.Check(err, IsNil)
-	c.Check(content, DeepEquals, []byte("foo"))
-	c.Check(r.Close(), IsNil)
+		t0 := time.Now()
+		r, n, _, err := kc.Get(hash)
+		c.Assert(err, IsNil)
+		c.Check(n, Equals, int64(3))
+		elapsed := time.Since(t0)
 
-	c.Logf("%q", st.reqIDs)
-	c.Assert(len(st.reqIDs) > 1, Equals, true)
-	for _, reqid := range st.reqIDs {
-		c.Check(reqid, Not(Equals), "")
-		c.Check(reqid, Equals, st.reqIDs[0])
+		nonsleeptime := time.Second / 10
+		expect := kc.RetryDelay
+		if expect == 0 {
+			expect = DefaultRetryDelay
+		}
+		min := expect + expect*7/4 + expect*7/4*7/4
+		max := expect + expect*2 + expect*2*2 + nonsleeptime
+		c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+		c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+
+		content, err := ioutil.ReadAll(r)
+		c.Check(err, IsNil)
+		c.Check(content, DeepEquals, []byte("foo"))
+		c.Check(r.Close(), IsNil)
+
+		c.Logf("%q", st.reqIDs)
+		if c.Check(st.reqIDs, Not(HasLen), 0) {
+			for _, reqid := range st.reqIDs {
+				c.Check(reqid, Not(Equals), "")
+				c.Check(reqid, Equals, st.reqIDs[0])
+			}
+		}
 	}
 }
 
@@ -1484,42 +1502,65 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
 }
 
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
-	st := &FailThenSucceedHandler{
-		handled: make(chan string, 1),
-		successhandler: &StubPutHandler{
-			c:                    c,
-			expectPath:           Md5String("foo"),
-			expectAPIToken:       "abc123",
-			expectBody:           "foo",
-			expectStorageClass:   "default",
-			returnStorageClasses: "",
-			handled:              make(chan string, 5),
-		},
-	}
+	defer func(orig time.Duration) { DefaultRetryDelay = orig }(DefaultRetryDelay)
+	DefaultRetryDelay = time.Second / 8
+
+	for _, delay := range []time.Duration{0, time.Nanosecond, time.Second / 8, time.Second / 16} {
+		c.Logf("=== initial delay %v", delay)
+
+		st := &FailThenSucceedHandler{
+			morefails: 5, // handler will fail 6x in total, 3 for each server
+			handled:   make(chan string, 10),
+			successhandler: &StubPutHandler{
+				c:                    c,
+				expectPath:           Md5String("foo"),
+				expectAPIToken:       "abc123",
+				expectBody:           "foo",
+				expectStorageClass:   "default",
+				returnStorageClasses: "",
+				handled:              make(chan string, 5),
+			},
+		}
 
-	arv, _ := arvadosclient.MakeArvadosClient()
-	kc, _ := MakeKeepClient(arv)
+		arv, _ := arvadosclient.MakeArvadosClient()
+		kc, _ := MakeKeepClient(arv)
+		kc.Retries = 3
+		kc.RetryDelay = delay
+		kc.DiskCacheSize = DiskCacheDisabled
+		kc.Want_replicas = 2
 
-	kc.Want_replicas = 2
-	arv.ApiToken = "abc123"
-	localRoots := make(map[string]string)
-	writableLocalRoots := make(map[string]string)
+		arv.ApiToken = "abc123"
+		localRoots := make(map[string]string)
+		writableLocalRoots := make(map[string]string)
 
-	ks := RunSomeFakeKeepServers(st, 2)
+		ks := RunSomeFakeKeepServers(st, 2)
 
-	for i, k := range ks {
-		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
-		writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
-		defer k.listener.Close()
-	}
+		for i, k := range ks {
+			localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+			defer k.listener.Close()
+		}
 
-	kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+		kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
 
-	hash, replicas, err := kc.PutB([]byte("foo"))
+		t0 := time.Now()
+		hash, replicas, err := kc.PutB([]byte("foo"))
 
-	c.Check(err, IsNil)
-	c.Check(hash, Equals, "")
-	c.Check(replicas, Equals, 2)
+		c.Check(err, IsNil)
+		c.Check(hash, Equals, "")
+		c.Check(replicas, Equals, 2)
+		elapsed := time.Since(t0)
+
+		nonsleeptime := time.Second / 10
+		expect := kc.RetryDelay
+		if expect == 0 {
+			expect = DefaultRetryDelay
+		}
+		min := expect + expect*7/4 + expect*7/4*7/4
+		max := expect + expect*2 + expect*2*2 + nonsleeptime
+		c.Check(elapsed >= min, Equals, true, Commentf("elapsed %v / expect min %v", elapsed, min))
+		c.Check(elapsed <= max, Equals, true, Commentf("elapsed %v / expect max %v", elapsed, max))
+	}
 }
 
 func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 6acaf64baa..459648b9a6 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -13,10 +13,12 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"math/rand"
 	"net/http"
 	"os"
 	"strconv"
 	"strings"
+	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -218,6 +220,10 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
 		replicasPerThread = req.Replicas
 	}
 
+	retryDelay := kc.RetryDelay
+	if retryDelay < 1 {
+		retryDelay = DefaultRetryDelay
+	}
 	retriesRemaining := req.Attempts
 	var retryServers []string
 
@@ -306,14 +312,20 @@ func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWrite
 			}
 
 			if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
-				(status.statusCode >= 500 && status.statusCode != 503) {
+				(status.statusCode >= 500 && status.statusCode != http.StatusInsufficientStorage) {
 				// Timeout, too many requests, or other server side failure
-				// Do not retry when status code is 503, which means the keep server is full
+				// (do not auto-retry status 507 "full")
 				retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
 			}
 		}
 
 		sv = retryServers
+		if len(sv) > 0 {
+			time.Sleep(retryDelay)
+			// Increase delay by a random factor between
+			// 1.75x and 2x
+			retryDelay = time.Duration((2 - rand.Float64()/4) * float64(retryDelay))
+		}
 	}
 
 	return resp, nil
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 39ffd45cbe..8afe20f911 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -320,6 +320,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
 
 	kc := h.makeKeepClient(req)
 	kc.DiskCacheSize = keepclient.DiskCacheDisabled
+	kc.RetryDelay = time.Second
 
 	var pass bool
 	var tok string
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 7efba2348b..473be4d3a6 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -32,8 +32,8 @@ import (
 	. "gopkg.in/check.v1"
 )
 
-// Gocheck boilerplate
 func Test(t *testing.T) {
+	keepclient.DefaultRetryDelay = time.Millisecond
 	TestingT(t)
 }
 
diff --git a/services/keepstore/command.go b/services/keepstore/command.go
index 48c8256a3c..7de9fa833e 100644
--- a/services/keepstore/command.go
+++ b/services/keepstore/command.go
@@ -206,6 +206,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
 		Arvados:       ac,
 		Want_replicas: 1,
 		DiskCacheSize: keepclient.DiskCacheDisabled,
+		Retries:       0,
 	}
 	h.keepClient.Arvados.ApiToken = fmt.Sprintf("%x", rand.Int63())
 

commit c300d42d1b07d74a054be8bb561b3152c537376c
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jan 18 14:04:26 2024 -0500

    21023: Avoid overloading 503 status.
    
    When all volumes are full, return 507 Insufficient Storage (cf. WebDAV)
    to indicate that the client should not auto-retry without additional user interaction.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5bdafb77c2..4352d9a54a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -198,7 +198,7 @@ func (s *HandlerSuite) TestGetHandler(c *check.C) {
 	// should make the service return a 503 so that clients can retry.
 	ExpectStatusCode(c,
 		"Volume backend busy",
-		503, response)
+		http.StatusServiceUnavailable, response)
 }
 
 // Test PutBlockHandler on the following situations:
@@ -1270,7 +1270,7 @@ func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
 		c.Logf("failure case %#v", trial)
 		rt.storageClasses = trial.ask
 		resp := IssueRequest(s.handler, &rt)
-		c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+		c.Check(resp.Code, check.Equals, http.StatusInsufficientStorage)
 	}
 }
 
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index abeb20fe86..089ebb46da 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -853,7 +853,7 @@ func newPutProgress(classes []string) putProgress {
 //
 //	The MD5 hash of the BLOCK does not match the argument HASH.
 //
-// 503 Full
+// 507 Full
 //
 //	There was not enough space left in any Keep volume to store
 //	the object.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 953aa047cb..46b00339b5 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@
 package keepstore
 
 import (
+	"net/http"
 	"time"
 )
 
@@ -23,22 +24,22 @@ type KeepError struct {
 }
 
 var (
-	BadRequestError     = &KeepError{400, "Bad Request"}
-	UnauthorizedError   = &KeepError{401, "Unauthorized"}
-	CollisionError      = &KeepError{500, "Collision"}
-	RequestHashError    = &KeepError{422, "Hash mismatch in request"}
-	PermissionError     = &KeepError{403, "Forbidden"}
-	DiskHashError       = &KeepError{500, "Hash mismatch in stored data"}
-	ExpiredError        = &KeepError{401, "Expired permission signature"}
-	NotFoundError       = &KeepError{404, "Not Found"}
-	VolumeBusyError     = &KeepError{503, "Volume backend busy"}
-	GenericError        = &KeepError{500, "Fail"}
-	FullError           = &KeepError{503, "Full"}
-	SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
-	TooLongError        = &KeepError{413, "Block is too large"}
-	MethodDisabledError = &KeepError{405, "Method disabled"}
-	ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
-	ErrClientDisconnect = &KeepError{503, "Client disconnected"}
+	BadRequestError     = &KeepError{http.StatusBadRequest, "Bad Request"}
+	UnauthorizedError   = &KeepError{http.StatusUnauthorized, "Unauthorized"}
+	CollisionError      = &KeepError{http.StatusInternalServerError, "Collision"}
+	RequestHashError    = &KeepError{http.StatusUnprocessableEntity, "Hash mismatch in request"}
+	PermissionError     = &KeepError{http.StatusForbidden, "Forbidden"}
+	DiskHashError       = &KeepError{http.StatusInternalServerError, "Hash mismatch in stored data"}
+	ExpiredError        = &KeepError{http.StatusUnauthorized, "Expired permission signature"}
+	NotFoundError       = &KeepError{http.StatusNotFound, "Not Found"}
+	VolumeBusyError     = &KeepError{http.StatusServiceUnavailable, "Volume backend busy"}
+	GenericError        = &KeepError{http.StatusInternalServerError, "Fail"}
+	FullError           = &KeepError{http.StatusInsufficientStorage, "Full"}
+	SizeRequiredError   = &KeepError{http.StatusLengthRequired, "Missing Content-Length"}
+	TooLongError        = &KeepError{http.StatusRequestEntityTooLarge, "Block is too large"}
+	MethodDisabledError = &KeepError{http.StatusMethodNotAllowed, "Method disabled"}
+	ErrNotImplemented   = &KeepError{http.StatusInternalServerError, "Unsupported configuration"}
+	ErrClientDisconnect = &KeepError{499, "Client disconnected"} // non-RFC Nginx status code
 )
 
 func (e *KeepError) Error() string {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list