[ARVADOS] updated: 5c27dc48c7007c1deff4b2877b2c81906df47015
Git user
git at public.curoverse.com
Thu May 19 08:03:41 EDT 2016
Summary of changes:
services/keep-balance/balance.go | 182 +++++++++++++++++++++------------------
1 file changed, 96 insertions(+), 86 deletions(-)
via 5c27dc48c7007c1deff4b2877b2c81906df47015 (commit)
from 87f2f44c70532acc70885c5b7f528ce4d45cc65d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 5c27dc48c7007c1deff4b2877b2c81906df47015
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 19 08:03:28 2016 -0400
9162: Sort funcs more logically. Add "stay" vs "none" distinction in dump.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 38333fa..d1de5f9 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -34,27 +34,26 @@ type Balancer struct {
mutex sync.Mutex
}
-func (bal *Balancer) addCollection(coll arvados.Collection) error {
- blkids, err := coll.SizedDigests()
- if err != nil {
- bal.mutex.Lock()
- bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
- bal.mutex.Unlock()
- return nil
- }
- repl := bal.DefaultReplication
- if coll.ReplicationDesired != nil {
- repl = *coll.ReplicationDesired
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ for _, srv := range srvList.Items {
+ bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
}
- debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
- bal.BlockStateMap.IncreaseDesired(repl, blkids)
return nil
}
-func (bal *Balancer) logf(f string, args ...interface{}) {
- if bal.Logger != nil {
- bal.Logger.Printf(f, args...)
- }
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ return c.EachKeepService(func(srv arvados.KeepService) error {
+ for _, t := range okTypes {
+ if t == srv.ServiceType {
+ bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+ return nil
+ }
+ }
+ bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+ return nil
+ })
}
// CheckSanityEarly checks for configuration and runtime errors that
@@ -80,63 +79,6 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
return nil
}
-// CheckSanityLate checks for configuration and runtime errors after
-// GetCurrentState() and ComputeChangeSets() have finished.
-//
-// If it returns an error, it is dangerous to run any Commit methods.
-func (bal *Balancer) CheckSanityLate() error {
- if bal.errors != nil {
- for _, err := range bal.errors {
- bal.logf("deferred error: %v", err)
- }
- return fmt.Errorf("cannot proceed safely after deferred errors")
- }
-
- if bal.collScanned == 0 {
- return fmt.Errorf("received zero collections")
- }
-
- anyDesired := false
- bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
- if blk.Desired > 0 {
- anyDesired = true
- }
- })
- if !anyDesired {
- return fmt.Errorf("zero blocks have desired replication>0")
- }
-
- if dr := bal.DefaultReplication; dr < 1 {
- return fmt.Errorf("Default replication (%d) is less than 1", dr)
- }
-
- // TODO: no two services have identical indexes
- // TODO: no collisions (same md5, different size)
- return nil
-}
-
-func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
- bal.KeepServices = make(map[string]*KeepService)
- for _, srv := range srvList.Items {
- bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
- }
- return nil
-}
-
-func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
- bal.KeepServices = make(map[string]*KeepService)
- return c.EachKeepService(func(srv arvados.KeepService) error {
- for _, t := range okTypes {
- if t == srv.ServiceType {
- bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
- return nil
- }
- }
- bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
- return nil
- })
-}
-
// GetCurrentState determines the current replication state, and the
// desired replication level, for every block that is either
// retrievable or referenced.
@@ -172,7 +114,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
errs <- fmt.Errorf("%s: %v", srv, err)
return
}
- bal.logf("%s: add replicas to map", srv)
+ bal.logf("%s: add %d replicas to map", srv, len(idx))
bal.BlockStateMap.AddReplicas(srv, idx)
bal.logf("%s: done", srv)
}(srv)
@@ -225,11 +167,21 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
return <-errs
}
-func (bal *Balancer) setupServiceRoots() {
- bal.serviceRoots = make(map[string]string)
- for _, srv := range bal.KeepServices {
- bal.serviceRoots[srv.UUID] = srv.UUID
+func (bal *Balancer) addCollection(coll arvados.Collection) error {
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.mutex.Lock()
+ bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
+ bal.mutex.Unlock()
+ return nil
+ }
+ repl := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ repl = *coll.ReplicationDesired
}
+ debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.BlockStateMap.IncreaseDesired(repl, blkids)
+ return nil
}
// ComputeChangeSets compares, for each known block, the current and
@@ -239,6 +191,8 @@ func (bal *Balancer) setupServiceRoots() {
//
// It does not actually apply any of the computed changes.
func (bal *Balancer) ComputeChangeSets() {
+ // This just calls balanceBlock() once for each block, using a
+ // pool of worker goroutines.
defer timeMe(bal.Logger, "ComputeChangeSets")()
bal.setupServiceRoots()
@@ -267,16 +221,25 @@ func (bal *Balancer) ComputeChangeSets() {
wg.Wait()
}
+func (bal *Balancer) setupServiceRoots() {
+ bal.serviceRoots = make(map[string]string)
+ for _, srv := range bal.KeepServices {
+ bal.serviceRoots[srv.UUID] = srv.UUID
+ }
+}
+
const (
- changeNothing = iota
+ changeStay = iota
changePull
changeTrash
+ changeNone
)
var changeName = map[int]string{
- changeNothing: "leave",
- changePull: "pull",
- changeTrash: "trash",
+ changeStay: "stay",
+ changePull: "pull",
+ changeTrash: "trash",
+ changeNone: "none",
}
// balanceBlock compares current state to desired state for a single
@@ -287,6 +250,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
hasRepl := make(map[string]Replica, len(bal.serviceRoots))
for _, repl := range blk.Replicas {
hasRepl[repl.UUID] = repl
+ // TODO: when multiple copies are on one server, use
+ // the oldest one that doesn't have a timestamp
+ // collision with other replicas.
}
// number of replicas already found in positions better than
// the position we're contemplating now.
@@ -304,10 +270,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
pulls := 0
var changes []string
for _, uuid := range uuids {
- change := changeNothing
+ change := changeNone
srv := bal.KeepServices[uuid]
// TODO: request a Touch if Mtime is duplicated.
- if repl, ok := hasRepl[srv.UUID]; ok {
+ repl, ok := hasRepl[srv.UUID]
+ if ok {
// This service has a replica. We should
// delete it if [1] we already have enough
// distinct replicas in better rendezvous
@@ -323,6 +290,8 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
Mtime: repl.Mtime,
})
change = changeTrash
+ } else {
+ change = changeStay
}
uniqueBestRepl[repl.Mtime] = true
reportedBestRepl++
@@ -341,7 +310,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
change = changePull
}
if bal.Dumper != nil {
- changes = append(changes, changeName[change]+":"+uuid)
+ changes = append(changes, fmt.Sprintf("%s:%s:%d", changeName[change], uuid, repl.Mtime))
}
}
if bal.Dumper != nil {
@@ -439,6 +408,41 @@ func (bal *Balancer) PrintStatistics() {
bal.logf("===")
}
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
+func (bal *Balancer) CheckSanityLate() error {
+ if bal.errors != nil {
+ for _, err := range bal.errors {
+ bal.logf("deferred error: %v", err)
+ }
+ return fmt.Errorf("cannot proceed safely after deferred errors")
+ }
+
+ if bal.collScanned == 0 {
+ return fmt.Errorf("received zero collections")
+ }
+
+ anyDesired := false
+ bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
+ if blk.Desired > 0 {
+ anyDesired = true
+ }
+ })
+ if !anyDesired {
+ return fmt.Errorf("zero blocks have desired replication>0")
+ }
+
+ if dr := bal.DefaultReplication; dr < 1 {
+ return fmt.Errorf("Default replication (%d) is less than 1", dr)
+ }
+
+ // TODO: no two services have identical indexes
+ // TODO: no collisions (same md5, different size)
+ return nil
+}
+
// CommitPulls sends the computed lists of pull requests to the
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
@@ -487,3 +491,9 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
close(errs)
return lastErr
}
+
+func (bal *Balancer) logf(f string, args ...interface{}) {
+ if bal.Logger != nil {
+ bal.Logger.Printf(f, args...)
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list