[ARVADOS] updated: 1.3.0-2663-gfd080b34a
Git user
git at public.arvados.org
Fri Jun 19 14:44:06 UTC 2020
Summary of changes:
lib/config/config.default.yml | 7 ++++++
lib/config/export.go | 1 +
lib/config/generated_config.go | 7 ++++++
lib/controller/federation_test.go | 1 +
lib/controller/handler_test.go | 1 +
lib/controller/proxy.go | 4 +--
sdk/go/arvados/config.go | 1 +
services/keep-balance/balance.go | 43 ++++++++++++++++++--------------
services/keep-balance/collection.go | 5 ++--
services/keep-balance/collection_test.go | 3 ++-
services/keep-balance/keep_service.go | 13 +++++-----
11 files changed, 55 insertions(+), 31 deletions(-)
via fd080b34a321cbd6593d69f427b9eaeab890712f (commit)
from 2bc1a7a89597ab02aaeef84b82fdc51f8e375b79 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit fd080b34a321cbd6593d69f427b9eaeab890712f
Author: Tom Clegg <tom at tomclegg.ca>
Date: Fri Jun 19 10:34:01 2020 -0400
16480: Configurable timeout for entire keep-balance iteration.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 219f6ef0b..409d2ccfa 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -440,6 +440,13 @@ Clusters:
# or omitted, pages are processed serially.
BalanceCollectionBuffers: 1000
+ # Maximum time for a rebalancing run. This ensures keep-balance
+ # eventually gives up and retries if, for example, a network
+ # error causes a hung connection that is never closed by the
+ # OS. It should be long enough that it doesn't interrupt a
+ # long-running balancing operation.
+ BalanceTimeout: 6h
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
diff --git a/lib/config/export.go b/lib/config/export.go
index fc4908c15..1c0662470 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -102,6 +102,7 @@ var whitelist = map[string]bool{
"Collections.WebDAVCache": false,
"Collections.BalanceCollectionBatch": false,
"Collections.BalancePeriod": false,
+ "Collections.BalanceTimeout": false,
"Collections.BlobMissingReport": false,
"Collections.BalanceCollectionBuffers": false,
"Containers": true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 6f8cab462..30bc66fc1 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -446,6 +446,13 @@ Clusters:
# or omitted, pages are processed serially.
BalanceCollectionBuffers: 1000
+ # Maximum time for a rebalancing run. This ensures keep-balance
+ # eventually gives up and retries if, for example, a network
+ # error causes a hung connection that is never closed by the
+ # OS. It should be long enough that it doesn't interrupt a
+ # long-running balancing operation.
+ BalanceTimeout: 6h
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 2b0cb22b0..6a9ad8c15 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
cluster.TLS.Insecure = true
cluster.API.MaxItemsPerResponse = 1000
cluster.API.MaxRequestAmplification = 4
+ cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
s.testHandler = &Handler{Cluster: cluster}
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index c7bce9713..ef6b9195f 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
PostgreSQL: integrationTestCluster().PostgreSQL,
ForceLegacyAPI14: forceLegacyAPI14,
}
+ s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
s.cluster.TLS.Insecure = true
arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 939868a17..d7381860e 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -77,9 +77,7 @@ func (p *proxy) Do(
Header: hdrOut,
Body: reqIn.Body,
}).WithContext(reqIn.Context())
-
- resp, err := client.Do(reqOut)
- return resp, err
+ return client.Do(reqOut)
}
// Copy a response (or error) to the downstream client
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index dbd9f7109..636728f1f 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -126,6 +126,7 @@ type Cluster struct {
BalancePeriod Duration
BalanceCollectionBatch int
BalanceCollectionBuffers int
+ BalanceTimeout Duration
WebDAVCache WebDAVCacheConfig
}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 261714605..86423a297 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -72,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
defer bal.time("sweep", "wall clock time to run one full sweep")()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ defer cancel()
+
var lbFile *os.File
if bal.LostBlocksFile != "" {
tmpfn := bal.LostBlocksFile + ".tmp"
@@ -112,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
if err = bal.CheckSanityEarly(client); err != nil {
return
}
+
+ // On a big site, indexing and sending trash/pull lists can
+ // take much longer than the usual 5 minute client
+ // timeout. From here on, we rely on the context deadline
+ // instead, aborting the entire operation if any part takes
+ // too long.
+ client.Timeout = 0
+
rs := bal.rendezvousState()
if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
if runOptions.SafeRendezvousState != "" {
bal.logf("notice: KeepServices list has changed since last run")
}
bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
- if err = bal.ClearTrashLists(client); err != nil {
+ if err = bal.ClearTrashLists(ctx, client); err != nil {
return
}
// The current rendezvous state becomes "safe" (i.e.,
@@ -128,13 +139,7 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
nextRunOptions.SafeRendezvousState = rs
}
- // Indexing and sending trash/pull lists can take a long time
- // on a big site. Prefer a long timeout (causing slow recovery
- // from undetected network problems) to a short timeout
- // (causing starvation via perpetual timeout/restart cycle).
- client.Timeout = 24 * time.Hour
-
- if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+ if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
@@ -154,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
lbFile = nil
}
if runOptions.CommitPulls {
- err = bal.CommitPulls(client)
+ err = bal.CommitPulls(ctx, client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
if runOptions.CommitTrash {
- err = bal.CommitTrash(client)
+ err = bal.CommitTrash(ctx, client)
}
return
}
@@ -294,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
// We avoid this problem if we clear all trash lists before getting
// indexes. (We also assume there is only one rebalancing process
// running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
for _, srv := range bal.KeepServices {
srv.ChangeSet = &ChangeSet{}
}
- return bal.CommitTrash(c)
+ return bal.CommitTrash(ctx, c)
}
// GetCurrentState determines the current replication state, and the
@@ -312,8 +317,8 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
- ctx, cancel := context.WithCancel(context.Background())
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer bal.time("get_state", "wall clock time to get current state")()
@@ -415,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c, pageSize,
+ err = EachCollection(ctx, c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
@@ -1098,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
- return srv.CommitPulls(c)
+ return srv.CommitPulls(ctx, c)
})
}
// CommitTrash sends the computed lists of trash requests to the
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
- return srv.CommitTrash(c)
+ return srv.CommitTrash(ctx, c)
})
}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index c4ddc90c4..1659918ca 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -5,6 +5,7 @@
package main
import (
+ "context"
"fmt"
"time"
@@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
//
// If pageSize > 0 it is used as the maximum page size in each API
// call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
@@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
for {
progress(callCount, expectCount)
var page arvados.CollectionList
- err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+ err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
if err != nil {
return err
}
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
index f8921c294..3ab9d07b2 100644
--- a/services/keep-balance/collection_test.go
+++ b/services/keep-balance/collection_test.go
@@ -5,6 +5,7 @@
package main
import (
+ "context"
"sync"
"time"
@@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
longestStreak := 0
var lastMod time.Time
sawUUID := make(map[string]bool)
- err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+ err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
if c.ModifiedAt.IsZero() {
return nil
}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index e2adf1a4b..17f8418f6 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -5,6 +5,7 @@
package main
import (
+ "context"
"encoding/json"
"fmt"
"io"
@@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string {
// CommitPulls sends the current list of pull requests to the storage
// server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
- return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+ return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
}
// CommitTrash sends the current list of trash requests to the storage
// server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
- return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+ return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
}
// Perform a PUT request at path, with data (as JSON) in the request
// body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
// We'll start a goroutine to do the JSON encoding, so we can
// stream it to the http client through a Pipe, rather than
// keeping the entire encoded version in memory.
@@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
}()
url := srv.URLBase() + "/" + path
- req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+ req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
if err != nil {
return fmt.Errorf("building request for %s: %v", url, err)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list