[ARVADOS] updated: 2.1.0-1097-g325ba452b

Git user git at public.arvados.org
Tue Jul 27 15:17:01 UTC 2021


Summary of changes:
 services/keep-balance/balance.go          |   4 +-
 services/keep-balance/collection.go       | 166 ++++++------------------------
 services/keep-balance/collection_test.go  |  73 +++++--------
 services/keep-balance/integration_test.go |   5 +
 services/keep-balance/main.go             |  14 +++
 services/keep-balance/server.go           |   6 +-
 6 files changed, 84 insertions(+), 184 deletions(-)

       via  325ba452bdb2b8ebe4ef2a85d495291429df8082 (commit)
      from  bf4166939c77771642af846cb5372efc8a78659a (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.


commit 325ba452bdb2b8ebe4ef2a85d495291429df8082
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jul 27 11:16:49 2021 -0400

    17574: Get collections directly from DB instead of controller.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index a7dcf6190..6a71cf99f 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -23,6 +23,7 @@ import (
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
 )
 
@@ -36,6 +37,7 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+	DB      *sqlx.DB
 	Logger  logrus.FieldLogger
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
@@ -424,7 +426,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(ctx, c, pageSize,
+		err = EachCollection(ctx, bal.DB, c,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 3afb1ccc5..ba9edf939 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -34,10 +34,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
 	if progress == nil {
 		progress = func(_, _ int) {}
 	}
@@ -49,124 +46,51 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 	if err != nil {
 		return err
 	}
+	var newestModifiedAt time.Time
 
-	// Note the obvious way to get all collections (sorting by
-	// UUID) would be much easier, but would lose data: If a
-	// client were to move files from collection with uuid="zzz"
-	// to a collection with uuid="aaa" around the time when we
-	// were fetching the "mmm" page, we would never see those
-	// files' block IDs at all -- even if the client is careful to
-	// save "aaa" before saving "zzz".
-	//
-	// Instead, we get pages in modified_at order. Collections
-	// that are modified during the run will be re-fetched in a
-	// subsequent page.
-
-	limit := pageSize
-	if limit <= 0 {
-		// Use the maximum page size the server allows
-		limit = 1<<31 - 1
-	}
-	params := arvados.ResourceListParams{
-		Limit:              &limit,
-		Order:              "modified_at, uuid",
-		Count:              "none",
-		Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
-		IncludeTrash:       true,
-		IncludeOldVersions: true,
+	rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+	if err != nil {
+		return err
 	}
-	var last arvados.Collection
-	var filterTime time.Time
 	callCount := 0
-	gettingExactTimestamp := false
-	for {
-		progress(callCount, expectCount)
-		var page arvados.CollectionList
-		err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+	for rows.Next() {
+		var coll arvados.Collection
+		var classesDesired []byte
+		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
 		if err != nil {
+			rows.Close()
 			return err
 		}
-		for _, coll := range page.Items {
-			if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
-				continue
-			}
-			callCount++
-			err = f(coll)
-			if err != nil {
-				return err
-			}
-			last = coll
+		err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+		if err != nil {
+			rows.Close()
+			return err
 		}
-		if len(page.Items) == 0 && !gettingExactTimestamp {
-			break
-		} else if last.ModifiedAt.IsZero() {
-			return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-		} else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
-			// If we requested time>=X and never got a
-			// time>X then we might not have received all
-			// items with time==X yet. Switch to
-			// gettingExactTimestamp mode (if we're not
-			// there already), advancing our UUID
-			// threshold with each request, until we get
-			// an empty page.
-			gettingExactTimestamp = true
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: "=",
-				Operand:  filterTime,
-			}, {
-				Attr:     "uuid",
-				Operator: ">",
-				Operand:  last.UUID,
-			}}
-		} else if gettingExactTimestamp {
-			// This must be an empty page (in this mode,
-			// an unequal timestamp is impossible) so we
-			// can start getting pages of newer
-			// collections.
-			gettingExactTimestamp = false
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: ">",
-				Operand:  filterTime,
-			}}
-		} else {
-			// In the normal case, we know we have seen
-			// all collections with modtime<filterTime,
-			// but we might not have seen all that have
-			// modtime=filterTime. Hence we use >= instead
-			// of > and skip the obvious overlapping item,
-			// i.e., the last item on the previous
-			// page. In some edge cases this can return
-			// collections we have already seen, but
-			// avoiding that would add overhead in the
-			// overwhelmingly common cases, so we don't
-			// bother.
-			filterTime = last.ModifiedAt
-			params.Filters = []arvados.Filter{{
-				Attr:     "modified_at",
-				Operator: ">=",
-				Operand:  filterTime,
-			}, {
-				Attr:     "uuid",
-				Operator: "!=",
-				Operand:  last.UUID,
-			}}
+		if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+			newestModifiedAt = coll.ModifiedAt
 		}
+		callCount++
+		err = f(coll)
+		if err != nil {
+			return err
+		}
+		progress(callCount, expectCount)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		return err
 	}
-	progress(callCount, expectCount)
-
 	if checkCount, err := countCollections(c, arvados.ResourceListParams{
 		Filters: []arvados.Filter{{
 			Attr:     "modified_at",
 			Operator: "<=",
-			Operand:  filterTime}},
+			Operand:  newestModifiedAt}},
 		IncludeTrash:       true,
 		IncludeOldVersions: true,
 	}); err != nil {
 		return err
 	} else if callCount < checkCount {
-		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
 	}
 
 	return nil
@@ -184,7 +108,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
 	go func() {
 		defer close(collQ)
-		err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+		err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
 			if coll.ModifiedAt.After(threshold) {
 				return io.EOF
 			}
@@ -203,11 +127,6 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 		}
 	}()
 
-	db, err := bal.db(cluster)
-	if err != nil {
-		return err
-	}
-
 	var updated int64
 	var wg sync.WaitGroup
 	for i := 0; i < runtime.NumCPU(); i++ {
@@ -221,25 +140,18 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					continue
 				}
 				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
-				tx, err := db.Beginx()
+				classes, err := json.Marshal(coll.StorageClassesDesired)
 				if err != nil {
-					bal.logf("error opening transaction: %s", coll.UUID, err)
-					cancel()
+					bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
 					continue
 				}
-				classes, _ := json.Marshal(coll.StorageClassesDesired)
-				_, err = tx.ExecContext(ctx, `update collections set
+				_, err = bal.DB.ExecContext(ctx, `update collections set
 					replication_confirmed=$1,
 					replication_confirmed_at=$2,
 					storage_classes_confirmed=$3,
 					storage_classes_confirmed_at=$2
 					where uuid=$4`,
 					repl, thresholdStr, classes, coll.UUID)
-				if err != nil {
-					tx.Rollback()
-				} else {
-					err = tx.Commit()
-				}
 				if err != nil {
 					bal.logf("%s: update failed: %s", coll.UUID, err)
 					continue
@@ -252,17 +164,3 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 	bal.logf("updated %d collections", updated)
 	return err
 }
-
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
-	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
-	if err != nil {
-		return nil, err
-	}
-	if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
-		db.SetMaxOpenConns(p)
-	}
-	if err := db.Ping(); err != nil {
-		return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
-	}
-	return db, nil
-}
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
index 3ab9d07b2..f749bad6a 100644
--- a/services/keep-balance/collection_test.go
+++ b/services/keep-balance/collection_test.go
@@ -6,57 +6,34 @@ package main
 
 import (
 	"context"
-	"sync"
-	"time"
 
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/jmoiron/sqlx"
 	check "gopkg.in/check.v1"
 )
 
-//  TestIdenticalTimestamps ensures EachCollection returns the same
-//  set of collections for various page sizes -- even page sizes so
-//  small that we get entire pages full of collections with identical
-//  timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
-	// pageSize==0 uses the default (large) page size.
-	pageSizes := []int{0, 2, 3, 4, 5}
-	got := make([][]string, len(pageSizes))
-	var wg sync.WaitGroup
-	for trial, pageSize := range pageSizes {
-		wg.Add(1)
-		go func(trial, pageSize int) {
-			defer wg.Done()
-			streak := 0
-			longestStreak := 0
-			var lastMod time.Time
-			sawUUID := make(map[string]bool)
-			err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
-				if c.ModifiedAt.IsZero() {
-					return nil
-				}
-				if sawUUID[c.UUID] {
-					// dup
-					return nil
-				}
-				got[trial] = append(got[trial], c.UUID)
-				sawUUID[c.UUID] = true
-				if lastMod == c.ModifiedAt {
-					streak++
-					if streak > longestStreak {
-						longestStreak = streak
-					}
-				} else {
-					streak = 0
-					lastMod = c.ModifiedAt
-				}
-				return nil
-			}, nil)
-			c.Check(err, check.IsNil)
-			c.Check(longestStreak > 25, check.Equals, true)
-		}(trial, pageSize)
-	}
-	wg.Wait()
-	for trial := 1; trial < len(pageSizes); trial++ {
-		c.Check(got[trial], check.DeepEquals, got[0])
-	}
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+	c.Assert(err, check.IsNil)
+
+	defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+	insertedOld := false
+	err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+		if !insertedOld {
+			insertedOld = true
+			_, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+			return err
+		}
+		return nil
+	}, nil)
+	c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
 }
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index e4297bfe6..564e36a43 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -18,6 +18,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/jmoiron/sqlx"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
@@ -27,6 +28,7 @@ var _ = check.Suite(&integrationSuite{})
 
 type integrationSuite struct {
 	config     *arvados.Cluster
+	db         *sqlx.DB
 	client     *arvados.Client
 	keepClient *keepclient.KeepClient
 }
@@ -68,6 +70,8 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
 	c.Assert(err, check.Equals, nil)
 	s.config, err = cfg.GetCluster("")
 	c.Assert(err, check.Equals, nil)
+	s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+	c.Assert(err, check.IsNil)
 	s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
 
 	s.client = &arvados.Client{
@@ -90,6 +94,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 		}
 
 		bal := &Balancer{
+			DB:      s.db,
 			Logger:  logger,
 			Metrics: newMetrics(prometheus.NewRegistry()),
 		}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 8b4ee84c7..80b1ed301 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -16,6 +16,7 @@ import (
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/health"
+	"github.com/jmoiron/sqlx"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
@@ -78,6 +79,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
 				return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
 			}
 
+			db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+			if err != nil {
+				return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+			}
+			if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+				db.SetMaxOpenConns(p)
+			}
+			err = db.Ping()
+			if err != nil {
+				return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+			}
+
 			if options.Logger == nil {
 				options.Logger = ctxlog.FromContext(ctx)
 			}
@@ -89,6 +102,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
 				Metrics:    newMetrics(registry),
 				Logger:     options.Logger,
 				Dumper:     options.Dumper,
+				DB:         db,
 			}
 			srv.Handler = &health.Handler{
 				Token:  cluster.ManagementToken,
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index 9801a3fd4..b42fa23a3 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
 )
 
@@ -46,11 +47,13 @@ type Server struct {
 
 	Logger logrus.FieldLogger
 	Dumper logrus.FieldLogger
+
+	DB *sqlx.DB
 }
 
 // CheckHealth implements service.Handler.
 func (srv *Server) CheckHealth() error {
-	return nil
+	return srv.DB.Ping()
 }
 
 // Done implements service.Handler.
@@ -75,6 +78,7 @@ func (srv *Server) run() {
 
 func (srv *Server) runOnce() (*Balancer, error) {
 	bal := &Balancer{
+		DB:             srv.DB,
 		Logger:         srv.Logger,
 		Dumper:         srv.Dumper,
 		Metrics:        srv.Metrics,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list