[ARVADOS] updated: 2.1.0-1102-g46a33e285

Git user git at public.arvados.org
Wed Jul 28 04:20:08 UTC 2021


Summary of changes:
 lib/config/config.default.yml             |  7 ++++
 lib/config/export.go                      |  1 +
 lib/config/generated_config.go            |  7 ++++
 lib/service/cmd.go                        |  8 +++++
 sdk/go/arvados/collection.go              | 33 +++++++++--------
 sdk/go/arvados/config.go                  |  1 +
 services/keep-balance/balance.go          | 60 ++++++++++++++++---------------
 services/keep-balance/balance_run_test.go | 53 ++++++++++-----------------
 services/keep-balance/collection.go       | 35 +++++++++++-------
 services/keep-balance/integration_test.go |  7 ++--
 services/keep-balance/main.go             | 21 ++++++++---
 services/keep-balance/main_test.go        | 21 +++++++++--
 services/keep-balance/server.go           | 11 +++---
 13 files changed, 157 insertions(+), 108 deletions(-)

       via  46a33e285d6179ebd5041733f98949a23147d55d (commit)
       via  504b1b430076f15d27ff3e8da3e1d3623431aa84 (commit)
       via  d012d691bde541c8dae0dac4c00cb74ed30d9e47 (commit)
       via  11339f23c194809c53cb4a888aea18eb51ff27f8 (commit)
       via  fdaca8b1c797b47ae7e08f0a9624dc84a3be3c44 (commit)
      from  325ba452bdb2b8ebe4ef2a85d495291429df8082 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 46a33e285d6179ebd5041733f98949a23147d55d
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 28 00:19:03 2021 -0400

    17574: Process collections in a worker pool.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 67021bef3..e69d941b1 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -18,6 +18,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -52,7 +53,7 @@ type Balancer struct {
 	classes       []string
 	mounts        int
 	mountsByClass map[string]map[*KeepMount]bool
-	collScanned   int
+	collScanned   int64
 	serviceRoots  map[string]string
 	errors        []error
 	stats         balancerStats
@@ -399,35 +400,10 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 		}(mounts)
 	}
 
-	// collQ buffers incoming collections so we can start fetching
-	// the next page without waiting for the current page to
-	// finish processing.
 	collQ := make(chan arvados.Collection, bufs)
 
-	// Start a goroutine to process collections. (We could use a
-	// worker pool here, but even with a single worker we already
-	// process collections much faster than we can retrieve them.)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for coll := range collQ {
-			err := bal.addCollection(coll)
-			if err != nil || len(errs) > 0 {
-				select {
-				case errs <- err:
-				default:
-				}
-				for range collQ {
-				}
-				cancel()
-				return
-			}
-			bal.collScanned++
-		}
-	}()
-
-	// Start a goroutine to retrieve all collections from the
-	// Arvados database and send them to collQ for processing.
+	// Retrieve all collections from the database and send them to
+	// collQ.
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -455,6 +431,27 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 		}
 	}()
 
+	// Parse manifests from collQ and pass the block hashes to
+	// BlockStateMap to track desired replication.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for coll := range collQ {
+				err := bal.addCollection(coll)
+				if err != nil || len(errs) > 0 {
+					select {
+					case errs <- err:
+					default:
+					}
+					cancel()
+					continue
+				}
+				atomic.AddInt64(&bal.collScanned, 1)
+			}
+		}()
+	}
+
 	wg.Wait()
 	if len(errs) > 0 {
 		return <-errs

commit 504b1b430076f15d27ff3e8da3e1d3623431aa84
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 28 00:18:53 2021 -0400

    17574: Add BalanceUpdateLimit config, fix tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index e28d5cbb7..c5bc1c8e8 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -456,6 +456,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
diff --git a/lib/config/export.go b/lib/config/export.go
index bb939321c..2faacc859 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -84,6 +84,7 @@ var whitelist = map[string]bool{
 	"Collections.BalanceCollectionBuffers":                false,
 	"Collections.BalancePeriod":                           false,
 	"Collections.BalanceTimeout":                          false,
+	"Collections.BalanceUpdateLimit":                      false,
 	"Collections.BlobDeleteConcurrency":                   false,
 	"Collections.BlobMissingReport":                       false,
 	"Collections.BlobReplicateConcurrency":                false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index b15bf7eeb..fb9f888eb 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -462,6 +462,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6e59828a3..dcffcd25e 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -138,6 +138,7 @@ type Cluster struct {
 		BalanceCollectionBatch   int
 		BalanceCollectionBuffers int
 		BalanceTimeout           Duration
+		BalanceUpdateLimit       int
 
 		WebDAVCache WebDAVCacheConfig
 
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 5e1c0e45c..18a8bdcf4 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -352,6 +352,9 @@ func (s *runSuite) TearDownTest(c *check.C) {
 }
 
 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+	defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+	_, err := s.db.Exec(`delete from collections`)
+	c.Assert(err, check.IsNil)
 	opts := RunOptions{
 		CommitPulls: true,
 		CommitTrash: true,
@@ -365,7 +368,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	srv := s.newServer(&opts)
-	_, err := srv.runOnce()
+	_, err = srv.runOnce()
 	c.Check(err, check.ErrorMatches, "received zero collections")
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -390,26 +393,6 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
-	opts := RunOptions{
-		CommitPulls: true,
-		CommitTrash: true,
-		Logger:      ctxlog.TestLogger(c),
-	}
-	s.stub.serveCurrentUserAdmin()
-	s.stub.serveCollectionsButSkipOne()
-	s.stub.serveKeepServices(stubServices)
-	s.stub.serveKeepstoreMounts()
-	s.stub.serveKeepstoreIndexFoo4Bar1()
-	trashReqs := s.stub.serveKeepstoreTrash()
-	pullReqs := s.stub.serveKeepstorePull()
-	srv := s.newServer(&opts)
-	_, err := srv.runOnce()
-	c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
-	c.Check(trashReqs.Count(), check.Equals, 4)
-	c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
 	lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
 	c.Assert(err, check.IsNil)
@@ -433,7 +416,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
 	c.Check(err, check.IsNil)
 	lost, err := ioutil.ReadFile(lostf.Name())
 	c.Assert(err, check.IsNil)
-	c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+	c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
 }
 
 func (s *runSuite) TestDryRun(c *check.C) {
@@ -464,11 +447,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
-	lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
-	c.Assert(err, check.IsNil)
-	s.config.Collections.BlobMissingReport = lostf.Name()
-	defer os.Remove(lostf.Name())
-
+	s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
 	s.config.ManagementToken = "xyzzy"
 	opts := RunOptions{
 		CommitPulls: true,
@@ -494,17 +473,18 @@ func (s *runSuite) TestCommit(c *check.C) {
 	// in a poor rendezvous position
 	c.Check(bal.stats.pulls, check.Equals, 2)
 
-	lost, err := ioutil.ReadFile(lostf.Name())
+	lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
 	c.Assert(err, check.IsNil)
-	c.Check(string(lost), check.Equals, "")
+	c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
 
 	buf, err := s.getMetrics(c, srv)
 	c.Check(err, check.IsNil)
-	c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
-	c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
-	c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
-	c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
-	c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+	bufstr := buf.String()
+	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index ba9edf939..6e0a066e8 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -8,7 +8,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -52,6 +51,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 	if err != nil {
 		return err
 	}
+	progressTicker := time.NewTicker(10 * time.Second)
+	defer progressTicker.Stop()
 	callCount := 0
 	for rows.Next() {
 		var coll arvados.Collection
@@ -74,8 +75,13 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 		if err != nil {
 			return err
 		}
-		progress(callCount, expectCount)
+		select {
+		case <-progressTicker.C:
+			progress(callCount, expectCount)
+		default:
+		}
 	}
+	progress(callCount, expectCount)
 	rows.Close()
 	if err := rows.Err(); err != nil {
 		return err
@@ -104,36 +110,37 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 	threshold := time.Now()
 	thresholdStr := threshold.Format(time.RFC3339Nano)
 
+	updated := int64(0)
+
 	var err error
 	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
 	go func() {
 		defer close(collQ)
-		err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
-			if coll.ModifiedAt.After(threshold) {
-				return io.EOF
-			}
-			if coll.IsTrashed {
-				return nil
+		err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+			if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+				bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+				cancel()
+				return context.Canceled
 			}
 			collQ <- coll
 			return nil
 		}, func(done, total int) {
 			bal.logf("update collections: %d/%d", done, total)
 		})
-		if err == io.EOF {
-			err = nil
-		} else if err != nil {
+		if err != nil && err != context.Canceled {
 			bal.logf("error updating collections: %s", err)
 		}
 	}()
 
-	var updated int64
 	var wg sync.WaitGroup
 	for i := 0; i < runtime.NumCPU(); i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			for coll := range collQ {
+				if ctx.Err() != nil {
+					continue
+				}
 				blkids, err := coll.SizedDigests()
 				if err != nil {
 					bal.logf("%s: %s", coll.UUID, err)
@@ -153,7 +160,9 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					where uuid=$4`,
 					repl, thresholdStr, classes, coll.UUID)
 				if err != nil {
-					bal.logf("%s: update failed: %s", coll.UUID, err)
+					if err != context.Canceled {
+						bal.logf("%s: update failed: %s", coll.UUID, err)
+					}
 					continue
 				}
 				atomic.AddInt64(&updated, 1)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 90ae0695d..e1573e7f7 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -19,6 +19,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/health"
 	"github.com/jmoiron/sqlx"
+	_ "github.com/lib/pq"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index b154f6e99..65a2d5567 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -11,6 +11,11 @@ import (
 	"net/http"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/ghodss/yaml"
 	check "gopkg.in/check.v1"
 )
 
@@ -26,6 +31,8 @@ func (s *mainSuite) TestVersionFlag(c *check.C) {
 }
 
 func (s *mainSuite) TestHTTPServer(c *check.C) {
+	arvadostest.StartKeep(2, true)
+
 	ln, err := net.Listen("tcp", ":0")
 	if err != nil {
 		c.Fatal(err)
@@ -33,10 +40,17 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
 	_, p, err := net.SplitHostPort(ln.Addr().String())
 	c.Check(err, check.IsNil)
 	ln.Close()
-	config := "Clusters:\n zzzzz:\n  ManagementToken: abcdefg\n  Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+	cfg.Clusters[cluster.ClusterID] = *cluster
+	config, err := yaml.Marshal(cfg)
+	c.Assert(err, check.IsNil)
 
 	var stdout bytes.Buffer
-	go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+	go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
@@ -47,7 +61,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
 				c.Fatal(err)
 				return
 			}
-			req.Header.Set("Authorization", "Bearer abcdefg")
+			req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
 			resp, err := http.DefaultClient.Do(req)
 			if err != nil {
 				c.Logf("error %s", err)
@@ -73,6 +87,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
 		c.Log(stdout.String())
 		c.Fatal("timeout")
 	}
+	c.Log(stdout.String())
 
 	// Check non-metrics URL that gets passed through to us from
 	// service.Command

commit d012d691bde541c8dae0dac4c00cb74ed30d9e47
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 28 00:13:50 2021 -0400

    17574: add -pprof flag to service commands.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 9ca243125..40db4f9c7 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -12,6 +12,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	_ "net/http/pprof"
 	"net/url"
 	"os"
 	"strings"
@@ -70,6 +71,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	loader := config.NewLoader(stdin, log)
 	loader.SetupFlags(flags)
 	versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+	pprofAddr := flags.String("pprof", "", "Serve Go profile data at `[addr]:port`")
 	err = flags.Parse(args)
 	if err == flag.ErrHelp {
 		err = nil
@@ -80,6 +82,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 		return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
 	}
 
+	if *pprofAddr != "" {
+		go func() {
+			log.Println(http.ListenAndServe(*pprofAddr, nil))
+		}()
+	}
+
 	if strings.HasSuffix(prog, "controller") {
 		// Some config-loader checks try to make API calls via
 		// controller. Those can't be expected to work if this
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 93457fb82..90ae0695d 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -9,6 +9,8 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"net/http"
+	_ "net/http/pprof"
 	"os"
 
 	"git.arvados.org/arvados.git/lib/config"
@@ -40,6 +42,13 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
 		"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
 	flags.Bool("version", false, "Write version information to stdout and exit 0")
 	dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+	pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+	if *pprofAddr != "" {
+		go func() {
+			logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+		}()
+	}
 
 	loader := config.NewLoader(os.Stdin, logger)
 	loader.SetupFlags(flags)

commit 11339f23c194809c53cb4a888aea18eb51ff27f8
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 28 00:04:56 2021 -0400

    17574: Speed up manifest parsing.
    
    Previous code spent a lot of time allocating, copying, and GCing
    strings. Working with byte slices is much faster.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index cec20279d..785c18d4a 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -5,11 +5,10 @@
 package arvados
 
 import (
-	"bufio"
+	"bytes"
 	"crypto/md5"
 	"fmt"
 	"regexp"
-	"strings"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/blockdigest"
@@ -56,40 +55,40 @@ func (c Collection) resourceName() string {
 //
 // Zero-length blocks are not included.
 func (c *Collection) SizedDigests() ([]SizedDigest, error) {
-	manifestText := c.ManifestText
-	if manifestText == "" {
-		manifestText = c.UnsignedManifestText
+	manifestText := []byte(c.ManifestText)
+	if len(manifestText) == 0 {
+		manifestText = []byte(c.UnsignedManifestText)
 	}
-	if manifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+	if len(manifestText) == 0 && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
 		// TODO: Check more subtle forms of corruption, too
 		return nil, fmt.Errorf("manifest is missing")
 	}
-	var sds []SizedDigest
-	scanner := bufio.NewScanner(strings.NewReader(manifestText))
-	scanner.Buffer(make([]byte, 1048576), len(manifestText))
-	for scanner.Scan() {
-		line := scanner.Text()
-		tokens := strings.Split(line, " ")
+	sds := make([]SizedDigest, 0, len(manifestText)/40)
+	for _, line := range bytes.Split(manifestText, []byte{'\n'}) {
+		if len(line) == 0 {
+			continue
+		}
+		tokens := bytes.Split(line, []byte{' '})
 		if len(tokens) < 3 {
 			return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
 		}
 		for _, token := range tokens[1:] {
-			if !blockdigest.LocatorPattern.MatchString(token) {
+			if !blockdigest.LocatorPattern.Match(token) {
 				// FIXME: ensure it's a file token
 				break
 			}
-			if strings.HasPrefix(token, "d41d8cd98f00b204e9800998ecf8427e+0") {
+			if bytes.HasPrefix(token, []byte("d41d8cd98f00b204e9800998ecf8427e+0")) {
 				// Exclude "empty block" placeholder
 				continue
 			}
 			// FIXME: shouldn't assume 32 char hash
-			if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+			if i := bytes.IndexRune(token[33:], '+'); i >= 0 {
 				token = token[:33+i]
 			}
-			sds = append(sds, SizedDigest(token))
+			sds = append(sds, SizedDigest(string(token)))
 		}
 	}
-	return sds, scanner.Err()
+	return sds, nil
 }
 
 type CollectionList struct {

commit fdaca8b1c797b47ae7e08f0a9624dc84a3be3c44
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 27 11:23:48 2021 -0400

    17574: Add -update-confirmed-fields=false option.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 6a71cf99f..67021bef3 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -173,7 +173,12 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 			return
 		}
 	}
-	err = bal.updateCollections(ctx, client, cluster)
+	if runOptions.CommitConfirmedFields {
+		err = bal.updateCollections(ctx, client, cluster)
+		if err != nil {
+			return
+		}
+	}
 	return
 }
 
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index cbdde595e..5e1c0e45c 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -21,6 +21,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/jmoiron/sqlx"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/expfmt"
 	check "gopkg.in/check.v1"
@@ -309,6 +310,7 @@ func (s *stubServer) serveKeepstorePull() *reqTracker {
 type runSuite struct {
 	stub   stubServer
 	config *arvados.Cluster
+	db     *sqlx.DB
 	client *arvados.Client
 }
 
@@ -320,6 +322,7 @@ func (s *runSuite) newServer(options *RunOptions) *Server {
 		Metrics:    newMetrics(prometheus.NewRegistry()),
 		Logger:     options.Logger,
 		Dumper:     options.Dumper,
+		DB:         s.db,
 	}
 	return srv
 }
@@ -329,6 +332,8 @@ func (s *runSuite) SetUpTest(c *check.C) {
 	c.Assert(err, check.Equals, nil)
 	s.config, err = cfg.GetCluster("")
 	c.Assert(err, check.Equals, nil)
+	s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+	c.Assert(err, check.IsNil)
 
 	s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
 	arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index 564e36a43..1458fe452 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -88,9 +88,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 		logger := logrus.New()
 		logger.Out = io.MultiWriter(&logBuf, os.Stderr)
 		opts := RunOptions{
-			CommitPulls: true,
-			CommitTrash: true,
-			Logger:      logger,
+			CommitPulls:           true,
+			CommitTrash:           true,
+			CommitConfirmedFields: true,
+			Logger:                logger,
 		}
 
 		bal := &Balancer{
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 80b1ed301..93457fb82 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -36,6 +36,8 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
 		"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
 	flags.BoolVar(&options.CommitTrash, "commit-trash", false,
 		"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+	flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+		"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
 	flags.Bool("version", false, "Write version information to stdout and exit 0")
 	dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
 
@@ -56,10 +58,11 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
 	// service.Command
 	args = nil
 	dropFlag := map[string]bool{
-		"once":         true,
-		"commit-pulls": true,
-		"commit-trash": true,
-		"dump":         true,
+		"once":                    true,
+		"commit-pulls":            true,
+		"commit-trash":            true,
+		"commit-confirmed-fields": true,
+		"dump":                    true,
 	}
 	flags.Visit(func(f *flag.Flag) {
 		if !dropFlag[f.Name] {
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index b42fa23a3..5299b96c1 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -24,11 +24,12 @@ import (
 //
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
-	Once        bool
-	CommitPulls bool
-	CommitTrash bool
-	Logger      logrus.FieldLogger
-	Dumper      logrus.FieldLogger
+	Once                  bool
+	CommitPulls           bool
+	CommitTrash           bool
+	CommitConfirmedFields bool
+	Logger                logrus.FieldLogger
+	Dumper                logrus.FieldLogger
 
 	// SafeRendezvousState from the most recent balance operation,
 	// or "" if unknown. If this changes from one run to the next,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list