[ARVADOS] updated: 2.1.0-1103-g1657c7fbb
Git user
git at public.arvados.org
Wed Jul 28 20:13:07 UTC 2021
Summary of changes:
services/keep-balance/collection.go | 120 +++++++++++++++++++++++++++++++-----
1 file changed, 103 insertions(+), 17 deletions(-)
via 1657c7fbb89e790655f830630f60c68ceaf8569f (commit)
from 46a33e285d6179ebd5041733f98949a23147d55d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 1657c7fbb89e790655f830630f60c68ceaf8569f
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 28 16:11:52 2021 -0400
17574: Batch updates into transactions, skip when unchanged.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 6e0a066e8..daedeb8bf 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -47,24 +47,36 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
}
var newestModifiedAt time.Time
- rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+ rows, err := db.QueryxContext(ctx, `SELECT
+ uuid, manifest_text, modified_at, portable_data_hash,
+ replication_desired, replication_confirmed, replication_confirmed_at,
+ storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+ is_trashed
+ FROM collections`)
if err != nil {
return err
}
+ defer rows.Close()
progressTicker := time.NewTicker(10 * time.Second)
defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
- var classesDesired []byte
- err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+ var classesDesired, classesConfirmed []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+ &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+ &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+ &coll.IsTrashed)
if err != nil {
- rows.Close()
return err
}
+
err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
- if err != nil {
- rows.Close()
+ if err != nil && len(classesDesired) > 0 {
+ return err
+ }
+ err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+ if err != nil && len(classesConfirmed) > 0 {
return err
}
if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
@@ -82,8 +94,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
}
}
progress(callCount, expectCount)
- rows.Close()
- if err := rows.Err(); err != nil {
+ err = rows.Close()
+ if err != nil {
return err
}
if checkCount, err := countCollections(c, arvados.ResourceListParams{
@@ -112,7 +124,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
updated := int64(0)
- var err error
+ errs := make(chan error, 1)
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
@@ -125,20 +137,46 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
collQ <- coll
return nil
}, func(done, total int) {
- bal.logf("update collections: %d/%d", done, total)
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
})
if err != nil && err != context.Canceled {
- bal.logf("error updating collections: %s", err)
+ select {
+ case errs <- err:
+ default:
+ }
}
}()
var wg sync.WaitGroup
- for i := 0; i < runtime.NumCPU(); i++ {
+
+ // Use about 1 goroutine per 2 CPUs. Based on experiments with
+ // a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ for i := 0; i < runtime.NumCPU()+1/2; i++ {
wg.Add(1)
- go func() {
+ goSendErr(errs, func() error {
defer wg.Done()
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
+ txBatch := 100
for coll := range collQ {
- if ctx.Err() != nil {
+ if ctx.Err() != nil || len(errs) > 0 {
continue
}
blkids, err := coll.SizedDigests()
@@ -147,12 +185,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
continue
}
repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
classes, err := json.Marshal(coll.StorageClassesDesired)
if err != nil {
bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
continue
}
- _, err = bal.DB.ExecContext(ctx, `update collections set
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
+ continue
+ }
+ _, err = tx.ExecContext(ctx, `update collections set
replication_confirmed=$1,
replication_confirmed_at=$2,
storage_classes_confirmed=$3,
@@ -166,10 +227,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
continue
}
atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
}
- }()
+ return flush(true)
+ })
}
wg.Wait()
bal.logf("updated %d collections", updated)
- return err
+ if err := <-errs; err != nil {
+ return fmt.Errorf("error updating collections: %s", err)
+ }
+ return nil
+}
+
+// goSendErr runs f in a new goroutine. A non-nil error returned by f
+// is sent to errs without blocking; if errs cannot accept it right
+// away (e.g. it already holds another error), the error is dropped.
+func goSendErr(errs chan<- error, f func() error) {
+ go func() {
+ err := f()
+ if err != nil {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list