[ARVADOS] created: 2.1.0-1096-gbf4166939

Git user git at public.arvados.org
Mon Jul 26 19:29:10 UTC 2021


        at  bf4166939c77771642af846cb5372efc8a78659a (commit)


commit bf4166939c77771642af846cb5372efc8a78659a
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jul 26 15:28:01 2021 -0400

    17574: Update replication_confirmed fields after keep-balance run.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 4b7ad6dd5..d770ca76d 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -31,7 +31,10 @@ const (
 	UserAgreementPDH        = "b519d9cb706a29fc7ea24dbea2f05851+93"
 	HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
-	MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+	MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+	StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
+	StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
+	EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
 
 	AProjectUUID    = "zzzzz-j7d0g-v955i6s2oi1cbso"
 	ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 86423a297..a7dcf6190 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -167,7 +167,11 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 	}
 	if runOptions.CommitTrash {
 		err = bal.CommitTrash(ctx, client)
+		if err != nil {
+			return
+		}
 	}
+	err = bal.updateCollections(ctx, client, cluster)
 	return
 }
 
@@ -460,7 +464,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	if coll.ReplicationDesired != nil {
 		repl = *coll.ReplicationDesired
 	}
-	bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
 	// Pass pdh to IncreaseDesired only if LostBlocksFile is being
 	// written -- otherwise it's just a waste of memory.
 	pdh := ""
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index 029f8c6c0..e30b4ff79 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -133,3 +133,74 @@ func (bsm *BlockStateMap) IncreaseDesired(pdh string, classes []string, n int, b
 		bsm.get(blkid).increaseDesired(pdh, classes, n)
 	}
 }
+
+// GetConfirmedReplication returns the replication level of the given
+// blocks, considering only the specified storage classes.
+//
+// For each block, the replication in a requested class is the sum of
+// the Replication values of the mounts holding a replica of it that
+// are in that class (a mount with no storage classes configured
+// counts as "default"). The result is the smallest such value over
+// all blocks and requested classes, and is 0 if blkids is empty, if
+// any block has no replicas, or if any block has no replicas in one
+// of the requested classes.
+//
+// If len(classes)==0, returns the replication level without regard to
+// storage classes.
+//
+// Safe to call concurrently with other calls to
+// GetConfirmedReplication, but not with different BlockStateMap
+// methods.
+func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int {
+	defaultClasses := map[string]bool{"default": true}
+	min := 0
+	for _, blkid := range blkids {
+		// Read the map directly rather than via get(): get()
+		// returns a usable entry even for unseen blocks
+		// (presumably by inserting one), and updateCollections
+		// calls this from several goroutines at once, so any map
+		// write here would be a data race.
+		blk, ok := bsm.entries[blkid]
+		if !ok || blk == nil {
+			// No replicas recorded for this block.
+			return 0
+		}
+		total := 0
+		perclass := make(map[string]int, len(classes))
+		for _, c := range classes {
+			perclass[c] = 0
+		}
+		for _, r := range blk.Replicas {
+			total += r.KeepMount.Replication
+			mntclasses := r.KeepMount.StorageClasses
+			if len(mntclasses) == 0 {
+				mntclasses = defaultClasses
+			}
+			for c := range mntclasses {
+				n, ok := perclass[c]
+				if !ok {
+					// Don't care about this storage class
+					continue
+				}
+				perclass[c] = n + r.KeepMount.Replication
+			}
+		}
+		if total == 0 {
+			return 0
+		}
+		for _, n := range perclass {
+			if n == 0 {
+				// No replica in this requested class.
+				return 0
+			}
+			if n < min || min == 0 {
+				min = n
+			}
+		}
+		if len(perclass) == 0 && (min == 0 || min > total) {
+			// No classes requested: use total replication.
+			min = total
+		}
+	}
+	return min
+}
diff --git a/services/keep-balance/block_state_test.go b/services/keep-balance/block_state_test.go
new file mode 100644
index 000000000..aaf2c18e2
--- /dev/null
+++ b/services/keep-balance/block_state_test.go
@@ -0,0 +1,111 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&confirmedReplicationSuite{})
+
+// confirmedReplicationSuite tests BlockStateMap.GetConfirmedReplication.
+type confirmedReplicationSuite struct {
+	blockStateMap *BlockStateMap
+	mtime         int64 // Mtime (UnixNano) given to every index entry
+}
+
+// SetUpTest gives each test a fresh BlockStateMap in which block 10
+// has one replica on a "default" mount with Replication 1, and block
+// 20 has one replica on a "default" mount with Replication 2.
+func (s *confirmedReplicationSuite) SetUpTest(c *check.C) {
+	// Any fixed, valid timestamp will do: GetConfirmedReplication
+	// does not look at mtimes. (Parsing the time.RFC3339Nano layout
+	// string as a value fails and yields the zero Time, whose
+	// UnixNano result is undefined.)
+	s.mtime = time.Date(2021, 7, 26, 0, 0, 0, 0, time.UTC).UnixNano()
+	s.blockStateMap = NewBlockStateMap()
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    1,
+		StorageClasses: map[string]bool{"default": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(10), Mtime: s.mtime},
+	})
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    2,
+		StorageClasses: map[string]bool{"default": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(20), Mtime: s.mtime},
+	})
+}
+
+// TestZeroReplication checks that a block with no replicas makes the
+// result 0, whether or not storage classes are specified.
+func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) {
+	n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"})
+	c.Check(n, check.Equals, 0)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"})
+	c.Check(n, check.Equals, 0)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil)
+	c.Check(n, check.Equals, 0)
+}
+
+// TestBlocksWithDifferentReplication checks that the result is the
+// lowest replication among the given blocks.
+func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) {
+	n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}, []string{"default"})
+	c.Check(n, check.Equals, 1)
+}
+
+// TestBlocksInDifferentClasses checks that a block with no replicas in
+// a requested class makes the result 0, and that a nil class list
+// counts replicas regardless of class.
+func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) {
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    3,
+		StorageClasses: map[string]bool{"three": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(30), Mtime: s.mtime},
+	})
+
+	n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(30)}, []string{"three"})
+	c.Check(n, check.Equals, 3)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"})
+	c.Check(n, check.Equals, 0) // block 30 has repl 0 @ "default"
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"})
+	c.Check(n, check.Equals, 0) // block 20 has repl 0 @ "three"
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil)
+	c.Check(n, check.Equals, 2)
+}
+
+// TestBlocksOnMultipleMounts checks that a block's replication in a
+// class is summed over all mounts in that class, and that the result
+// is the lowest value over the requested classes.
+func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    2,
+		StorageClasses: map[string]bool{"default": true, "four": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(40), Mtime: s.mtime},
+		{SizedDigest: knownBlkid(41), Mtime: s.mtime},
+	})
+	s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+		Replication:    2,
+		StorageClasses: map[string]bool{"four": true},
+	}}, []arvados.KeepServiceIndexEntry{
+		{SizedDigest: knownBlkid(40), Mtime: s.mtime},
+		{SizedDigest: knownBlkid(41), Mtime: s.mtime},
+	})
+	n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default"})
+	c.Check(n, check.Equals, 2)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"four"})
+	c.Check(n, check.Equals, 4)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default", "four"})
+	c.Check(n, check.Equals, 2)
+	n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
+	c.Check(n, check.Equals, 4)
+}
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 1659918ca..3afb1ccc5 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -6,10 +6,16 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
+	"runtime"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/jmoiron/sqlx"
 )
 
 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -65,7 +71,7 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 		Limit:              &limit,
 		Order:              "modified_at, uuid",
 		Count:              "none",
-		Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+		Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
 		IncludeTrash:       true,
 		IncludeOldVersions: true,
 	}
@@ -165,3 +171,146 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 
 	return nil
 }
+
+// updateCollections writes the replication level and storage classes
+// confirmed by this keep-balance run to every non-trashed collection
+// that was last modified before the update started (columns
+// replication_confirmed[_at] and storage_classes_confirmed[_at]).
+//
+// It connects to PostgreSQL directly (see db) and issues the updates
+// from runtime.NumCPU() worker goroutines. It returns an error if
+// the database cannot be reached, if opening a transaction fails, or
+// if listing collections fails; a failure to update an individual
+// collection is logged and skipped.
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	defer bal.time("update_collections", "wall clock time to update collections")()
+	threshold := time.Now()
+	thresholdStr := threshold.Format(time.RFC3339Nano)
+
+	// Connect before starting the lister goroutine, so a failure
+	// here cannot leave it blocked forever on a full collQ.
+	db, err := bal.db(cluster)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	// listErr is written only by the lister goroutine (before it
+	// closes collQ) and read only after wg.Wait(), which cannot
+	// return until every worker has seen collQ closed, so no lock is
+	// needed. It must not share a variable with err above.
+	var listErr error
+	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+	go func() {
+		defer close(collQ)
+		listErr = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+			if coll.ModifiedAt.After(threshold) {
+				// Collections are listed in modified_at order,
+				// so the rest are all newer than this run.
+				return io.EOF
+			}
+			if coll.IsTrashed {
+				return nil
+			}
+			select {
+			case collQ <- coll:
+				return nil
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}, func(done, total int) {
+			bal.logf("update collections: %d/%d", done, total)
+		})
+		if listErr == io.EOF {
+			listErr = nil
+		} else if listErr != nil {
+			bal.logf("error updating collections: %s", listErr)
+		}
+	}()
+
+	var updated int64
+	var wg sync.WaitGroup
+	// txErr records the first failure to open a transaction, which
+	// cancels the whole update; reported in preference to the
+	// resulting cancellation error from the lister.
+	var txErr error
+	var txErrOnce sync.Once
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for coll := range collQ {
+				if ctx.Err() != nil {
+					// Cancelled: keep draining collQ so the
+					// lister can exit, but skip the database.
+					continue
+				}
+				blkids, err := coll.SizedDigests()
+				if err != nil {
+					bal.logf("%s: %s", coll.UUID, err)
+					continue
+				}
+				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+				tx, err := db.Beginx()
+				if err != nil {
+					bal.logf("%s: error opening transaction: %s", coll.UUID, err)
+					txErrOnce.Do(func() { txErr = err })
+					cancel()
+					continue
+				}
+				// Marshaling a []string cannot fail.
+				classes, _ := json.Marshal(coll.StorageClassesDesired)
+				// NOTE(review): the update is keyed on uuid only, so a
+				// collection whose manifest changed after it was listed
+				// is stamped with the replication of its old blocks.
+				_, err = tx.ExecContext(ctx, `update collections set
+					replication_confirmed=$1,
+					replication_confirmed_at=$2,
+					storage_classes_confirmed=$3,
+					storage_classes_confirmed_at=$2
+					where uuid=$4`,
+					repl, thresholdStr, classes, coll.UUID)
+				if err != nil {
+					// Best effort: the Exec error is what gets logged.
+					tx.Rollback()
+				} else {
+					err = tx.Commit()
+				}
+				if err != nil {
+					bal.logf("%s: update failed: %s", coll.UUID, err)
+					continue
+				}
+				atomic.AddInt64(&updated, 1)
+			}
+		}()
+	}
+	wg.Wait()
+	bal.logf("updated %d collections", updated)
+	if txErr != nil {
+		return txErr
+	}
+	return listErr
+}
+
+// db opens a connection pool for the cluster's PostgreSQL database,
+// limits it to PostgreSQL.ConnectionPool open connections when that
+// is positive, and checks connectivity with Ping. The caller must
+// Close the returned *sqlx.DB.
+func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
+	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+	if err != nil {
+		return nil, err
+	}
+	if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+		db.SetMaxOpenConns(p)
+	}
+	if err := db.Ping(); err != nil {
+		// Don't leak the pool when handing back an error.
+		db.Close()
+		return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %w", err)
+	}
+	return db, nil
+}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index defabd9a1..e4297bfe6 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"bytes"
+	"io"
 	"os"
 	"strings"
 	"testing"
@@ -81,7 +82,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 	for iter := 0; iter < 20; iter++ {
 		logBuf.Reset()
 		logger := logrus.New()
-		logger.Out = &logBuf
+		logger.Out = io.MultiWriter(&logBuf, os.Stderr)
 		opts := RunOptions{
 			CommitPulls: true,
 			CommitTrash: true,
@@ -105,4 +106,23 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 		time.Sleep(200 * time.Millisecond)
 	}
 	c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+
+	for _, trial := range []struct {
+		uuid    string
+		repl    int
+		classes []string
+	}{
+		{arvadostest.EmptyCollectionUUID, 0, []string{"default"}},
+		{arvadostest.FooCollection, 4, []string{"default"}},                                // "foo" blk
+		{arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
+		{arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{"archive"}}, // "bar" blk
+	} {
+		c.Logf("%#v", trial)
+		var coll arvados.Collection
+		s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
+		if c.Check(coll.ReplicationConfirmed, check.NotNil) {
+			c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
+		}
+		c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
+	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list