[ARVADOS] created: 1.3.0-2662-g2bc1a7a89

Git user git at public.arvados.org
Tue Jun 16 14:11:09 UTC 2020


        at  2bc1a7a89597ab02aaeef84b82fdc51f8e375b79 (commit)


commit 2bc1a7a89597ab02aaeef84b82fdc51f8e375b79
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Tue Jun 16 10:10:53 2020 -0400

    16480: Use longer timeout for keepstore index requests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/recovercollection/cmd.go b/lib/recovercollection/cmd.go
index cea4607c9..da466c31c 100644
--- a/lib/recovercollection/cmd.go
+++ b/lib/recovercollection/cmd.go
@@ -204,8 +204,8 @@ var errNotFound = errors.New("not found")
 
 // Finds the timestamp of the newest copy of blk on svc. Returns
 // errNotFound if blk is not on svc at all.
-func (rcvr recoverer) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
-	found, err := svc.Index(rcvr.client, blk)
+func (rcvr recoverer) newestMtime(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, error) {
+	found, err := svc.Index(ctx, rcvr.client, blk)
 	if err != nil {
 		logger.WithError(err).Warn("error getting index")
 		return time.Time{}, err
@@ -236,7 +236,7 @@ var errTouchIneffective = errors.New("(BUG?) touch succeeded but had no effect -
 // saved. But if the block's timestamp is more recent than blobsigttl,
 // keepstore will refuse to trash it even if told to by keep-balance.
 func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) error {
-	if latest, err := rcvr.newestMtime(logger, blk, svc); err != nil {
+	if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err != nil {
 		return err
 	} else if latest.Add(blobsigttl).After(blobsigexp) {
 		return nil
@@ -245,7 +245,7 @@ func (rcvr recoverer) ensureSafe(ctx context.Context, logger logrus.FieldLogger,
 		return fmt.Errorf("error updating timestamp: %s", err)
 	}
 	logger.Debug("updated timestamp")
-	if latest, err := rcvr.newestMtime(logger, blk, svc); err == errNotFound {
+	if latest, err := rcvr.newestMtime(ctx, logger, blk, svc); err == errNotFound {
 		return fmt.Errorf("(BUG?) touch succeeded, but then block did not appear in index")
 	} else if err != nil {
 		return err
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 1e2c07e86..562c8c1e7 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -57,9 +57,16 @@ type Client struct {
 	// HTTP headers to add/override in outgoing requests.
 	SendHeader http.Header
 
+	// Timeout for requests. NewClientFromConfig and
+	// NewClientFromEnv return a Client with a default 5 minute
+	// timeout.  To disable this timeout and rely on each
+	// http.Request's context deadline instead, set Timeout to
+	// zero.
+	Timeout time.Duration
+
 	dd *DiscoveryDocument
 
-	ctx context.Context
+	defaultRequestID string
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
 var InsecureHTTPClient = &http.Client{
 	Transport: &http.Transport{
 		TLSClientConfig: &tls.Config{
-			InsecureSkipVerify: true}},
-	Timeout: 5 * time.Minute}
+			InsecureSkipVerify: true}}}
 
 // The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
-	Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
 
 // NewClientFromConfig creates a new Client that uses the endpoints in
 // the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 		Scheme:   ctrlURL.Scheme,
 		APIHost:  ctrlURL.Host,
 		Insecure: cluster.TLS.Insecure,
+		Timeout:  5 * time.Minute,
 	}, nil
 }
 
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
 		AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
 		Insecure:        insecure,
 		KeepServiceURIs: svcs,
+		Timeout:         5 * time.Minute,
 	}
 }
 
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 
 	if req.Header.Get("X-Request-Id") == "" {
-		reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
-		if reqid == "" {
-			reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
-		}
-		if reqid == "" {
+		var reqid string
+		if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+			reqid = ctxreqid
+		} else if c.defaultRequestID != "" {
+			reqid = c.defaultRequestID
+		} else {
 			reqid = reqIDGen.Next()
 		}
 		if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 			req.Header.Set("X-Request-Id", reqid)
 		}
 	}
-	return c.httpClient().Do(req)
+	var cancel context.CancelFunc
+	if c.Timeout > 0 {
+		ctx := req.Context()
+		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+		req = req.WithContext(ctx)
+	}
+	resp, err := c.httpClient().Do(req)
+	if err == nil && cancel != nil {
+		// We need to call cancel() eventually, but we can't
+		// use "defer cancel()" because the context has to
+		// stay alive until the caller has finished reading
+		// the response body.
+		resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+	} else if cancel != nil {
+		cancel()
+	}
+	return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+	io.ReadCloser
+	cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+	err := coc.ReadCloser.Close()
+	coc.cancel()
+	return err
 }
 
 func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
-	return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+	return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
 }
 
 func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
 // header.
 func (c *Client) WithRequestID(reqid string) *Client {
 	cc := *c
-	cc.ctx = ContextWithRequestID(cc.context(), reqid)
+	cc.defaultRequestID = reqid
 	return &cc
 }
 
-func (c *Client) context() context.Context {
-	if c.ctx == nil {
-		return context.Background()
-	}
-	return c.ctx
-}
-
 func (c *Client) httpClient() *http.Client {
 	switch {
 	case c.Client != nil:
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 3af747920..da1710374 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
 }
 
 // Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(ctx, c, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-	req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {
-		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+		return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
 	}
 	resp, err := c.Do(req)
 	if err != nil {
diff --git a/sdk/go/arvados/keep_service_test.go b/sdk/go/arvados/keep_service_test.go
index 8715f74f0..3a82f4b7e 100644
--- a/sdk/go/arvados/keep_service_test.go
+++ b/sdk/go/arvados/keep_service_test.go
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+	"context"
 	"net/http"
 
 	check "gopkg.in/check.v1"
@@ -22,6 +23,6 @@ func (*KeepServiceSuite) TestIndexTimeout(c *check.C) {
 		APIHost:   "zzzzz.arvadosapi.com",
 		AuthToken: "xyzzy",
 	}
-	_, err := (&KeepService{}).IndexMount(client, "fake", "")
+	_, err := (&KeepService{}).IndexMount(context.Background(), client, "fake", "")
 	c.Check(err, check.ErrorMatches, `.*timeout.*`)
 }
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 3c35d304c..261714605 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
@@ -126,6 +127,13 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		// succeed in clearing existing trash lists.
 		nextRunOptions.SafeRendezvousState = rs
 	}
+
+	// Indexing and sending trash/pull lists can take a long time
+	// on a big site. Prefer a long timeout (causing slow recovery
+	// from undetected network problems) to a short timeout
+	// (causing starvation via perpetual timeout/restart cycle).
+	client.Timeout = 24 * time.Hour
+
 	if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
@@ -305,6 +313,9 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	defer bal.time("get_state", "wall clock time to get current state")()
 	bal.BlockStateMap = NewBlockStateMap()
 
@@ -348,12 +359,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		go func(mounts []*KeepMount) {
 			defer wg.Done()
 			bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-			idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
 			if err != nil {
 				select {
 				case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
 				default:
 				}
+				cancel()
 				return
 			}
 			if len(errs) > 0 {
@@ -391,6 +403,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 				}
 				for range collQ {
 				}
+				cancel()
 				return
 			}
 			bal.collScanned++
@@ -422,6 +435,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 			case errs <- err:
 			default:
 			}
+			cancel()
 		}
 	}()
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list