[ARVADOS] updated: 2.1.0-1102-g46a33e285
Git user
git at public.arvados.org
Wed Jul 28 04:20:08 UTC 2021
Summary of changes:
lib/config/config.default.yml | 7 ++++
lib/config/export.go | 1 +
lib/config/generated_config.go | 7 ++++
lib/service/cmd.go | 8 +++++
sdk/go/arvados/collection.go | 33 +++++++++--------
sdk/go/arvados/config.go | 1 +
services/keep-balance/balance.go | 60 ++++++++++++++++---------------
services/keep-balance/balance_run_test.go | 53 ++++++++++-----------------
services/keep-balance/collection.go | 35 +++++++++++-------
services/keep-balance/integration_test.go | 7 ++--
services/keep-balance/main.go | 21 ++++++++---
services/keep-balance/main_test.go | 21 +++++++++--
services/keep-balance/server.go | 11 +++---
13 files changed, 157 insertions(+), 108 deletions(-)
via 46a33e285d6179ebd5041733f98949a23147d55d (commit)
via 504b1b430076f15d27ff3e8da3e1d3623431aa84 (commit)
via d012d691bde541c8dae0dac4c00cb74ed30d9e47 (commit)
via 11339f23c194809c53cb4a888aea18eb51ff27f8 (commit)
via fdaca8b1c797b47ae7e08f0a9624dc84a3be3c44 (commit)
from 325ba452bdb2b8ebe4ef2a85d495291429df8082 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 46a33e285d6179ebd5041733f98949a23147d55d
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 28 00:19:03 2021 -0400
17574: Process collections in a worker pool.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 67021bef3..e69d941b1 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -18,6 +18,7 @@ import (
"sort"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"time"
@@ -52,7 +53,7 @@ type Balancer struct {
classes []string
mounts int
mountsByClass map[string]map[*KeepMount]bool
- collScanned int
+ collScanned int64
serviceRoots map[string]string
errors []error
stats balancerStats
@@ -399,35 +400,10 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
}(mounts)
}
- // collQ buffers incoming collections so we can start fetching
- // the next page without waiting for the current page to
- // finish processing.
collQ := make(chan arvados.Collection, bufs)
- // Start a goroutine to process collections. (We could use a
- // worker pool here, but even with a single worker we already
- // process collections much faster than we can retrieve them.)
- wg.Add(1)
- go func() {
- defer wg.Done()
- for coll := range collQ {
- err := bal.addCollection(coll)
- if err != nil || len(errs) > 0 {
- select {
- case errs <- err:
- default:
- }
- for range collQ {
- }
- cancel()
- return
- }
- bal.collScanned++
- }
- }()
-
- // Start a goroutine to retrieve all collections from the
- // Arvados database and send them to collQ for processing.
+ // Retrieve all collections from the database and send them to
+ // collQ.
wg.Add(1)
go func() {
defer wg.Done()
@@ -455,6 +431,27 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
}
}()
+ // Parse manifests from collQ and pass the block hashes to
+ // BlockStateMap to track desired replication.
+ for i := 0; i < runtime.NumCPU(); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ err := bal.addCollection(coll)
+ if err != nil || len(errs) > 0 {
+ select {
+ case errs <- err:
+ default:
+ }
+ cancel()
+ continue
+ }
+ atomic.AddInt64(&bal.collScanned, 1)
+ }
+ }()
+ }
+
wg.Wait()
if len(errs) > 0 {
return <-errs
commit 504b1b430076f15d27ff3e8da3e1d3623431aa84
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 28 00:18:53 2021 -0400
17574: Add BalanceUpdateLimit config, fix tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index e28d5cbb7..c5bc1c8e8 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -456,6 +456,13 @@ Clusters:
# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
diff --git a/lib/config/export.go b/lib/config/export.go
index bb939321c..2faacc859 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -84,6 +84,7 @@ var whitelist = map[string]bool{
"Collections.BalanceCollectionBuffers": false,
"Collections.BalancePeriod": false,
"Collections.BalanceTimeout": false,
+ "Collections.BalanceUpdateLimit": false,
"Collections.BlobDeleteConcurrency": false,
"Collections.BlobMissingReport": false,
"Collections.BlobReplicateConcurrency": false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index b15bf7eeb..fb9f888eb 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -462,6 +462,13 @@ Clusters:
# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6e59828a3..dcffcd25e 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -138,6 +138,7 @@ type Cluster struct {
BalanceCollectionBatch int
BalanceCollectionBuffers int
BalanceTimeout Duration
+ BalanceUpdateLimit int
WebDAVCache WebDAVCacheConfig
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 5e1c0e45c..18a8bdcf4 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -352,6 +352,9 @@ func (s *runSuite) TearDownTest(c *check.C) {
}
func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+ defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+ _, err := s.db.Exec(`delete from collections`)
+ c.Assert(err, check.IsNil)
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
@@ -365,7 +368,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err = srv.runOnce()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -390,26 +393,6 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
c.Check(pullReqs.Count(), check.Equals, 0)
}
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
- opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: ctxlog.TestLogger(c),
- }
- s.stub.serveCurrentUserAdmin()
- s.stub.serveCollectionsButSkipOne()
- s.stub.serveKeepServices(stubServices)
- s.stub.serveKeepstoreMounts()
- s.stub.serveKeepstoreIndexFoo4Bar1()
- trashReqs := s.stub.serveKeepstoreTrash()
- pullReqs := s.stub.serveKeepstorePull()
- srv := s.newServer(&opts)
- _, err := srv.runOnce()
- c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
- c.Check(trashReqs.Count(), check.Equals, 4)
- c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
func (s *runSuite) TestWriteLostBlocks(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
@@ -433,7 +416,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+ c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
}
func (s *runSuite) TestDryRun(c *check.C) {
@@ -464,11 +447,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
}
func (s *runSuite) TestCommit(c *check.C) {
- lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
- c.Assert(err, check.IsNil)
- s.config.Collections.BlobMissingReport = lostf.Name()
- defer os.Remove(lostf.Name())
-
+ s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
@@ -494,17 +473,18 @@ func (s *runSuite) TestCommit(c *check.C) {
// in a poor rendezvous position
c.Check(bal.stats.pulls, check.Equals, 2)
- lost, err := ioutil.ReadFile(lostf.Name())
+ lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "")
+ c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
buf, err := s.getMetrics(c, srv)
c.Check(err, check.IsNil)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+ bufstr := buf.String()
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index ba9edf939..6e0a066e8 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -8,7 +8,6 @@ import (
"context"
"encoding/json"
"fmt"
- "io"
"runtime"
"sync"
"sync/atomic"
@@ -52,6 +51,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
if err != nil {
return err
}
+ progressTicker := time.NewTicker(10 * time.Second)
+ defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
@@ -74,8 +75,13 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
if err != nil {
return err
}
- progress(callCount, expectCount)
+ select {
+ case <-progressTicker.C:
+ progress(callCount, expectCount)
+ default:
+ }
}
+ progress(callCount, expectCount)
rows.Close()
if err := rows.Err(); err != nil {
return err
@@ -104,36 +110,37 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
threshold := time.Now()
thresholdStr := threshold.Format(time.RFC3339Nano)
+ updated := int64(0)
+
var err error
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
- err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
- if coll.ModifiedAt.After(threshold) {
- return io.EOF
- }
- if coll.IsTrashed {
- return nil
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
}
collQ <- coll
return nil
}, func(done, total int) {
bal.logf("update collections: %d/%d", done, total)
})
- if err == io.EOF {
- err = nil
- } else if err != nil {
+ if err != nil && err != context.Canceled {
bal.logf("error updating collections: %s", err)
}
}()
- var updated int64
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for coll := range collQ {
+ if ctx.Err() != nil {
+ continue
+ }
blkids, err := coll.SizedDigests()
if err != nil {
bal.logf("%s: %s", coll.UUID, err)
@@ -153,7 +160,9 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
if err != nil {
- bal.logf("%s: update failed: %s", coll.UUID, err)
+ if err != context.Canceled {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
continue
}
atomic.AddInt64(&updated, 1)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 90ae0695d..e1573e7f7 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -19,6 +19,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index b154f6e99..65a2d5567 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -11,6 +11,11 @@ import (
"net/http"
"time"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
@@ -26,6 +31,8 @@ func (s *mainSuite) TestVersionFlag(c *check.C) {
}
func (s *mainSuite) TestHTTPServer(c *check.C) {
+ arvadostest.StartKeep(2, true)
+
ln, err := net.Listen("tcp", ":0")
if err != nil {
c.Fatal(err)
@@ -33,10 +40,17 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
_, p, err := net.SplitHostPort(ln.Addr().String())
c.Check(err, check.IsNil)
ln.Close()
- config := "Clusters:\n zzzzz:\n ManagementToken: abcdefg\n Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ config, err := yaml.Marshal(cfg)
+ c.Assert(err, check.IsNil)
var stdout bytes.Buffer
- go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+ go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
done := make(chan struct{})
go func() {
defer close(done)
@@ -47,7 +61,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
c.Fatal(err)
return
}
- req.Header.Set("Authorization", "Bearer abcdefg")
+ req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
c.Logf("error %s", err)
@@ -73,6 +87,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
c.Log(stdout.String())
c.Fatal("timeout")
}
+ c.Log(stdout.String())
// Check non-metrics URL that gets passed through to us from
// service.Command
commit d012d691bde541c8dae0dac4c00cb74ed30d9e47
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 28 00:13:50 2021 -0400
17574: add -pprof flag to service commands.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 9ca243125..40db4f9c7 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -12,6 +12,7 @@ import (
"io"
"net"
"net/http"
+ _ "net/http/pprof"
"net/url"
"os"
"strings"
@@ -70,6 +71,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
loader := config.NewLoader(stdin, log)
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+ pprofAddr := flags.String("pprof", "", "Serve Go profile data at `[addr]:port`")
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
@@ -80,6 +82,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
}
+ if *pprofAddr != "" {
+ go func() {
+ log.Println(http.ListenAndServe(*pprofAddr, nil))
+ }()
+ }
+
if strings.HasSuffix(prog, "controller") {
// Some config-loader checks try to make API calls via
// controller. Those can't be expected to work if this
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 93457fb82..90ae0695d 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -9,6 +9,8 @@ import (
"flag"
"fmt"
"io"
+ "net/http"
+ _ "net/http/pprof"
"os"
"git.arvados.org/arvados.git/lib/config"
@@ -40,6 +42,13 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
flags.Bool("version", false, "Write version information to stdout and exit 0")
dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+ pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+ if *pprofAddr != "" {
+ go func() {
+ logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+ }()
+ }
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
commit 11339f23c194809c53cb4a888aea18eb51ff27f8
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 28 00:04:56 2021 -0400
17574: Speed up manifest parsing.
Previous code spent a lot of time allocating, copying, and GCing
strings. Working with byte slices is much faser.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index cec20279d..785c18d4a 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -5,11 +5,10 @@
package arvados
import (
- "bufio"
+ "bytes"
"crypto/md5"
"fmt"
"regexp"
- "strings"
"time"
"git.arvados.org/arvados.git/sdk/go/blockdigest"
@@ -56,40 +55,40 @@ func (c Collection) resourceName() string {
//
// Zero-length blocks are not included.
func (c *Collection) SizedDigests() ([]SizedDigest, error) {
- manifestText := c.ManifestText
- if manifestText == "" {
- manifestText = c.UnsignedManifestText
+ manifestText := []byte(c.ManifestText)
+ if len(manifestText) == 0 {
+ manifestText = []byte(c.UnsignedManifestText)
}
- if manifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ if len(manifestText) == 0 && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
// TODO: Check more subtle forms of corruption, too
return nil, fmt.Errorf("manifest is missing")
}
- var sds []SizedDigest
- scanner := bufio.NewScanner(strings.NewReader(manifestText))
- scanner.Buffer(make([]byte, 1048576), len(manifestText))
- for scanner.Scan() {
- line := scanner.Text()
- tokens := strings.Split(line, " ")
+ sds := make([]SizedDigest, 0, len(manifestText)/40)
+ for _, line := range bytes.Split(manifestText, []byte{'\n'}) {
+ if len(line) == 0 {
+ continue
+ }
+ tokens := bytes.Split(line, []byte{' '})
if len(tokens) < 3 {
return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
}
for _, token := range tokens[1:] {
- if !blockdigest.LocatorPattern.MatchString(token) {
+ if !blockdigest.LocatorPattern.Match(token) {
// FIXME: ensure it's a file token
break
}
- if strings.HasPrefix(token, "d41d8cd98f00b204e9800998ecf8427e+0") {
+ if bytes.HasPrefix(token, []byte("d41d8cd98f00b204e9800998ecf8427e+0")) {
// Exclude "empty block" placeholder
continue
}
// FIXME: shouldn't assume 32 char hash
- if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ if i := bytes.IndexRune(token[33:], '+'); i >= 0 {
token = token[:33+i]
}
- sds = append(sds, SizedDigest(token))
+ sds = append(sds, SizedDigest(string(token)))
}
}
- return sds, scanner.Err()
+ return sds, nil
}
type CollectionList struct {
commit fdaca8b1c797b47ae7e08f0a9624dc84a3be3c44
Author: Tom Clegg <tom at curii.com>
Date: Tue Jul 27 11:23:48 2021 -0400
17574: Add -update-confirmed-fields=false option.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 6a71cf99f..67021bef3 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -173,7 +173,12 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
return
}
}
- err = bal.updateCollections(ctx, client, cluster)
+ if runOptions.CommitConfirmedFields {
+ err = bal.updateCollections(ctx, client, cluster)
+ if err != nil {
+ return
+ }
+ }
return
}
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index cbdde595e..5e1c0e45c 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -21,6 +21,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
check "gopkg.in/check.v1"
@@ -309,6 +310,7 @@ func (s *stubServer) serveKeepstorePull() *reqTracker {
type runSuite struct {
stub stubServer
config *arvados.Cluster
+ db *sqlx.DB
client *arvados.Client
}
@@ -320,6 +322,7 @@ func (s *runSuite) newServer(options *RunOptions) *Server {
Metrics: newMetrics(prometheus.NewRegistry()),
Logger: options.Logger,
Dumper: options.Dumper,
+ DB: s.db,
}
return srv
}
@@ -329,6 +332,8 @@ func (s *runSuite) SetUpTest(c *check.C) {
c.Assert(err, check.Equals, nil)
s.config, err = cfg.GetCluster("")
c.Assert(err, check.Equals, nil)
+ s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index 564e36a43..1458fe452 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -88,9 +88,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
logger := logrus.New()
logger.Out = io.MultiWriter(&logBuf, os.Stderr)
opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: logger,
+ CommitPulls: true,
+ CommitTrash: true,
+ CommitConfirmedFields: true,
+ Logger: logger,
}
bal := &Balancer{
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 80b1ed301..93457fb82 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -36,6 +36,8 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flags.BoolVar(&options.CommitTrash, "commit-trash", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+ "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
flags.Bool("version", false, "Write version information to stdout and exit 0")
dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
@@ -56,10 +58,11 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
// service.Command
args = nil
dropFlag := map[string]bool{
- "once": true,
- "commit-pulls": true,
- "commit-trash": true,
- "dump": true,
+ "once": true,
+ "commit-pulls": true,
+ "commit-trash": true,
+ "commit-confirmed-fields": true,
+ "dump": true,
}
flags.Visit(func(f *flag.Flag) {
if !dropFlag[f.Name] {
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index b42fa23a3..5299b96c1 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -24,11 +24,12 @@ import (
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
- Once bool
- CommitPulls bool
- CommitTrash bool
- Logger logrus.FieldLogger
- Dumper logrus.FieldLogger
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ CommitConfirmedFields bool
+ Logger logrus.FieldLogger
+ Dumper logrus.FieldLogger
// SafeRendezvousState from the most recent balance operation,
// or "" if unknown. If this changes from one run to the next,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list