[ARVADOS] updated: 2.1.0-1103-g1657c7fbb

Git user git at public.arvados.org
Wed Jul 28 20:13:07 UTC 2021


Summary of changes:
 services/keep-balance/collection.go | 120 +++++++++++++++++++++++++++++++-----
 1 file changed, 103 insertions(+), 17 deletions(-)

       via  1657c7fbb89e790655f830630f60c68ceaf8569f (commit)
      from  46a33e285d6179ebd5041733f98949a23147d55d (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 1657c7fbb89e790655f830630f60c68ceaf8569f
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jul 28 16:11:52 2021 -0400

    17574: Batch updates into transactions, skip when unchanged.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
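
    For context, the pattern this commit applies -- skip rows whose
    confirmed values already match the desired ones, batch the rest into
    transactions, commit every N updates -- looks like this in isolation.
    This is a minimal sketch with hypothetical names, not the committed
    code: batchSize plays the role of the commit's txBatch, and skip and
    updateOne stand in for the needUpdate check and the per-collection
    UPDATE statement.

package sketch

import "database/sql"

const batchSize = 100 // plays the role of txBatch in the commit

// updateBatched commits one transaction per batchSize updates instead
// of one per row, and skips rows that are already up to date.
func updateBatched(db *sql.DB, items []string, skip func(string) bool,
	updateOne func(*sql.Tx, string) error) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	pending := 0
	for _, item := range items {
		if skip(item) { // "skip when unchanged"
			continue
		}
		if err := updateOne(tx, item); err != nil {
			tx.Rollback()
			return err
		}
		if pending++; pending >= batchSize {
			if err := tx.Commit(); err != nil {
				return err
			}
			tx, err = db.Begin()
			if err != nil {
				return err
			}
			pending = 0
		}
	}
	return tx.Commit() // flush the final partial batch
}

    One commit per batch amortizes round trips and fsyncs across roughly
    batchSize rows, and the skip check means an unchanged collection
    costs no write at all.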

diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index 6e0a066e8..daedeb8bf 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -47,24 +47,36 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 	}
 	var newestModifiedAt time.Time
 
-	rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
+	rows, err := db.QueryxContext(ctx, `SELECT
+		uuid, manifest_text, modified_at, portable_data_hash,
+		replication_desired, replication_confirmed, replication_confirmed_at,
+		storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+		is_trashed
+		FROM collections`)
 	if err != nil {
 		return err
 	}
+	defer rows.Close()
 	progressTicker := time.NewTicker(10 * time.Second)
 	defer progressTicker.Stop()
 	callCount := 0
 	for rows.Next() {
 		var coll arvados.Collection
-		var classesDesired []byte
-		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
+		var classesDesired, classesConfirmed []byte
+		err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+			&coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+			&classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+			&coll.IsTrashed)
 		if err != nil {
-			rows.Close()
 			return err
 		}
+
 		err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
-		if err != nil {
-			rows.Close()
+		if err != nil && len(classesDesired) > 0 {
+			return err
+		}
+		err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+		if err != nil && len(classesConfirmed) > 0 {
 			return err
 		}
 		if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
@@ -82,8 +94,8 @@ func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(
 		}
 	}
 	progress(callCount, expectCount)
-	rows.Close()
-	if err := rows.Err(); err != nil {
+	err = rows.Close()
+	if err != nil {
+		return err
+	}
+	if err := rows.Err(); err != nil {
 		return err
 	}
 	if checkCount, err := countCollections(c, arvados.ResourceListParams{
@@ -112,7 +124,7 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 
 	updated := int64(0)
 
-	var err error
+	errs := make(chan error, 1)
 	collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
 	go func() {
 		defer close(collQ)
@@ -125,20 +137,46 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 			collQ <- coll
 			return nil
 		}, func(done, total int) {
-			bal.logf("update collections: %d/%d", done, total)
+			bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
 		})
 		if err != nil && err != context.Canceled {
-			bal.logf("error updating collections: %s", err)
+			select {
+			case errs <- err:
+			default:
+			}
 		}
 	}()
 
 	var wg sync.WaitGroup
-	for i := 0; i < runtime.NumCPU(); i++ {
+
+	// Use about 1 goroutine per 2 CPUs. Based on experiments with
+	// a 2-core host, using more concurrent database
+	// calls/transactions makes this process slower, not faster.
+	for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
 		wg.Add(1)
-		go func() {
+		goSendErr(errs, func() error {
 			defer wg.Done()
+			tx, err := bal.DB.Beginx()
+			if err != nil {
+				return err
+			}
+			txPending := 0
+			flush := func(final bool) error {
+				err := tx.Commit()
+				if err != nil {
+					tx.Rollback()
+					return err
+				}
+				txPending = 0
+				if final {
+					return nil
+				}
+				tx, err = bal.DB.Beginx()
+				return err
+			}
+			txBatch := 100
 			for coll := range collQ {
-				if ctx.Err() != nil {
+				if ctx.Err() != nil || len(errs) > 0 {
 					continue
 				}
 				blkids, err := coll.SizedDigests()
@@ -147,12 +185,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					continue
 				}
 				repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+				desired := bal.DefaultReplication
+				if coll.ReplicationDesired != nil {
+					desired = *coll.ReplicationDesired
+				}
+				if repl > desired {
+					// If actual>desired, confirm
+					// the desired number rather
+					// than actual to avoid
+					// flapping updates when
+					// replication increases
+					// temporarily.
+					repl = desired
+				}
 				classes, err := json.Marshal(coll.StorageClassesDesired)
 				if err != nil {
 					bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
 					continue
 				}
-				_, err = bal.DB.ExecContext(ctx, `update collections set
+				needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+				for i := range coll.StorageClassesDesired {
+					if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+						needUpdate = true
+					}
+				}
+				if !needUpdate {
+					continue
+				}
+				_, err = tx.ExecContext(ctx, `update collections set
 					replication_confirmed=$1,
 					replication_confirmed_at=$2,
 					storage_classes_confirmed=$3,
@@ -166,10 +227,35 @@ func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, c
 					continue
 				}
 				atomic.AddInt64(&updated, 1)
+				if txPending++; txPending >= txBatch {
+					err = flush(false)
+					if err != nil {
+						return err
+					}
+				}
 			}
-		}()
+			return flush(true)
+		})
 	}
 	wg.Wait()
 	bal.logf("updated %d collections", updated)
-	return err
+	// Non-blocking receive: errs is empty unless a worker or the
+	// collection-listing goroutine reported a failure.
+	select {
+	case err := <-errs:
+		return fmt.Errorf("error updating collections: %s", err)
+	default:
+		return nil
+	}
+}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+	go func() {
+		err := f()
+		if err != nil {
+			select {
+			case errs <- err:
+			default:
+			}
+		}
+	}()
 }
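
The goSendErr helper added above implements a first-error-wins
convention: a channel with capacity 1 keeps the first failure, later
failures are dropped, and the caller does one non-blocking check after
the WaitGroup drains. A minimal usage sketch (the worker body and its
failure are hypothetical):

package sketch

import (
	"errors"
	"sync"
)

func goSendErr(errs chan<- error, f func() error) {
	go func() {
		if err := f(); err != nil {
			select {
			case errs <- err: // keep the first error
			default: // an earlier error is already buffered; drop this one
			}
		}
	}()
}

func runWorkers(n int) error {
	errs := make(chan error, 1) // capacity 1: only the first error matters
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		i := i
		wg.Add(1)
		goSendErr(errs, func() error {
			defer wg.Done()
			if i == 0 {
				return errors.New("hypothetical worker failure")
			}
			return nil
		})
	}
	wg.Wait()
	select {
	case err := <-errs: // non-blocking: empty on success
		return err
	default:
		return nil
	}
}

This is also why the workers in the diff can simply poll len(errs) > 0
and drain their queue without doing further work once any goroutine
has failed.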

-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list