[ARVADOS] created: 1.3.0-2662-g9db71ff51
Git user
git at public.arvados.org
Mon Jun 15 17:35:54 UTC 2020
at 9db71ff51a26e61e9be51f46a2187c57a5bb48cc (commit)
commit 9db71ff51a26e61e9be51f46a2187c57a5bb48cc
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Jun 15 11:11:39 2020 -0400
16480: Use longer timeout for keepstore index requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 1e2c07e86..562c8c1e7 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -57,9 +57,16 @@ type Client struct {
// HTTP headers to add/override in outgoing requests.
SendHeader http.Header
+ // Timeout for requests. NewClientFromConfig and
+ // NewClientFromEnv return a Client with a default 5 minute
+ // timeout. To disable this timeout and rely on each
+ // http.Request's context deadline instead, set Timeout to
+ // zero.
+ Timeout time.Duration
+
dd *DiscoveryDocument
- ctx context.Context
+ defaultRequestID string
}
// The default http.Client used by a Client with Insecure==true and
@@ -67,12 +74,10 @@ type Client struct {
var InsecureHTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true}},
- Timeout: 5 * time.Minute}
+ InsecureSkipVerify: true}}}
// The default http.Client used by a Client otherwise.
-var DefaultSecureClient = &http.Client{
- Timeout: 5 * time.Minute}
+var DefaultSecureClient = &http.Client{}
// NewClientFromConfig creates a new Client that uses the endpoints in
// the given cluster.
@@ -87,6 +92,7 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
Scheme: ctrlURL.Scheme,
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
+ Timeout: 5 * time.Minute,
}, nil
}
@@ -116,6 +122,7 @@ func NewClientFromEnv() *Client {
AuthToken: os.Getenv("ARVADOS_API_TOKEN"),
Insecure: insecure,
KeepServiceURIs: svcs,
+ Timeout: 5 * time.Minute,
}
}
@@ -131,11 +138,12 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
if req.Header.Get("X-Request-Id") == "" {
- reqid, _ := req.Context().Value(contextKeyRequestID{}).(string)
- if reqid == "" {
- reqid, _ = c.context().Value(contextKeyRequestID{}).(string)
- }
- if reqid == "" {
+ var reqid string
+ if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ reqid = ctxreqid
+ } else if c.defaultRequestID != "" {
+ reqid = c.defaultRequestID
+ } else {
reqid = reqIDGen.Next()
}
if req.Header == nil {
@@ -144,7 +152,36 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
req.Header.Set("X-Request-Id", reqid)
}
}
- return c.httpClient().Do(req)
+ var cancel context.CancelFunc
+ if c.Timeout > 0 {
+ ctx := req.Context()
+ ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
+ req = req.WithContext(ctx)
+ }
+ resp, err := c.httpClient().Do(req)
+ if err == nil && cancel != nil {
+ // We need to call cancel() eventually, but we can't
+ // use "defer cancel()" because the context has to
+ // stay alive until the caller has finished reading
+ // the response body.
+ resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
+ } else if cancel != nil {
+ cancel()
+ }
+ return resp, err
+}
+
+// cancelOnClose calls a provided CancelFunc when its wrapped
+// ReadCloser's Close() method is called.
+type cancelOnClose struct {
+ io.ReadCloser
+ cancel context.CancelFunc
+}
+
+func (coc cancelOnClose) Close() error {
+ err := coc.ReadCloser.Close()
+ coc.cancel()
+ return err
}
func isRedirectStatus(code int) bool {
@@ -266,7 +303,7 @@ func anythingToValues(params interface{}) (url.Values, error) {
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
- return c.RequestAndDecodeContext(c.context(), dst, method, path, body, params)
+ return c.RequestAndDecodeContext(context.Background(), dst, method, path, body, params)
}
func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, method, path string, body io.Reader, params interface{}) error {
@@ -332,17 +369,10 @@ func (c *Client) UpdateBody(rsc resource) io.Reader {
// header.
func (c *Client) WithRequestID(reqid string) *Client {
cc := *c
- cc.ctx = ContextWithRequestID(cc.context(), reqid)
+ cc.defaultRequestID = reqid
return &cc
}
-func (c *Client) context() context.Context {
- if c.ctx == nil {
- return context.Background()
- }
- return c.ctx
-}
-
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 3af747920..da1710374 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -141,20 +141,20 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
}
// Index returns an unsorted list of blocks at the given mount point.
-func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
-func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(c, s.url("index/"+prefix))
+func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
+ return s.index(ctx, c, s.url("index/"+prefix))
}
-func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
- req, err := http.NewRequest("GET", url, nil)
+func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
- return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
+ return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
}
resp, err := c.Do(req)
if err != nil {
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 3c35d304c..261714605 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ package main
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
@@ -126,6 +127,13 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
// succeed in clearing existing trash lists.
nextRunOptions.SafeRendezvousState = rs
}
+
+ // Indexing and sending trash/pull lists can take a long time
+ // on a big site. Prefer a long timeout (causing slow recovery
+ // from undetected network problems) to a short timeout
+ // (causing starvation via perpetual timeout/restart cycle).
+ client.Timeout = 24 * time.Hour
+
if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
@@ -305,6 +313,9 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
@@ -348,12 +359,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
go func(mounts []*KeepMount) {
defer wg.Done()
bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
- idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
if err != nil {
select {
case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
default:
}
+ cancel()
return
}
if len(errs) > 0 {
@@ -391,6 +403,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
}
for range collQ {
}
+ cancel()
return
}
bal.collScanned++
@@ -422,6 +435,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
case errs <- err:
default:
}
+ cancel()
}
}()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list