[ARVADOS] created: 1.3.0-2662-g9db71ff51

Git user git at public.arvados.org
Mon Jun 15 17:35:54 UTC 2020


        at  9db71ff51a26e61e9be51f46a2187c57a5bb48cc (commit)


commit 9db71ff51a26e61e9be51f46a2187c57a5bb48cc
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Jun 15 11:11:39 2020 -0400

    16480: Use longer timeout for keepstore index requests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 1e2c07e86..562c8c1e7 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -57,9 +57,16 @@ type Client struct {
 	// HTTP headers to add/override in outgoing requests.
 	SendHeader http.Header
 
+	// Timeout for requests. NewClientFromConfig and
+	// NewClientFromEnv return a Client with a default 5 minute
+	// timeout.  To disable this timeout and rely on each
+	// http.Request's context deadline instead, set Timeout to
+	// zero.
+	Timeout time.Duration
+
 	dd *DiscoveryDocument
 
-	ctx context.Context
+	defaultRequestID string
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
 var InsecureHTTPClient = &http.Client{
 	Transport: &http.Transport{
 		TLSClientConfig: &tls.Config{
-			InsecureSkipVerify: true}},
-	Timeout: 5 * time.Minute}
+			InsecureSkipVerify: true}}}
 
 // The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
-	Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
 
 // NewClientFromConfig creates a new Client that uses the endpoints in
 // the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 		Scheme:   ctrlURL.Scheme,
 		APIHost:  ctrlURL.Host,
 		Insecure: cluster.TLS.Insecure,
+		Timeout:  5 * time.Minute,
 	}, nil
 }
 
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
 		AuthToken:       os.Getenv("ARVADOS_API_TOKEN"),
 		Insecure:        insecure,
 		KeepServiceURIs: svcs,
+		Timeout:         5 * time.Minute,
 	}
 }
 
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 
 	if req.Header.Get("X-Request-Id") == "" {
-		reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
-		if reqid == "" {
-			reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
-		}
-		if reqid == "" {
+		var reqid string
+		if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+			reqid = ctxreqid
+		} else if c.defaultRequestID != "" {
+			reqid = c.defaultRequestID
+		} else {
 			reqid = reqIDGen.Next()
 		}
 		if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 			req.Header.Set("X-Request-Id", reqid)
 		}
 	}
-	return c.httpClient().Do(req)
+	var cancel context.CancelFunc
+	if c.Timeout > 0 {
+		ctx := req.Context()
+		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+		req = req.WithContext(ctx)
+	}
+	resp, err := c.httpClient().Do(req)
+	if err == nil && cancel != nil {
+		// We need to call cancel() eventually, but we can't
+		// use "defer cancel()" because the context has to
+		// stay alive until the caller has finished reading
+		// the response body.
+		resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+	} else if cancel != nil {
+		cancel()
+	}
+	return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+	io.ReadCloser
+	cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+	err := coc.ReadCloser.Close()
+	coc.cancel()
+	return err
 }
 
 func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
-	return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+	return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
 }
 
 func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
 // header.
 func (c *Client) WithRequestID(reqid string) *Client {
 	cc := *c
-	cc.ctx = ContextWithRequestID(cc.context(), reqid)
+	cc.defaultRequestID = reqid
 	return &cc
 }
 
-func (c *Client) context() context.Context {
-	if c.ctx == nil {
-		return context.Background()
-	}
-	return c.ctx
-}
-
 func (c *Client) httpClient() *http.Client {
 	switch {
 	case c.Client != nil:
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 3af747920..da1710374 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
 }
 
 // Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(ctx, c, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
-	req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {
-		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+		return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
 	}
 	resp, err := c.Do(req)
 	if err != nil {
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 3c35d304c..261714605 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
@@ -126,6 +127,13 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		// succeed in clearing existing trash lists.
 		nextRunOptions.SafeRendezvousState = rs
 	}
+
+	// Indexing and sending trash/pull lists can take a long time
+	// on a big site. Prefer a long timeout (causing slow recovery
+	// from undetected network problems) to a short timeout
+	// (causing starvation via perpetual timeout/restart cycle).
+	client.Timeout = 24 * time.Hour
+
 	if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
@@ -305,6 +313,9 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	defer bal.time("get_state", "wall clock time to get current state")()
 	bal.BlockStateMap = NewBlockStateMap()
 
@@ -348,12 +359,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		go func(mounts []*KeepMount) {
 			defer wg.Done()
 			bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-			idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
 			if err != nil {
 				select {
 				case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
 				default:
 				}
+				cancel()
 				return
 			}
 			if len(errs) > 0 {
@@ -391,6 +403,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 				}
 				for range collQ {
 				}
+				cancel()
 				return
 			}
 			bal.collScanned++
@@ -422,6 +435,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 			case errs <- err:
 			default:
 			}
+			cancel()
 		}
 	}()
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list