[ARVADOS] updated: 5c27dc48c7007c1deff4b2877b2c81906df47015

Git user git at public.curoverse.com
Thu May 19 08:03:41 EDT 2016


Summary of changes:
 services/keep-balance/balance.go | 182 +++++++++++++++++++++------------------
 1 file changed, 96 insertions(+), 86 deletions(-)

       via  5c27dc48c7007c1deff4b2877b2c81906df47015 (commit)
      from  87f2f44c70532acc70885c5b7f528ce4d45cc65d (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 5c27dc48c7007c1deff4b2877b2c81906df47015
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 19 08:03:28 2016 -0400

    9162: Sort funcs more logically. Add "stay" vs "none" distinction in dump.

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 38333fa..d1de5f9 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -34,27 +34,26 @@ type Balancer struct {
 	mutex        sync.Mutex
 }
 
-func (bal *Balancer) addCollection(coll arvados.Collection) error {
-	blkids, err := coll.SizedDigests()
-	if err != nil {
-		bal.mutex.Lock()
-		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
-		bal.mutex.Unlock()
-		return nil
-	}
-	repl := bal.DefaultReplication
-	if coll.ReplicationDesired != nil {
-		repl = *coll.ReplicationDesired
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+	bal.KeepServices = make(map[string]*KeepService)
+	for _, srv := range srvList.Items {
+		bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
 	}
-	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
-	bal.BlockStateMap.IncreaseDesired(repl, blkids)
 	return nil
 }
 
-func (bal *Balancer) logf(f string, args ...interface{}) {
-	if bal.Logger != nil {
-		bal.Logger.Printf(f, args...)
-	}
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+	bal.KeepServices = make(map[string]*KeepService)
+	return c.EachKeepService(func(srv arvados.KeepService) error {
+		for _, t := range okTypes {
+			if t == srv.ServiceType {
+				bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+				return nil
+			}
+		}
+		bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+		return nil
+	})
 }
 
 // CheckSanityEarly checks for configuration and runtime errors that
@@ -80,63 +79,6 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 	return nil
 }
 
-// CheckSanityLate checks for configuration and runtime errors after
-// GetCurrentState() and ComputeChangeSets() have finished.
-//
-// If it returns an error, it is dangerous to run any Commit methods.
-func (bal *Balancer) CheckSanityLate() error {
-	if bal.errors != nil {
-		for _, err := range bal.errors {
-			bal.logf("deferred error: %v", err)
-		}
-		return fmt.Errorf("cannot proceed safely after deferred errors")
-	}
-
-	if bal.collScanned == 0 {
-		return fmt.Errorf("received zero collections")
-	}
-
-	anyDesired := false
-	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
-		if blk.Desired > 0 {
-			anyDesired = true
-		}
-	})
-	if !anyDesired {
-		return fmt.Errorf("zero blocks have desired replication>0")
-	}
-
-	if dr := bal.DefaultReplication; dr < 1 {
-		return fmt.Errorf("Default replication (%d) is less than 1", dr)
-	}
-
-	// TODO: no two services have identical indexes
-	// TODO: no collisions (same md5, different size)
-	return nil
-}
-
-func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
-	bal.KeepServices = make(map[string]*KeepService)
-	for _, srv := range srvList.Items {
-		bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
-	}
-	return nil
-}
-
-func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
-	bal.KeepServices = make(map[string]*KeepService)
-	return c.EachKeepService(func(srv arvados.KeepService) error {
-		for _, t := range okTypes {
-			if t == srv.ServiceType {
-				bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
-				return nil
-			}
-		}
-		bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
-		return nil
-	})
-}
-
 // GetCurrentState determines the current replication state, and the
 // desired replication level, for every block that is either
 // retrievable or referenced.
@@ -172,7 +114,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 				errs <- fmt.Errorf("%s: %v", srv, err)
 				return
 			}
-			bal.logf("%s: add replicas to map", srv)
+			bal.logf("%s: add %d replicas to map", srv, len(idx))
 			bal.BlockStateMap.AddReplicas(srv, idx)
 			bal.logf("%s: done", srv)
 		}(srv)
@@ -225,11 +167,21 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 	return <-errs
 }
 
-func (bal *Balancer) setupServiceRoots() {
-	bal.serviceRoots = make(map[string]string)
-	for _, srv := range bal.KeepServices {
-		bal.serviceRoots[srv.UUID] = srv.UUID
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+	blkids, err := coll.SizedDigests()
+	if err != nil {
+		bal.mutex.Lock()
+		bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+		bal.mutex.Unlock()
+		return nil
+	}
+	repl := bal.DefaultReplication
+	if coll.ReplicationDesired != nil {
+		repl = *coll.ReplicationDesired
 	}
+	debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.BlockStateMap.IncreaseDesired(repl, blkids)
+	return nil
 }
 
 // ComputeChangeSets compares, for each known block, the current and
@@ -239,6 +191,8 @@ func (bal *Balancer) setupServiceRoots() {
 //
 // It does not actually apply any of the computed changes.
 func (bal *Balancer) ComputeChangeSets() {
+	// This just calls balanceBlock() once for each block, using a
+	// pool of worker goroutines.
 	defer timeMe(bal.Logger, "ComputeChangeSets")()
 	bal.setupServiceRoots()
 
@@ -267,16 +221,25 @@ func (bal *Balancer) ComputeChangeSets() {
 	wg.Wait()
 }
 
+func (bal *Balancer) setupServiceRoots() {
+	bal.serviceRoots = make(map[string]string)
+	for _, srv := range bal.KeepServices {
+		bal.serviceRoots[srv.UUID] = srv.UUID
+	}
+}
+
 const (
-	changeNothing = iota
+	changeStay = iota
 	changePull
 	changeTrash
+	changeNone
 )
 
 var changeName = map[int]string{
-	changeNothing: "leave",
-	changePull:    "pull",
-	changeTrash:   "trash",
+	changeStay:  "stay",
+	changePull:  "pull",
+	changeTrash: "trash",
+	changeNone:  "none",
 }
 
 // balanceBlock compares current state to desired state for a single
@@ -287,6 +250,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
 	for _, repl := range blk.Replicas {
 		hasRepl[repl.UUID] = repl
+		// TODO: when multiple copies are on one server, use
+		// the oldest one that doesn't have a timestamp
+		// collision with other replicas.
 	}
 	// number of replicas already found in positions better than
 	// the position we're contemplating now.
@@ -304,10 +270,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	pulls := 0
 	var changes []string
 	for _, uuid := range uuids {
-		change := changeNothing
+		change := changeNone
 		srv := bal.KeepServices[uuid]
 		// TODO: request a Touch if Mtime is duplicated.
-		if repl, ok := hasRepl[srv.UUID]; ok {
+		repl, ok := hasRepl[srv.UUID]
+		if ok {
 			// This service has a replica. We should
 			// delete it if [1] we already have enough
 			// distinct replicas in better rendezvous
@@ -323,6 +290,8 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 					Mtime:       repl.Mtime,
 				})
 				change = changeTrash
+			} else {
+				change = changeStay
 			}
 			uniqueBestRepl[repl.Mtime] = true
 			reportedBestRepl++
@@ -341,7 +310,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 			change = changePull
 		}
 		if bal.Dumper != nil {
-			changes = append(changes, changeName[change]+":"+uuid)
+			changes = append(changes, fmt.Sprintf("%s:%s:%d", changeName[change], uuid, repl.Mtime))
 		}
 	}
 	if bal.Dumper != nil {
@@ -439,6 +408,41 @@ func (bal *Balancer) PrintStatistics() {
 	bal.logf("===")
 }
 
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+	if bal.errors != nil {
+		for _, err := range bal.errors {
+			bal.logf("deferred error: %v", err)
+		}
+		return fmt.Errorf("cannot proceed safely after deferred errors")
+	}
+
+	if bal.collScanned == 0 {
+		return fmt.Errorf("received zero collections")
+	}
+
+	anyDesired := false
+	bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+		if blk.Desired > 0 {
+			anyDesired = true
+		}
+	})
+	if !anyDesired {
+		return fmt.Errorf("zero blocks have desired replication>0")
+	}
+
+	if dr := bal.DefaultReplication; dr < 1 {
+		return fmt.Errorf("Default replication (%d) is less than 1", dr)
+	}
+
+	// TODO: no two services have identical indexes
+	// TODO: no collisions (same md5, different size)
+	return nil
+}
+
 // CommitPulls sends the computed lists of pull requests to the
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
@@ -487,3 +491,9 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
 	close(errs)
 	return lastErr
 }
+
+func (bal *Balancer) logf(f string, args ...interface{}) {
+	if bal.Logger != nil {
+		bal.Logger.Printf(f, args...)
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list