[ARVADOS] updated: 1.3.0-2663-gfd080b34a

Git user git at public.arvados.org
Fri Jun 19 14:44:06 UTC 2020


Summary of changes:
 lib/config/config.default.yml            |  7 ++++++
 lib/config/export.go                     |  1 +
 lib/config/generated_config.go           |  7 ++++++
 lib/controller/federation_test.go        |  1 +
 lib/controller/handler_test.go           |  1 +
 lib/controller/proxy.go                  |  4 +--
 sdk/go/arvados/config.go                 |  1 +
 services/keep-balance/balance.go         | 43 ++++++++++++++++++--------------
 services/keep-balance/collection.go      |  5 ++--
 services/keep-balance/collection_test.go |  3 ++-
 services/keep-balance/keep_service.go    | 13 +++++-----
 11 files changed, 55 insertions(+), 31 deletions(-)

       via  fd080b34a321cbd6593d69f427b9eaeab890712f (commit)
      from  2bc1a7a89597ab02aaeef84b82fdc51f8e375b79 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit fd080b34a321cbd6593d69f427b9eaeab890712f
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Fri Jun 19 10:34:01 2020 -0400

    16480: Configurable timeout for entire keep-balance iteration.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 219f6ef0b..409d2ccfa 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -440,6 +440,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
diff --git a/lib/config/export.go b/lib/config/export.go
index fc4908c15..1c0662470 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -102,6 +102,7 @@ var whitelist = map[string]bool{
 	"Collections.WebDAVCache":                      false,
 	"Collections.BalanceCollectionBatch":           false,
 	"Collections.BalancePeriod":                    false,
+	"Collections.BalanceTimeout":                   false,
 	"Collections.BlobMissingReport":                false,
 	"Collections.BalanceCollectionBuffers":         false,
 	"Containers":                                   true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 6f8cab462..30bc66fc1 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -446,6 +446,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go
index 2b0cb22b0..6a9ad8c15 100644
--- a/lib/controller/federation_test.go
+++ b/lib/controller/federation_test.go
@@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
 	cluster.TLS.Insecure = true
 	cluster.API.MaxItemsPerResponse = 1000
 	cluster.API.MaxRequestAmplification = 4
+	cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
 	arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
 	arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
 	s.testHandler = &Handler{Cluster: cluster}
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index c7bce9713..ef6b9195f 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
 		PostgreSQL:       integrationTestCluster().PostgreSQL,
 		ForceLegacyAPI14: forceLegacyAPI14,
 	}
+	s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
 	s.cluster.TLS.Insecure = true
 	arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
 	arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 939868a17..d7381860e 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -77,9 +77,7 @@ func (p *proxy) Do(
 		Header: hdrOut,
 		Body:   reqIn.Body,
 	}).WithContext(reqIn.Context())
-
-	resp, err := client.Do(reqOut)
-	return resp, err
+	return client.Do(reqOut)
 }
 
 // Copy a response (or error) to the downstream client
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index dbd9f7109..636728f1f 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -126,6 +126,7 @@ type Cluster struct {
 		BalancePeriod            Duration
 		BalanceCollectionBatch   int
 		BalanceCollectionBuffers int
+		BalanceTimeout           Duration
 
 		WebDAVCache WebDAVCacheConfig
 	}
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 261714605..86423a297 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -72,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	defer cancel()
+
 	var lbFile *os.File
 	if bal.LostBlocksFile != "" {
 		tmpfn := bal.LostBlocksFile + ".tmp"
@@ -112,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 	if err = bal.CheckSanityEarly(client); err != nil {
 		return
 	}
+
+	// On a big site, indexing and sending trash/pull lists can
+	// take much longer than the usual 5 minute client
+	// timeout. From here on, we rely on the context deadline
+	// instead, aborting the entire operation if any part takes
+	// too long.
+	client.Timeout = 0
+
 	rs := bal.rendezvousState()
 	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
 		if runOptions.SafeRendezvousState != "" {
 			bal.logf("notice: KeepServices list has changed since last run")
 		}
 		bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-		if err = bal.ClearTrashLists(client); err != nil {
+		if err = bal.ClearTrashLists(ctx, client); err != nil {
 			return
 		}
 		// The current rendezvous state becomes "safe" (i.e.,
@@ -128,13 +139,7 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		nextRunOptions.SafeRendezvousState = rs
 	}
 
-	// Indexing and sending trash/pull lists can take a long time
-	// on a big site. Prefer a long timeout (causing slow recovery
-	// from undetected network problems) to a short timeout
-	// (causing starvation via perpetual timeout/restart cycle).
-	client.Timeout = 24 * time.Hour
-
-	if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+	if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
 	bal.ComputeChangeSets()
@@ -154,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		lbFile = nil
 	}
 	if runOptions.CommitPulls {
-		err = bal.CommitPulls(client)
+		err = bal.CommitPulls(ctx, client)
 		if err != nil {
 			// Skip trash if we can't pull. (Too cautious?)
 			return
 		}
 	}
 	if runOptions.CommitTrash {
-		err = bal.CommitTrash(client)
+		err = bal.CommitTrash(ctx, client)
 	}
 	return
 }
@@ -294,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
 	for _, srv := range bal.KeepServices {
 		srv.ChangeSet = &ChangeSet{}
 	}
-	return bal.CommitTrash(c)
+	return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -312,8 +317,8 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-	ctx, cancel := context.WithCancel(context.Background())
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	defer bal.time("get_state", "wall clock time to get current state")()
@@ -415,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(c, pageSize,
+		err = EachCollection(ctx, c, pageSize,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
@@ -1098,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
 	return bal.commitAsync(c, "send pull list",
 		func(srv *KeepService) error {
-			return srv.CommitPulls(c)
+			return srv.CommitPulls(ctx, c)
 		})
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
 	return bal.commitAsync(c, "send trash list",
 		func(srv *KeepService) error {
-			return srv.CommitTrash(c)
+			return srv.CommitTrash(ctx, c)
 		})
 }
 
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index c4ddc90c4..1659918ca 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -5,6 +5,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 //
 // If pageSize > 0 it is used as the maximum page size in each API
 // call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
 	if progress == nil {
 		progress = func(_, _ int) {}
 	}
@@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
 	for {
 		progress(callCount, expectCount)
 		var page arvados.CollectionList
-		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+		err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
 		if err != nil {
 			return err
 		}
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
index f8921c294..3ab9d07b2 100644
--- a/services/keep-balance/collection_test.go
+++ b/services/keep-balance/collection_test.go
@@ -5,6 +5,7 @@
 package main
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
 			longestStreak := 0
 			var lastMod time.Time
 			sawUUID := make(map[string]bool)
-			err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+			err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
 				if c.ModifiedAt.IsZero() {
 					return nil
 				}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index e2adf1a4b..17f8418f6 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -5,6 +5,7 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string {
 
 // CommitPulls sends the current list of pull requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
-	return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+	return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
 }
 
 // CommitTrash sends the current list of trash requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
-	return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+	return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
 }
 
 // Perform a PUT request at path, with data (as JSON) in the request
 // body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
 	// We'll start a goroutine to do the JSON encoding, so we can
 	// stream it to the http client through a Pipe, rather than
 	// keeping the entire encoded version in memory.
@@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
 	}()
 
 	url := srv.URLBase() + "/" + path
-	req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+	req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
 	if err != nil {
 		return fmt.Errorf("building request for %s: %v", url, err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list