[ARVADOS] created: 2.1.0-1096-gbf4166939
Git user
git at public.arvados.org
Mon Jul 26 19:29:10 UTC 2021
at bf4166939c77771642af846cb5372efc8a78659a (commit)
commit bf4166939c77771642af846cb5372efc8a78659a
Author: Tom Clegg <tom at curii.com>
Date: Mon Jul 26 15:28:01 2021 -0400
17574: Update replication_confirmed fields after keep-balance run.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 4b7ad6dd5..d770ca76d 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -31,7 +31,10 @@ const (
UserAgreementPDH = "b519d9cb706a29fc7ea24dbea2f05851+93"
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
- MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
+ StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
+ EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde"
AProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 86423a297..a7dcf6190 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -167,7 +167,11 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
}
if runOptions.CommitTrash {
err = bal.CommitTrash(ctx, client)
+ if err != nil {
+ return
+ }
}
+ err = bal.updateCollections(ctx, client, cluster)
return
}
@@ -460,7 +464,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
if coll.ReplicationDesired != nil {
repl = *coll.ReplicationDesired
}
- bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
// Pass pdh to IncreaseDesired only if LostBlocksFile is being
// written -- otherwise it's just a waste of memory.
pdh := ""
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index 029f8c6c0..e30b4ff79 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -133,3 +133,53 @@ func (bsm *BlockStateMap) IncreaseDesired(pdh string, classes []string, n int, b
bsm.get(blkid).increaseDesired(pdh, classes, n)
}
}
+
+// GetConfirmedReplication returns the replication level of the given
+// blocks, considering only the specified storage classes.
+//
+// For each block, the Replication values of the mounts holding a
+// replica are summed per requested storage class; the result is the
+// smallest such sum over all blocks and all requested classes. If any
+// block has no replicas at all, or has none in one of the requested
+// classes, the result is 0. An empty blkids list also yields 0.
+//
+// If len(classes)==0, returns the replication level without regard to
+// storage classes.
+//
+// A mount with no storage classes listed is counted as belonging to
+// the "default" class.
+//
+// Safe to call concurrently with other calls to
+// GetConfirmedReplication, but not with different BlockStateMap
+// methods.
+func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int {
+	defaultClasses := map[string]bool{"default": true}
+	// lowest is the smallest replication level seen so far. Zero
+	// means "nothing recorded yet": a genuine zero level causes an
+	// immediate return below, so it is never stored here.
+	lowest := 0
+	for _, blkid := range blkids {
+		total := 0
+		perclass := make(map[string]int, len(classes))
+		for _, c := range classes {
+			perclass[c] = 0
+		}
+		for _, r := range bsm.get(blkid).Replicas {
+			total += r.KeepMount.Replication
+			mntclasses := r.KeepMount.StorageClasses
+			if len(mntclasses) == 0 {
+				mntclasses = defaultClasses
+			}
+			for c := range mntclasses {
+				// Only tally classes the caller asked about.
+				if _, wanted := perclass[c]; wanted {
+					perclass[c] += r.KeepMount.Replication
+				}
+			}
+		}
+		if total == 0 {
+			// Block has no replicas anywhere.
+			return 0
+		}
+		if len(perclass) == 0 {
+			// No classes requested: use the overall total.
+			if lowest == 0 || total < lowest {
+				lowest = total
+			}
+			continue
+		}
+		for _, n := range perclass {
+			if n == 0 {
+				// Block is missing from a requested class.
+				return 0
+			}
+			if lowest == 0 || n < lowest {
+				lowest = n
+			}
+		}
+	}
+	return lowest
+}
diff --git a/services/keep-balance/block_state_test.go b/services/keep-balance/block_state_test.go
new file mode 100644
index 000000000..aaf2c18e2
--- /dev/null
+++ b/services/keep-balance/block_state_test.go
@@ -0,0 +1,94 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&confirmedReplicationSuite{}) // register the suite with gocheck
+
+type confirmedReplicationSuite struct { // fixture shared by the GetConfirmedReplication tests
+ blockStateMap *BlockStateMap // map under test; replaced with a fresh one in SetUpTest
+ mtime int64 // timestamp used as Mtime for every index entry the tests add
+}
+
+// SetUpTest builds a fresh BlockStateMap before each test. It holds
+// block 10 on a "default"-class mount with Replication 1, and block
+// 20 on a "default"-class mount with Replication 2.
+func (s *confirmedReplicationSuite) SetUpTest(c *check.C) {
+	// Use a fixed, valid timestamp. Parsing the layout constant
+	// itself fails ("extra text" after the "Z"), which would leave
+	// the zero time, whose UnixNano() result is undefined.
+	t, err := time.Parse(time.RFC3339Nano, "2021-07-26T19:28:01.123456789Z")
+	c.Assert(err, check.IsNil)
+	s.mtime = t.UnixNano()
+	s.blockStateMap = NewBlockStateMap()
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    1,
+		StorageClasses: map[string]bool{"default": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(10), Mtime: s.mtime},
+	})
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    2,
+		StorageClasses: map[string]bool{"default": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(20), Mtime: s.mtime},
+	})
+}
+
+// TestZeroReplication checks that the confirmed replication is 0
+// whenever at least one of the requested blocks (404, 409) was never
+// added to the map, with or without a storage class filter.
+func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) {
+	for _, trial := range []struct {
+		blkids  []arvados.SizedDigest
+		classes []string
+	}{
+		{[]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"}},
+		{[]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"}},
+		{[]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil},
+	} {
+		got := s.blockStateMap.GetConfirmedReplication(trial.blkids, trial.classes)
+		c.Check(got, check.Equals, 0)
+	}
+}
+
+// TestBlocksWithDifferentReplication checks that, for block 10
+// (Replication 1) together with block 20 (Replication 2), the lower
+// of the two levels is reported.
+func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) {
+	blkids := []arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}
+	classes := []string{"default"}
+	got := s.blockStateMap.GetConfirmedReplication(blkids, classes)
+	c.Check(got, check.Equals, 1)
+}
+
+// TestBlocksInDifferentClasses adds block 30 on a mount with
+// Replication 3 in storage class "three", then checks the reported
+// level for block 30 alone and for blocks 20 and 30 together under
+// various class filters.
+func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) {
+	mnt := &KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    3,
+		StorageClasses: map[string]bool{"three": true},
+	}}
+	s.blockStateMap.AddReplicas(mnt, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(30), Mtime: s.mtime},
+	})
+
+	for _, trial := range []struct {
+		blkids  []arvados.SizedDigest
+		classes []string
+		expect  int
+	}{
+		{[]arvados.SizedDigest{knownBlkid(30)}, []string{"three"}, 3},
+		// block 30 has repl 0 @ "default"
+		{[]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"}, 0},
+		// block 20 has repl 0 @ "three"
+		{[]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"}, 0},
+		{[]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil, 2},
+	} {
+		got := s.blockStateMap.GetConfirmedReplication(trial.blkids, trial.classes)
+		c.Check(got, check.Equals, trial.expect)
+	}
+}
+
+// TestBlocksOnMultipleMounts stores blocks 40 and 41 on two mounts,
+// each with Replication 2: one in classes "default" and "four", the
+// other in class "four" only. It then checks the reported level for
+// each class filter.
+func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
+	// Build a fresh entry list for each AddReplicas call.
+	entries := func() []arvados.KeepServiceIndexEntry {
+		return []arvados.KeepServiceIndexEntry{
+			{SizedDigest: knownBlkid(40), Mtime: s.mtime},
+			{SizedDigest: knownBlkid(41), Mtime: s.mtime},
+		}
+	}
+	for _, mntclasses := range []map[string]bool{
+		{"default": true, "four": true},
+		{"four": true},
+	} {
+		s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+			Replication:    2,
+			StorageClasses: mntclasses,
+		}}, entries())
+	}
+
+	for _, trial := range []struct {
+		classes []string
+		expect  int
+	}{
+		{[]string{"default"}, 2},
+		{[]string{"four"}, 4},
+		{[]string{"default", "four"}, 2},
+		{nil, 4},
+	} {
+		blkids := []arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}
+		got := s.blockStateMap.GetConfirmedReplication(blkids, trial.classes)
+		c.Check(got, check.Equals, trial.expect)
+	}
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 1659918ca..3afb1ccc5 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -6,10 +6,16 @@ package main
import (
"context"
+ "encoding/json"
"fmt"
+ "io"
+ "runtime"
+ "sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
)
func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -65,7 +71,7 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
Limit: &limit,
Order: "modified_at, uuid",
Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+ Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
IncludeTrash: true,
IncludeOldVersions: true,
}
@@ -165,3 +171,98 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
return nil
}
+
+// updateCollections sets the replication_confirmed,
+// replication_confirmed_at, storage_classes_confirmed, and
+// storage_classes_confirmed_at columns of each collection, using the
+// replicas recorded in bal.BlockStateMap.
+//
+// Only collections that are not trashed and were not modified after
+// this function started are updated. One goroutine lists collections
+// via EachCollection; runtime.NumCPU() workers compute the confirmed
+// replication and write it to the database, one transaction per
+// collection.
+//
+// A failure to update an individual collection is logged and that
+// collection is skipped. The returned error is either a database
+// connection error or the error (if any) that stopped the collection
+// listing.
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	defer bal.time("update_collections", "wall clock time to update collections")()
+	threshold := time.Now()
+	thresholdStr := threshold.Format(time.RFC3339Nano)
+
+	// Connect before starting the producer, so a connection failure
+	// cannot leave the producer goroutine blocked on a channel that
+	// nobody reads.
+	db, err := bal.db(cluster)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	// listErr is written only by the producer goroutine, and read
+	// only after wg.Wait() (i.e., after collQ has been closed and
+	// drained), so it needs no further synchronization.
+	var listErr error
+	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+	go func() {
+		defer close(collQ)
+		listErr = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+			if coll.ModifiedAt.After(threshold) {
+				// Modified after we started: stop listing.
+				return io.EOF
+			}
+			if coll.IsTrashed {
+				return nil
+			}
+			select {
+			case collQ <- coll:
+				return nil
+			case <-ctx.Done():
+				// Don't block forever if the workers are gone.
+				return ctx.Err()
+			}
+		}, func(done, total int) {
+			bal.logf("update collections: %d/%d", done, total)
+		})
+		if listErr == io.EOF {
+			listErr = nil
+		} else if listErr != nil {
+			bal.logf("error updating collections: %s", listErr)
+		}
+	}()
+
+	var updated int64
+	var wg sync.WaitGroup
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for coll := range collQ {
+				blkids, err := coll.SizedDigests()
+				if err != nil {
+					bal.logf("%s: %s", coll.UUID, err)
+					continue
+				}
+				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+				classes, err := json.Marshal(coll.StorageClassesDesired)
+				if err != nil {
+					bal.logf("%s: error encoding storage classes: %s", coll.UUID, err)
+					continue
+				}
+				tx, err := db.Beginx()
+				if err != nil {
+					bal.logf("%s: error opening transaction: %s", coll.UUID, err)
+					// Stop the listing; keep draining collQ so
+					// the producer can exit.
+					cancel()
+					continue
+				}
+				_, err = tx.ExecContext(ctx, `update collections set
+					replication_confirmed=$1,
+					replication_confirmed_at=$2,
+					storage_classes_confirmed=$3,
+					storage_classes_confirmed_at=$2
+					where uuid=$4`,
+					repl, thresholdStr, classes, coll.UUID)
+				if err != nil {
+					// The exec error is the one worth reporting;
+					// a rollback error adds nothing.
+					tx.Rollback()
+				} else {
+					err = tx.Commit()
+				}
+				if err != nil {
+					bal.logf("%s: update failed: %s", coll.UUID, err)
+					continue
+				}
+				atomic.AddInt64(&updated, 1)
+			}
+		}()
+	}
+	wg.Wait()
+	bal.logf("updated %d collections", updated)
+	return listErr
+}
+
+// db opens a "postgres" database handle using the connection
+// parameters in cluster.PostgreSQL.Connection, applies
+// cluster.PostgreSQL.ConnectionPool (if positive) as the maximum
+// number of open connections, and pings the server to verify that it
+// is reachable.
+//
+// The caller is responsible for closing the returned handle. On
+// error, no handle is left open.
+func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
+	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+	if err != nil {
+		return nil, err
+	}
+	if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+		db.SetMaxOpenConns(p)
+	}
+	if err := db.Ping(); err != nil {
+		// Release the pool; the caller never sees this handle.
+		db.Close()
+		return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
+	}
+	return db, nil
+}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index defabd9a1..e4297bfe6 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -6,6 +6,7 @@ package main
import (
"bytes"
+ "io"
"os"
"strings"
"testing"
@@ -81,7 +82,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
for iter := 0; iter < 20; iter++ {
logBuf.Reset()
logger := logrus.New()
- logger.Out = &logBuf
+ logger.Out = io.MultiWriter(&logBuf, os.Stderr)
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
@@ -105,4 +106,23 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
time.Sleep(200 * time.Millisecond)
}
c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+
+ for _, trial := range []struct {
+ uuid string
+ repl int
+ classes []string
+ }{
+ {arvadostest.EmptyCollectionUUID, 0, []string{"default"}},
+ {arvadostest.FooCollection, 4, []string{"default"}}, // "foo" blk
+ {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
+ {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{"archive"}}, // "bar" blk
+ } {
+ c.Logf("%#v", trial)
+ var coll arvados.Collection
+ s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
+ if c.Check(coll.ReplicationConfirmed, check.NotNil) {
+ c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
+ }
+ c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
+ }
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list