[ARVADOS] created: 1.3.0-2662-g2bc1a7a89
Git user
git at public.arvados.org
Tue Jun 16 14:11:09 UTC 2020
at 2bc1a7a89597ab02aaeef84b82fdc51f8e375b79 (commit)
commit 2bc1a7a89597ab02aaeef84b82fdc51f8e375b79
Author: Tom Clegg <tom at tomclegg.ca>
Date: Tue Jun 16 10:10:53 2020 -0400
16480: Use longer timeout for keepstore index requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/recovercollection/cmd.go b/lib/recovercollection/cmd.go
index cea4607c9..da466c31c 100644
--- a/lib/recovercollection/cmd.go
+++ b/lib/recovercollection/cmd.go
@@ -204,8 +204,8 @@ var errNotFound = errors.New("not found")
// Finds the timestamp of the newest copy of blk on svc. Returns
// errNotFound if blk is not on svc at all.
-func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
- found, err := svc.Index(rcvr.client, blk)
+func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
+ found, err := svc.Index(ctx, rcvr.client, blk)
if err != nil {
logger.WithError(err).Warn("error getting index")
return time.Time{}, err
@@ -236,7 +236,7 @@ var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -
// saved. But if the block's timestamp is more recent than blobsigttl,
// keepstore will refuse to trash it even if told to by keep-balance.
func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
- if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil {
+ if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
return err
} else if latest.Add(blobsigttl).After(blobsigexp) {
return nil
@@ -245,7 +245,7 @@ func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger,
return fmt.Errorf("error updating timestamp: %s", err)
}
logger.Debug("updated timestamp")
- if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound {
+ if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
} else if err != nil {
return err
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 1e2c07e86..562c8c1e7 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -57,9 +57,16 @@ type Client struct {
// HTTP headers to add/override in outgoing requests.
SendHeader http.Header
+ // Timeout for requests. NewClientFromConfig and
+ // NewClientFromEnv return a Client with a default 5-minute
+ // timeout. To disable this timeout and rely on each
+ // http.Request's context deadline instead, set Timeout to
+ // zero.
+ Timeout time.Duration
+
dd *DiscoveryDocument
- ctx context.Context
+ defaultRequestID string
}
// The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
var InsecureHTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true}},
- Timeout: 5 * time.Minute}
+ InsecureSkipVerify: true}}}
// The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
- Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
// NewClientFromConfig creates a new Client that uses the endpoints in
// the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
Scheme: ctrlURL.Scheme,
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
+ Timeout: 5 * time.Minute,
}, nil
}
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
Insecure: insecure,
KeepServiceURIs: svcs,
+ Timeout: 5 * time.Minute,
}
}
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
if req.Header.Get("X-Request-Id") == "" {
- reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
- if reqid == "" {
- reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
- }
- if reqid == "" {
+ var reqid string
+ if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ reqid = ctxreqid
+ } else if c.defaultRequestID != "" {
+ reqid = c.defaultRequestID
+ } else {
reqid = reqIDGen.Next()
}
if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("X-Request-Id", reqid)
}
}
- return c.httpClient().Do(req)
+ var cancel context.CancelFunc
+ if c.Timeout > 0 {
+ ctx := req.Context()
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+ req = req.WithContext(ctx)
+ }
+ resp, err := c.httpClient().Do(req)
+ if err == nil && cancel != nil {
+ // We need to call cancel() eventually, but we can't
+ // use "defer cancel()" because the context has to
+ // stay alive until the caller has finished reading
+ // the response body.
+ resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+ } else if cancel != nil {
+ cancel()
+ }
+ return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+ io.ReadCloser
+ cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+ err := coc.ReadCloser.Close()
+ coc.cancel()
+ return err
}
func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
- return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+ return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
}
func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
// header.
func (c *Client) WithRequestID(reqid string) *Client {
cc := *c
- cc.ctx = ContextWithRequestID(cc.context(), reqid)
+ cc.defaultRequestID = reqid
return &cc
}
-func (c *Client) context() context.Context {
- if c.ctx == nil {
- return context.Background()
- }
- return c.ctx
-}
-
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 3af747920..da1710374 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
}
// IndexMount returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("index/"+prefix))
}
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
- req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
resp, err := c.Do(req)
if err != nil {
diff --git a/sdk/go/arvados/keep_service_test.go b/sdk/go/arvados/keep_service_test.go
index 8715f74f0..3a82f4b7e 100644
--- a/sdk/go/arvados/keep_service_test.go
+++ b/sdk/go/arvados/keep_service_test.go
@@ -5,6 +5,7 @@
package arvados
import (
+ "context"
"net/http"
check "gopkg.in/check.v1"
@@ -22,6 +23,6 @@ func (*KeepServiceSuite) TestIndexTimeout(c *check.C) {
APIHost: "zzzzz.arvadosapi.com",
AuthToken: "xyzzy",
}
- _, err := (&KeepService{}).IndexMount(client, "fake", "")
+ _, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "")
c.Check(err, check.ErrorMatches, `.*timeout.*`)
}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 3c35d304c..261714605 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ package main
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
@@ -126,6 +127,13 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
// succeed in clearing existing trash lists.
nextRunOptions.SafeRendezvousState = rs
}
+
+ // Indexing and sending trash/pull lists can take a long time
+ // on a big site. Prefer a long timeout (causing slow recovery
+ // from undetected network problems) to a short timeout
+ // (causing starvation via perpetual timeout/restart cycle).
+ client.Timeout = 24 * time.Hour
+
if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
@@ -305,6 +313,9 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
@@ -348,12 +359,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
go func(mounts []*KeepMount) {
defer wg.Done()
bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
- idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
if err != nil {
select {
case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
default:
}
+ cancel()
return
}
if len(errs) > 0 {
@@ -391,6 +403,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
}
for range collQ {
}
+ cancel()
return
}
bal.collScanned++
@@ -422,6 +435,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
case errs <- err:
default:
}
+ cancel()
}
}()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list