[ARVADOS] updated: 2.1.0-1097-g325ba452b
Git user
git at public.arvados.org
Tue Jul 27 15:17:01 UTC 2021
Summary of changes:
services/keep-balance/balance.go | 4 +-
services/keep-balance/collection.go | 166 ++++++------------------------
services/keep-balance/collection_test.go | 73 +++++--------
services/keep-balance/integration_test.go | 5 +
services/keep-balance/main.go | 14 +++
services/keep-balance/server.go | 6 +-
6 files changed, 84 insertions(+), 184 deletions(-)
via 325ba452bdb2b8ebe4ef2a85d495291429df8082 (commit)
from bf4166939c77771642af846cb5372efc8a78659a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 325ba452bdb2b8ebe4ef2a85d495291429df8082
Author: Tom Clegg <tom at curii.com>
Date: Tue Jul 27 11:16:49 2021 -0400
17574: Get collections directly from DB instead of controller.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index a7dcf6190..6a71cf99f 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -23,6 +23,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
@@ -36,6 +37,7 @@ import (
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
type Balancer struct {
+ DB *sqlx.DB
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
Metrics *metrics
@@ -424,7 +426,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(ctx, c, pageSize,
+ err = EachCollection(ctx, bal.DB, c,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 3afb1ccc5..ba9edf939 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -34,10 +34,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
@@ -49,124 +46,51 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
if err != nil {
return err
}
+ var newestModifiedAt time.Time
- // Note the obvious way to get all collections (sorting by
- // UUID) would be much easier, but would lose data: If a
- // client were to move files from collection with uuid="zzz"
- // to a collection with uuid="aaa" around the time when we
- // were fetching the "mmm" page, we would never see those
- // files' block IDs at all -- even if the client is careful to
- // save "aaa" before saving "zzz".
- //
- // Instead, we get pages in modified_at order. Collections
- // that are modified during the run will be re-fetched in a
- // subsequent page.
-
- limit := pageSize
- if limit <= 0 {
- // Use the maximum page size the server allows
- limit = 1<<31 - 1
- }
- params := arvados.ResourceListParams{
- Limit: &limit,
- Order: "modified_at, uuid",
- Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
- IncludeTrash: true,
- IncludeOldVersions: true,
+ rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+ if err != nil {
+ return err
}
- var last arvados.Collection
- var filterTime time.Time
callCount := 0
- gettingExactTimestamp := false
- for {
- progress(callCount, expectCount)
- var page arvados.CollectionList
- err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+ for rows.Next() {
+ var coll arvados.Collection
+ var classesDesired []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
if err != nil {
+ rows.Close()
return err
}
- for _, coll := range page.Items {
- if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
- continue
- }
- callCount++
- err = f(coll)
- if err != nil {
- return err
- }
- last = coll
+ err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+ if err != nil {
+ rows.Close()
+ return err
}
- if len(page.Items) == 0 && !gettingExactTimestamp {
- break
- } else if last.ModifiedAt.IsZero() {
- return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
- } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
- // If we requested time>=X and never got a
- // time>X then we might not have received all
- // items with time==X yet. Switch to
- // gettingExactTimestamp mode (if we're not
- // there already), advancing our UUID
- // threshold with each request, until we get
- // an empty page.
- gettingExactTimestamp = true
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: "=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: ">",
- Operand: last.UUID,
- }}
- } else if gettingExactTimestamp {
- // This must be an empty page (in this mode,
- // an unequal timestamp is impossible) so we
- // can start getting pages of newer
- // collections.
- gettingExactTimestamp = false
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">",
- Operand: filterTime,
- }}
- } else {
- // In the normal case, we know we have seen
- // all collections with modtime<filterTime,
- // but we might not have seen all that have
- // modtime=filterTime. Hence we use >= instead
- // of > and skip the obvious overlapping item,
- // i.e., the last item on the previous
- // page. In some edge cases this can return
- // collections we have already seen, but
- // avoiding that would add overhead in the
- // overwhelmingly common cases, so we don't
- // bother.
- filterTime = last.ModifiedAt
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: "!=",
- Operand: last.UUID,
- }}
+ if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+ newestModifiedAt = coll.ModifiedAt
}
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ progress(callCount, expectCount)
+ }
+ rows.Close()
+ if err := rows.Err(); err != nil {
+ return err
}
- progress(callCount, expectCount)
-
if checkCount, err := countCollections(c, arvados.ResourceListParams{
Filters: []arvados.Filter{{
Attr: "modified_at",
Operator: "<=",
- Operand: filterTime}},
+ Operand: newestModifiedAt}},
IncludeTrash: true,
IncludeOldVersions: true,
}); err != nil {
return err
} else if callCount < checkCount {
- return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
}
return nil
@@ -184,7 +108,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
- err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+ err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
if coll.ModifiedAt.After(threshold) {
return io.EOF
}
@@ -203,11 +127,6 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
}
}()
- db, err := bal.db(cluster)
- if err != nil {
- return err
- }
-
var updated int64
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
@@ -221,25 +140,18 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
- tx, err := db.Beginx()
+ classes, err := json.Marshal(coll.StorageClassesDesired)
if err != nil {
- bal.logf("error opening transaction: %s", coll.UUID, err)
- cancel()
+ bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
continue
}
- classes, _ := json.Marshal(coll.StorageClassesDesired)
- _, err = tx.ExecContext(ctx, `update collections set
+ _, err = bal.DB.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
storage_classes_confirmed_at=$2
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
- if err != nil {
- tx.Rollback()
- } else {
- err = tx.Commit()
- }
if err != nil {
bal.logf("%s: update failed: %s", coll.UUID, err)
continue
@@ -252,17 +164,3 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
bal.logf("updated %d collections", updated)
return err
}
-
-func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
- db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
- if err != nil {
- return nil, err
- }
- if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
- db.SetMaxOpenConns(p)
- }
- if err := db.Ping(); err != nil {
- return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
- }
- return db, nil
-}
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
index 3ab9d07b2..f749bad6a 100644
--- a/services/keep-balance/collection_test.go
+++ b/services/keep-balance/collection_test.go
@@ -6,57 +6,34 @@ package main
import (
"context"
- "sync"
- "time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
check "gopkg.in/check.v1"
)
-// TestIdenticalTimestamps ensures EachCollection returns the same
-// set of collections for various page sizes -- even page sizes so
-// small that we get entire pages full of collections with identical
-// timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
- // pageSize==0 uses the default (large) page size.
- pageSizes := []int{0, 2, 3, 4, 5}
- got := make([][]string, len(pageSizes))
- var wg sync.WaitGroup
- for trial, pageSize := range pageSizes {
- wg.Add(1)
- go func(trial, pageSize int) {
- defer wg.Done()
- streak := 0
- longestStreak := 0
- var lastMod time.Time
- sawUUID := make(map[string]bool)
- err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
- if c.ModifiedAt.IsZero() {
- return nil
- }
- if sawUUID[c.UUID] {
- // dup
- return nil
- }
- got[trial] = append(got[trial], c.UUID)
- sawUUID[c.UUID] = true
- if lastMod == c.ModifiedAt {
- streak++
- if streak > longestStreak {
- longestStreak = streak
- }
- } else {
- streak = 0
- lastMod = c.ModifiedAt
- }
- return nil
- }, nil)
- c.Check(err, check.IsNil)
- c.Check(longestStreak > 25, check.Equals, true)
- }(trial, pageSize)
- }
- wg.Wait()
- for trial := 1; trial < len(pageSizes); trial++ {
- c.Check(got[trial], check.DeepEquals, got[0])
- }
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
+
+ defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+ insertedOld := false
+ err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+ if !insertedOld {
+ insertedOld = true
+ _, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+ return err
+ }
+ return nil
+ }, nil)
+ c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index e4297bfe6..564e36a43 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -18,6 +18,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
@@ -27,6 +28,7 @@ var _ = check.Suite(&integrationSuite{})
type integrationSuite struct {
config *arvados.Cluster
+ db *sqlx.DB
client *arvados.Client
keepClient *keepclient.KeepClient
}
@@ -68,6 +70,8 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
c.Assert(err, check.Equals, nil)
s.config, err = cfg.GetCluster("")
c.Assert(err, check.Equals, nil)
+ s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
s.client = &arvados.Client{
@@ -90,6 +94,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
}
bal := &Balancer{
+ DB: s.db,
Logger: logger,
Metrics: newMetrics(prometheus.NewRegistry()),
}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 8b4ee84c7..80b1ed301 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -16,6 +16,7 @@ import (
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@@ -78,6 +79,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
}
+ db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+ }
+ if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
+ }
+ err = db.Ping()
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+ }
+
if options.Logger == nil {
options.Logger = ctxlog.FromContext(ctx)
}
@@ -89,6 +102,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
Metrics: newMetrics(registry),
Logger: options.Logger,
Dumper: options.Dumper,
+ DB: db,
}
srv.Handler = &health.Handler{
Token: cluster.ManagementToken,
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index 9801a3fd4..b42fa23a3 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -12,6 +12,7 @@ import (
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
@@ -46,11 +47,13 @@ type Server struct {
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
+
+ DB *sqlx.DB
}
// CheckHealth implements service.Handler.
func (srv *Server) CheckHealth() error {
- return nil
+ return srv.DB.Ping()
}
// Done implements service.Handler.
@@ -75,6 +78,7 @@ func (srv *Server) run() {
func (srv *Server) runOnce() (*Balancer, error) {
bal := &Balancer{
+ DB: srv.DB,
Logger: srv.Logger,
Dumper: srv.Dumper,
Metrics: srv.Metrics,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list