[ARVADOS] created: 1.1.4-8-g3ce3d9f
Git user
git at public.curoverse.com
Thu Apr 19 10:59:57 EDT 2018
at 3ce3d9f884aed70cc84155554defe614a1bcfaaa (commit)
commit 3ce3d9f884aed70cc84155554defe614a1bcfaaa
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Apr 19 10:59:42 2018 -0400
12708: Pull blocks to desired storage classes.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index 999b4e9..8a8330a 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -15,19 +15,22 @@ import (
// Collection is an arvados#collection resource.
type Collection struct {
- UUID string `json:"uuid,omitempty"`
- TrashAt *time.Time `json:"trash_at,omitempty"`
- ManifestText string `json:"manifest_text,omitempty"`
- UnsignedManifestText string `json:"unsigned_manifest_text,omitempty"`
- Name string `json:"name,omitempty"`
- CreatedAt *time.Time `json:"created_at,omitempty"`
- ModifiedAt *time.Time `json:"modified_at,omitempty"`
- PortableDataHash string `json:"portable_data_hash,omitempty"`
- ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
- ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
- ReplicationDesired *int `json:"replication_desired,omitempty"`
- DeleteAt *time.Time `json:"delete_at,omitempty"`
- IsTrashed bool `json:"is_trashed,omitempty"`
+ UUID string `json:"uuid,omitempty"`
+ TrashAt *time.Time `json:"trash_at,omitempty"`
+ ManifestText string `json:"manifest_text,omitempty"`
+ UnsignedManifestText string `json:"unsigned_manifest_text,omitempty"`
+ Name string `json:"name,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+ PortableDataHash string `json:"portable_data_hash,omitempty"`
+ ReplicationConfirmed *int `json:"replication_confirmed,omitempty"`
+ ReplicationConfirmedAt *time.Time `json:"replication_confirmed_at,omitempty"`
+ ReplicationDesired *int `json:"replication_desired,omitempty"`
+ StorageClassesDesired []string `json:"storage_classes_desired,omitempty"`
+ StorageClassesConfirmed []string `json:"storage_classes_confirmed,omitempty"`
+ StorageClassesConfirmedAt *time.Time `json:"storage_classes_confirmed_at,omitempty"`
+ DeleteAt *time.Time `json:"delete_at,omitempty"`
+ IsTrashed bool `json:"is_trashed,omitempty"`
}
func (c Collection) resourceName() string {
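
For illustration only: the new fields are ordinary JSON on the collection record. The sketch below uses a small local mirror struct (not the SDK type) with the same tags copied from the hunk above; the UUID and the "archive" class name are made up.

    package main

    import (
        "encoding/json"
        "fmt"
        "time"
    )

    // Local mirror of the new storage-class fields; tags copied from
    // the Collection struct change above.
    type collectionSketch struct {
        UUID                      string     `json:"uuid,omitempty"`
        ReplicationDesired        *int       `json:"replication_desired,omitempty"`
        StorageClassesDesired     []string   `json:"storage_classes_desired,omitempty"`
        StorageClassesConfirmed   []string   `json:"storage_classes_confirmed,omitempty"`
        StorageClassesConfirmedAt *time.Time `json:"storage_classes_confirmed_at,omitempty"`
    }

    func main() {
        repl := 2
        now := time.Now().UTC()
        buf, err := json.MarshalIndent(collectionSketch{
            UUID:                      "zzzzz-4zz18-xxxxxxxxxxxxxxx", // made-up UUID
            ReplicationDesired:        &repl,
            StorageClassesDesired:     []string{"default", "archive"}, // "archive" is invented
            StorageClassesConfirmed:   []string{"default"},
            StorageClassesConfirmedAt: &now,
        }, "", "  ")
        if err != nil {
            panic(err)
        }
        fmt.Println(string(buf))
    }
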
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 32f36e0..7ad2d48 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -5,6 +5,8 @@
package main
import (
+ "bytes"
+ "crypto/md5"
"fmt"
"log"
"math"
@@ -48,10 +50,13 @@ type Balancer struct {
Dumper *log.Logger
MinMtime int64
- collScanned int
- serviceRoots map[string]string
- errors []error
- mutex sync.Mutex
+ classes []string
+ mountsByClass map[string]map[*KeepMount]bool
+ collScanned int
+ serviceRoots map[string]string
+ errors []error
+ stats balancerStats
+ mutex sync.Mutex
}
// Run performs a balance operation using the given config and
@@ -82,6 +87,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
if err != nil {
return
}
+
for _, srv := range bal.KeepServices {
err = srv.discoverMounts(&config.Client)
if err != nil {
@@ -338,7 +344,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
repl = *coll.ReplicationDesired
}
debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
- bal.BlockStateMap.IncreaseDesired(repl, blkids)
+ bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
return nil
}
@@ -352,7 +358,7 @@ func (bal *Balancer) ComputeChangeSets() {
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
defer timeMe(bal.Logger, "ComputeChangeSets")()
- bal.setupServiceRoots()
+ bal.setupCaches()
type balanceTask struct {
blkid arvados.SizedDigest
@@ -360,12 +366,13 @@ func (bal *Balancer) ComputeChangeSets() {
}
nWorkers := 1 + runtime.NumCPU()
todo := make(chan balanceTask, nWorkers)
+ results := make(chan balanceResult, 16)
var wg sync.WaitGroup
for i := 0; i < nWorkers; i++ {
wg.Add(1)
go func() {
for work := range todo {
- bal.balanceBlock(work.blkid, work.blk)
+ results <- bal.balanceBlock(work.blkid, work.blk)
}
wg.Done()
}()
@@ -377,14 +384,44 @@ func (bal *Balancer) ComputeChangeSets() {
}
})
close(todo)
- wg.Wait()
+ go func() {
+ wg.Wait()
+ close(results)
+ }()
+ bal.collectStatistics(results)
}
-func (bal *Balancer) setupServiceRoots() {
+func (bal *Balancer) setupCaches() {
bal.serviceRoots = make(map[string]string)
+ bal.classes = []string{"default"}
+ bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
for _, srv := range bal.KeepServices {
bal.serviceRoots[srv.UUID] = srv.UUID
+ for _, mnt := range srv.mounts {
+ // All mounts on a read-only service are
+ // effectively read-only.
+ mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
+
+ if len(mnt.StorageClasses) == 0 {
+ bal.mountsByClass["default"][mnt] = true
+ continue
+ }
+ for _, class := range mnt.StorageClasses {
+ if mbc := bal.mountsByClass[class]; mbc == nil {
+ bal.classes = append(bal.classes, class)
+ bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
+ } else {
+ mbc[mnt] = true
+ }
+ }
+ }
}
+ // Consider classes in lexicographic order to avoid flapping
+ // between balancing runs. The outcome of the "prefer a mount
+ // we're already planning to use for a different storage
+ // class" case in balanceBlock depends on the order classes
+ // are considered.
+ sort.Strings(bal.classes)
}
const (
@@ -401,129 +438,213 @@ var changeName = map[int]string{
changeNone: "none",
}
+type balanceResult struct {
+ blk *BlockState
+ blkid arvados.SizedDigest
+ have int
+ want int
+}
+
// balanceBlock compares current state to desired state for a single
// block, and makes the appropriate ChangeSet calls.
-func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
+func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
debugf("balanceBlock: %v %+v", blkid, blk)
- // A slot is somewhere a replica could potentially be trashed
- // from, pulled from, or pulled to. Each KeepService gets
- // either one empty slot, or one or more non-empty slots.
type slot struct {
- srv *KeepService // never nil
- repl *Replica // nil if none found
+ mnt *KeepMount // never nil
+ repl *Replica // replica already stored here (or nil)
+ want bool // we should pull/leave a replica here
}
- // First, we build an ordered list of all slots worth
- // considering (including all slots where replicas have been
- // found, as well as all of the optimal slots for this block).
- // Then, when we consider each slot in that order, we will
- // have all of the information we need to make a decision
- // about that slot.
+ // Build a list of all slots (one per mounted volume).
+ var slots []slot
+ for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ var repl *Replica
+ for r := range blk.Replicas {
+ if blk.Replicas[r].KeepMount == mnt {
+ repl = &blk.Replicas[r]
+ }
+ }
+ // Initial value of "want" is "have, and can't
+ // delete". These untrashable replicas get
+ // prioritized when sorting slots: otherwise,
+ // non-optimal readonly copies would cause us
+ // to overreplicate.
+ slots = append(slots, slot{
+ mnt: mnt,
+ repl: repl,
+ want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
+ })
+ }
+ }
uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
- rendezvousOrder := make(map[*KeepService]int, len(uuids))
- slots := make([]slot, len(uuids))
+ srvRendezvous := make(map[*KeepService]int, len(uuids))
for i, uuid := range uuids {
srv := bal.KeepServices[uuid]
- rendezvousOrder[srv] = i
- slots[i].srv = srv
- }
-
- // Sort readonly replicas ahead of trashable ones. This way,
- // if a single service has excessive replicas, the ones we
- // encounter last (and therefore choose to delete) will be on
- // the writable volumes, where possible.
- //
- // TODO: within the trashable set, prefer the oldest replica
- // that doesn't have a timestamp collision with others.
- sort.Slice(blk.Replicas, func(i, j int) bool {
- mnt := blk.Replicas[i].KeepMount
- return mnt.ReadOnly || mnt.KeepService.ReadOnly
- })
+ srvRendezvous[srv] = i
+ }
+
+ // Below we set underreplicated=true if we find any storage
+ // class that's currently underreplicated -- in that case we
+ // won't want to trash any replicas.
+ underreplicated := false
- // Assign existing replicas to slots.
- for ri := range blk.Replicas {
- repl := &blk.Replicas[ri]
- srv := repl.KeepService
- slotIdx := rendezvousOrder[srv]
- if slots[slotIdx].repl != nil {
- // Additional replicas on a single server are
- // considered non-optimal. Within this
- // category, we don't try to optimize layout:
- // we just say the optimal order is the order
- // we encounter them.
- slotIdx = len(slots)
- slots = append(slots, slot{srv: srv})
+ unsafeToDelete := make(map[int64]bool, len(slots))
+ for _, class := range bal.classes {
+ desired := blk.Desired[class]
+ if desired == 0 {
+ continue
}
- slots[slotIdx].repl = repl
- }
-
- // number of replicas already found in positions better than
- // the position we're contemplating now.
- reportedBestRepl := 0
- // To be safe we assume two replicas with the same Mtime are
- // in fact the same replica being reported more than
- // once. len(uniqueBestRepl) is the number of distinct
- // replicas in the best rendezvous positions we've considered
- // so far.
- uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
- // pulls is the number of Pull changes we have already
- // requested. (For purposes of deciding whether to Pull to
- // rendezvous position N, we should assume all pulls we have
- // requested on rendezvous positions M<N will be successful.)
- pulls := 0
+ // Sort the slots by desirability.
+ sort.Slice(slots, func(i, j int) bool {
+ si, sj := slots[i], slots[j]
+ if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
+ // Prefer a mount that satisfies the
+ // desired class.
+ return bal.mountsByClass[class][si.mnt]
+ } else if wanti, wantj := si.want, sj.want; wanti != wantj {
+ // Prefer a mount that will have a
+ // replica no matter what we do here
+ // -- either because it already has an
+ // untrashable replica, or because we
+ // already need it to satisfy a
+ // different storage class.
+ return slots[i].want
+ } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+ // Prefer a better rendezvous
+ // position.
+ return orderi < orderj
+ } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
+ // Prefer a mount that already has a
+ // replica.
+ return repli
+ } else {
+ // If pull/trash turns out to be
+ // needed, distribute the
+ // new/remaining replicas uniformly
+ // across qualifying mounts on a given
+ // server.
+ return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
+ }
+ })
+
+ // Servers and mounts (with or without existing
+ // replicas) that are part of the best achievable
+ // layout for this storage class.
+ wantSrv := map[*KeepService]bool{}
+ wantMnt := map[*KeepMount]bool{}
+ // Positions (with existing replicas) that have been
+ // protected (via unsafeToDelete) to ensure we don't
+ // reduce replication below desired level when
+ // trashing replicas that aren't optimal positions for
+ // any storage class.
+ protMnt := map[*KeepMount]bool{}
+
+ // trySlot tries using a slot to meet requirements,
+ // and returns true if all requirements are met.
+ trySlot := func(i int) bool {
+ slot := slots[i]
+ if len(protMnt) < desired && slot.repl != nil {
+ unsafeToDelete[slot.repl.Mtime] = true
+ protMnt[slot.mnt] = true
+ }
+ if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+ slots[i].want = true
+ wantSrv[slot.mnt.KeepService] = true
+ wantMnt[slot.mnt] = true
+ }
+ return len(protMnt) >= desired && len(wantMnt) >= desired
+ }
+
+ // First try to achieve desired replication without
+ // using the same server twice.
+ done := false
+ for i := 0; i < len(slots) && !done; i++ {
+ if !wantSrv[slots[i].mnt.KeepService] {
+ done = trySlot(i)
+ }
+ }
+
+ // If that didn't suffice, do another pass without the
+ // "distinct services" restriction. (Achieving the
+ // desired volume replication on fewer than the
+ // desired number of services is better than
+ // underreplicating.)
+ for i := 0; i < len(slots) && !done; i++ {
+ done = trySlot(i)
+ }
+
+ if !underreplicated {
+ safe := 0
+ for _, slot := range slots {
+ if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
+ continue
+ }
+ if safe++; safe >= desired {
+ break
+ }
+ }
+ underreplicated = safe < desired
+ }
+ }
+
+ // TODO: If multiple replicas are trashable, prefer the oldest
+ // replica that doesn't have a timestamp collision with
+ // others.
+
+ var have, want int
+ for _, slot := range slots {
+ if slot.want {
+ want++
+ }
+ if slot.repl != nil {
+ have++
+ }
+ }
+
var changes []string
for _, slot := range slots {
- change := changeNone
- srv, repl := slot.srv, slot.repl
// TODO: request a Touch if Mtime is duplicated.
- if repl != nil {
- // This service has a replica. We should
- // delete it if [1] we already have enough
- // distinct replicas in better rendezvous
- // positions and [2] this replica's Mtime is
- // distinct from all of the better replicas'
- // Mtimes.
- if !srv.ReadOnly &&
- !repl.KeepMount.ReadOnly &&
- repl.Mtime < bal.MinMtime &&
- len(uniqueBestRepl) >= blk.Desired &&
- !uniqueBestRepl[repl.Mtime] {
- srv.AddTrash(Trash{
- SizedDigest: blkid,
- Mtime: repl.Mtime,
- })
- change = changeTrash
- } else {
- change = changeStay
- }
- uniqueBestRepl[repl.Mtime] = true
- reportedBestRepl++
- } else if pulls+reportedBestRepl < blk.Desired &&
- len(blk.Replicas) > 0 &&
- !srv.ReadOnly {
- // This service doesn't have a replica. We
- // should pull one to this server if we don't
- // already have enough (existing+requested)
- // replicas in better rendezvous positions.
- srv.AddPull(Pull{
+ var change int
+ switch {
+ case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
+ slot.mnt.KeepService.AddTrash(Trash{
SizedDigest: blkid,
- Source: blk.Replicas[0].KeepService,
+ Mtime: slot.repl.Mtime,
+ From: slot.mnt,
+ })
+ change = changeTrash
+ case len(blk.Replicas) == 0:
+ change = changeNone
+ case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+ slot.mnt.KeepService.AddPull(Pull{
+ SizedDigest: blkid,
+ From: blk.Replicas[0].KeepMount.KeepService,
+ To: slot.mnt,
})
- pulls++
change = changePull
+ default:
+ change = changeStay
}
if bal.Dumper != nil {
var mtime int64
- if repl != nil {
- mtime = repl.Mtime
+ if slot.repl != nil {
+ mtime = slot.repl.Mtime
}
- changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
+ srv := slot.mnt.KeepService
+ changes = append(changes, fmt.Sprintf("%s:%d/%s=%s,%d", srv.ServiceHost, srv.ServicePort, slot.mnt.UUID, changeName[change], mtime))
}
}
if bal.Dumper != nil {
- bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
+ bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
+ }
+ return balanceResult{
+ blk: blk,
+ blkid: blkid,
+ have: have,
+ want: want,
}
}
@@ -544,23 +665,24 @@ type balancerStats struct {
replHistogram []int
}
-func (bal *Balancer) getStatistics() (s balancerStats) {
+func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
+ var s balancerStats
s.replHistogram = make([]int, 2)
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- surplus := len(blk.Replicas) - blk.Desired
- bytes := blkid.Size()
+ for result := range results {
+ surplus := result.have - result.want
+ bytes := result.blkid.Size()
switch {
- case len(blk.Replicas) == 0 && blk.Desired > 0:
+ case result.have == 0 && result.want > 0:
s.lost.replicas -= surplus
s.lost.blocks++
s.lost.bytes += bytes * int64(-surplus)
- case len(blk.Replicas) < blk.Desired:
+ case surplus < 0:
s.underrep.replicas -= surplus
s.underrep.blocks++
s.underrep.bytes += bytes * int64(-surplus)
- case len(blk.Replicas) > 0 && blk.Desired == 0:
+ case surplus > 0 && result.want == 0:
counter := &s.garbage
- for _, r := range blk.Replicas {
+ for _, r := range result.blk.Replicas {
if r.Mtime >= bal.MinMtime {
counter = &s.unref
break
@@ -569,67 +691,66 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
counter.replicas += surplus
counter.blocks++
counter.bytes += bytes * int64(surplus)
- case len(blk.Replicas) > blk.Desired:
+ case surplus > 0:
s.overrep.replicas += surplus
s.overrep.blocks++
- s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
default:
- s.justright.replicas += blk.Desired
+ s.justright.replicas += result.want
s.justright.blocks++
- s.justright.bytes += bytes * int64(blk.Desired)
+ s.justright.bytes += bytes * int64(result.want)
}
- if blk.Desired > 0 {
- s.desired.replicas += blk.Desired
+ if result.want > 0 {
+ s.desired.replicas += result.want
s.desired.blocks++
- s.desired.bytes += bytes * int64(blk.Desired)
+ s.desired.bytes += bytes * int64(result.want)
}
- if len(blk.Replicas) > 0 {
- s.current.replicas += len(blk.Replicas)
+ if len(result.blk.Replicas) > 0 {
+ s.current.replicas += len(result.blk.Replicas)
s.current.blocks++
- s.current.bytes += bytes * int64(len(blk.Replicas))
+ s.current.bytes += bytes * int64(len(result.blk.Replicas))
}
- for len(s.replHistogram) <= len(blk.Replicas) {
+ for len(s.replHistogram) <= len(result.blk.Replicas) {
s.replHistogram = append(s.replHistogram, 0)
}
- s.replHistogram[len(blk.Replicas)]++
- })
+ s.replHistogram[len(result.blk.Replicas)]++
+ }
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
s.trashes += len(srv.ChangeSet.Trashes)
}
- return
+ bal.stats = s
}
// PrintStatistics writes statistics about the computed changes to
// bal.Logger. It should not be called until ComputeChangeSets has
// finished.
func (bal *Balancer) PrintStatistics() {
- s := bal.getStatistics()
bal.logf("===")
- bal.logf("%s lost (0=have<want)", s.lost)
- bal.logf("%s underreplicated (0<have<want)", s.underrep)
- bal.logf("%s just right (have=want)", s.justright)
- bal.logf("%s overreplicated (have>want>0)", s.overrep)
- bal.logf("%s unreferenced (have>want=0, new)", s.unref)
- bal.logf("%s garbage (have>want=0, old)", s.garbage)
+ bal.logf("%s lost (0=have<want)", bal.stats.lost)
+ bal.logf("%s underreplicated (0<have<want)", bal.stats.underrep)
+ bal.logf("%s just right (have=want)", bal.stats.justright)
+ bal.logf("%s overreplicated (have>want>0)", bal.stats.overrep)
+ bal.logf("%s unreferenced (have>want=0, new)", bal.stats.unref)
+ bal.logf("%s garbage (have>want=0, old)", bal.stats.garbage)
bal.logf("===")
- bal.logf("%s total commitment (excluding unreferenced)", s.desired)
- bal.logf("%s total usage", s.current)
+ bal.logf("%s total commitment (excluding unreferenced)", bal.stats.desired)
+ bal.logf("%s total usage", bal.stats.current)
bal.logf("===")
for _, srv := range bal.KeepServices {
bal.logf("%s: %v\n", srv, srv.ChangeSet)
}
bal.logf("===")
- bal.printHistogram(s, 60)
+ bal.printHistogram(60)
bal.logf("===")
}
-func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
+func (bal *Balancer) printHistogram(hashColumns int) {
bal.logf("Replication level distribution (counting N replicas on a single server as N):")
maxCount := 0
- for _, count := range s.replHistogram {
+ for _, count := range bal.stats.replHistogram {
if maxCount < count {
maxCount = count
}
@@ -637,7 +758,7 @@ func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
hashes := strings.Repeat("#", hashColumns)
countWidth := 1 + int(math.Log10(float64(maxCount+1)))
scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
- for repl, count := range s.replHistogram {
+ for repl, count := range bal.stats.replHistogram {
nHashes := int(scaleCount * math.Log10(float64(count+1)))
bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
}
@@ -661,8 +782,11 @@ func (bal *Balancer) CheckSanityLate() error {
anyDesired := false
bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
- if blk.Desired > 0 {
- anyDesired = true
+ for _, desired := range blk.Desired {
+ if desired > 0 {
+ anyDesired = true
+ break
+ }
}
})
if !anyDesired {
@@ -729,3 +853,11 @@ func (bal *Balancer) logf(f string, args ...interface{}) {
bal.Logger.Printf(f, args...)
}
}
+
+// Rendezvous hash sort function. Less efficient than sorting on
+// precomputed rendezvous hashes, but also rarely used.
+func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
+ a := md5.Sum([]byte(string(blkid[:32]) + i))
+ b := md5.Sum([]byte(string(blkid[:32]) + j))
+ return bytes.Compare(a[:], b[:]) < 0
+}
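
A standalone sketch of the ordering rendezvousLess implements (the block hashes and device IDs below are invented): sorting candidates by md5(blockID+deviceID) gives each block its own stable preference order, which is how new or remaining replicas get spread across a server's qualifying mounts.

    package main

    import (
        "bytes"
        "crypto/md5"
        "fmt"
        "sort"
    )

    // Same comparison as rendezvousLess above, using plain strings.
    func rendezvousLess(i, j, blkid string) bool {
        a := md5.Sum([]byte(blkid + i))
        b := md5.Sum([]byte(blkid + j))
        return bytes.Compare(a[:], b[:]) < 0
    }

    func main() {
        // Made-up 32-character block hashes and device IDs.
        blocks := []string{
            "acbd18db4cc2f85cedef654fccc4a4d8",
            "37b51d194a7513e45b56f6524f2d51f2",
        }
        devices := []string{"dev-a", "dev-b", "dev-c", "dev-d"}
        for _, blkid := range blocks {
            order := append([]string(nil), devices...)
            sort.Slice(order, func(x, y int) bool {
                return rendezvousLess(order[x], order[y], blkid)
            })
            // Different blocks prefer different devices; the order for
            // any one block is deterministic across balancing runs.
            fmt.Println(blkid[:8], order)
        }
    }
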
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 08cfcce..28776ab 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -413,10 +413,9 @@ func (s *runSuite) TestDryRun(c *check.C) {
}
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
- stats := bal.getStatistics()
- c.Check(stats.pulls, check.Not(check.Equals), 0)
- c.Check(stats.underrep.replicas, check.Not(check.Equals), 0)
- c.Check(stats.overrep.replicas, check.Not(check.Equals), 0)
+ c.Check(bal.stats.pulls, check.Not(check.Equals), 0)
+ c.Check(bal.stats.underrep.replicas, check.Not(check.Equals), 0)
+ c.Check(bal.stats.overrep.replicas, check.Not(check.Equals), 0)
}
func (s *runSuite) TestCommit(c *check.C) {
@@ -438,12 +437,11 @@ func (s *runSuite) TestCommit(c *check.C) {
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
- stats := bal.getStatistics()
// "foo" block is overreplicated by 2
- c.Check(stats.trashes, check.Equals, 2)
+ c.Check(bal.stats.trashes, check.Equals, 2)
// "bar" block is underreplicated by 1, and its only copy is
// in a poor rendezvous position
- c.Check(stats.pulls, check.Equals, 2)
+ c.Check(bal.stats.pulls, check.Equals, 2)
}
func (s *runSuite) TestRunForever(c *check.C) {
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 167e874..4741a58 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -41,11 +41,14 @@ type slots []int
type tester struct {
known int
- desired int
+ desired map[string]int
current slots
timestamps []int64
shouldPull slots
shouldTrash slots
+
+ shouldPullMounts []string
+ shouldTrashMounts []string
}
func (bal *balancerSuite) SetUpSuite(c *check.C) {
@@ -76,7 +79,12 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
},
}
- srv.mounts = []*KeepMount{{KeepMount: arvados.KeepMount{UUID: fmt.Sprintf("mount-%015x", i)}, KeepService: srv}}
+ srv.mounts = []*KeepMount{{
+ KeepMount: arvados.KeepMount{
+ UUID: fmt.Sprintf("zzzzz-mount-%015x", i),
+ },
+ KeepService: srv,
+ }}
bal.srvs[i] = srv
bal.KeepServices[srv.UUID] = srv
}
@@ -86,7 +94,7 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
func (bal *balancerSuite) TestPerfect(c *check.C) {
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 1},
shouldPull: nil,
shouldTrash: nil})
@@ -94,21 +102,21 @@ func (bal *balancerSuite) TestPerfect(c *check.C) {
func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 2, 1},
shouldTrash: slots{2}})
}
func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
bal.try(c, tester{
- desired: 0,
+ desired: map[string]int{"default": 0},
current: slots{0, 1, 3},
shouldTrash: slots{0, 1, 3}})
}
func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
bal.try(c, tester{
- desired: 4,
+ desired: map[string]int{"default": 4},
current: slots{0, 1},
shouldPull: slots{2, 3}})
}
@@ -116,77 +124,83 @@ func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
bal.srvList(0, slots{3})[0].ReadOnly = true
bal.try(c, tester{
- desired: 4,
+ desired: map[string]int{"default": 4},
current: slots{0, 1},
shouldPull: slots{2, 4}})
}
func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 0},
shouldPull: slots{1}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 7},
shouldPull: slots{0, 1}})
// if only one of the pulls succeeds, we'll see this next:
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 1, 7},
shouldPull: slots{0},
shouldTrash: slots{7}})
// if both pulls succeed, we'll see this next:
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 0, 1, 7},
shouldTrash: slots{2, 7}})
// unbalanced + excessive replication => pull + trash
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 5, 7},
shouldPull: slots{0, 1},
shouldTrash: slots{7}})
}
func (bal *balancerSuite) TestMultipleReplicasPerService(c *check.C) {
+ for _, srv := range bal.srvs {
+ for i := 0; i < 3; i++ {
+ m := *(srv.mounts[0])
+ srv.mounts = append(srv.mounts, &m)
+ }
+ }
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 0},
shouldPull: slots{1}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{2, 2},
shouldPull: slots{0, 1}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 0, 1},
shouldTrash: slots{0}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 1, 0},
shouldTrash: slots{1}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 0, 1, 0, 2},
shouldTrash: slots{0, 1, 2}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 1, 1, 0, 2},
shouldTrash: slots{1, 1, 2}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 1, 2},
shouldPull: slots{0},
shouldTrash: slots{1}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 1, 0},
timestamps: []int64{12345678, 12345678, 12345679},
shouldTrash: nil})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{1, 1},
shouldPull: slots{0}})
}
@@ -195,7 +209,7 @@ func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
// For purposes of increasing replication, we assume identical
// replicas are distinct.
bal.try(c, tester{
- desired: 4,
+ desired: map[string]int{"default": 4},
current: slots{0, 1},
timestamps: []int64{12345678, 12345678},
shouldPull: slots{2, 3}})
@@ -205,11 +219,11 @@ func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
// For purposes of decreasing replication, we assume identical
// replicas are NOT distinct.
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 1, 2},
timestamps: []int64{12345678, 12345678, 12345678}})
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 1, 2},
timestamps: []int64{12345678, 10000000, 10000000}})
}
@@ -219,26 +233,140 @@ func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
newTime := bal.MinMtime + 3600
// The excess replica is too new to delete.
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 1, 2},
timestamps: []int64{oldTime, newTime, newTime + 1}})
// The best replicas are too new to delete, but the excess
// replica is old enough.
bal.try(c, tester{
- desired: 2,
+ desired: map[string]int{"default": 2},
current: slots{0, 1, 2},
timestamps: []int64{newTime, newTime + 1, oldTime},
shouldTrash: slots{2}})
}
+func (bal *balancerSuite) TestChangeStorageClasses(c *check.C) {
+ // For known blocks 0/1/2/3, server 9 is slot 9/1/14/0 in
+ // probe order. For these tests we give it two mounts, one
+ // with classes=[special], one with
+ // classes=[special,special2].
+ bal.srvs[9].mounts = []*KeepMount{{
+ KeepMount: arvados.KeepMount{
+ Replication: 1,
+ StorageClasses: []string{"special"},
+ UUID: "zzzzz-mount-special00000009",
+ DeviceID: "9-special",
+ },
+ KeepService: bal.srvs[9],
+ }, {
+ KeepMount: arvados.KeepMount{
+ Replication: 1,
+ StorageClasses: []string{"special", "special2"},
+ UUID: "zzzzz-mount-special20000009",
+ DeviceID: "9-special-and-special2",
+ },
+ KeepService: bal.srvs[9],
+ }}
+ // For known blocks 0/1/2/3, server 13 (d) is slot 5/3/11/1 in
+ // probe order. We give it two mounts, one with
+ // classes=[special2], one with classes=[default].
+ bal.srvs[13].mounts = []*KeepMount{{
+ KeepMount: arvados.KeepMount{
+ Replication: 1,
+ StorageClasses: []string{"special2"},
+ UUID: "zzzzz-mount-special2000000d",
+ DeviceID: "13-special2",
+ },
+ KeepService: bal.srvs[13],
+ }, {
+ KeepMount: arvados.KeepMount{
+ Replication: 1,
+ StorageClasses: []string{"default"},
+ UUID: "zzzzz-mount-00000000000000d",
+ DeviceID: "13-default",
+ },
+ KeepService: bal.srvs[13],
+ }}
+ // Pull to slot 9 because that's the only server with the
+ // desired class "special".
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2, "special": 1},
+ current: slots{0, 1},
+ shouldPull: slots{9},
+ shouldPullMounts: []string{"zzzzz-mount-special00000009"}})
+ // If some storage classes are not satisfied, don't trash any
+ // excess replicas. (E.g., if someone desires repl=1 on
+ // class=durable, and we have two copies on class=volatile, we
+ // should wait for the pull to succeed before trashing anything.)
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"special": 1},
+ current: slots{0, 1},
+ shouldPull: slots{9},
+ shouldPullMounts: []string{"zzzzz-mount-special00000009"}})
+ // Once storage classes are satisfied, trash excess replicas
+ // that appear earlier in probe order but aren't needed to
+ // satisfy the desired classes.
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"special": 1},
+ current: slots{0, 1, 9},
+ shouldTrash: slots{0, 1}})
+ // Pull to slot 5, the best server with class "special2".
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"special2": 1},
+ current: slots{0, 1},
+ shouldPull: slots{5},
+ shouldPullMounts: []string{"zzzzz-mount-special2000000d"}})
+ // Pull to slots 5 and 9 to get replication 2 in desired class
+ // "special2".
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"special2": 2},
+ current: slots{0, 1},
+ shouldPull: slots{5, 9},
+ shouldPullMounts: []string{"zzzzz-mount-special20000009", "zzzzz-mount-special2000000d"}})
+ // Slot 0 has a replica in "default", slot 1 has a replica
+ // in "special"; we need another replica in "default", i.e.,
+ // on slot 2.
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2, "special": 1},
+ current: slots{0, 1},
+ shouldPull: slots{2}})
+ // Pull to best probe position 0 (despite wrong storage class)
+ // if it's impossible to achieve desired replication in the
+ // desired class (only slots 1 and 3 have special2).
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"special2": 3},
+ current: slots{3},
+ shouldPull: slots{0, 1}})
+ // Trash excess replica.
+ bal.try(c, tester{
+ known: 3,
+ desired: map[string]int{"special": 1},
+ current: slots{0, 1},
+ shouldTrash: slots{1}})
+ // Leave one copy on slot 1 because slot 0 (server 9) only
+ // gives us repl=1.
+ bal.try(c, tester{
+ known: 3,
+ desired: map[string]int{"special": 2},
+ current: slots{0, 1}})
+}
+
// Clear all servers' changesets, balance a single block, and verify
// the appropriate changes for that block have been added to the
// changesets.
func (bal *balancerSuite) try(c *check.C, t tester) {
- bal.setupServiceRoots()
+ bal.setupCaches()
blk := &BlockState{
+ Replicas: bal.replList(t.known, t.current),
Desired: t.desired,
- Replicas: bal.replList(t.known, t.current)}
+ }
for i, t := range t.timestamps {
blk.Replicas[i].Mtime = t
}
@@ -248,6 +376,7 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
bal.balanceBlock(knownBlkid(t.known), blk)
var didPull, didTrash slots
+ var didPullMounts, didTrashMounts []string
for i, srv := range bal.srvs {
var slot int
for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
@@ -257,10 +386,12 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
}
for _, pull := range srv.Pulls {
didPull = append(didPull, slot)
+ didPullMounts = append(didPullMounts, pull.To.UUID)
c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
}
for _, trash := range srv.Trashes {
didTrash = append(didTrash, slot)
+ didTrashMounts = append(didTrashMounts, trash.From.UUID)
c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
}
}
@@ -270,6 +401,14 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
}
c.Check(didPull, check.DeepEquals, t.shouldPull)
c.Check(didTrash, check.DeepEquals, t.shouldTrash)
+ if t.shouldPullMounts != nil {
+ sort.Strings(didPullMounts)
+ c.Check(didPullMounts, check.DeepEquals, t.shouldPullMounts)
+ }
+ if t.shouldTrashMounts != nil {
+ sort.Strings(didTrashMounts)
+ c.Check(didTrashMounts, check.DeepEquals, t.shouldTrashMounts)
+ }
}
// srvList returns the KeepServices, sorted in rendezvous order and
@@ -286,9 +425,14 @@ func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepSe
// replList is like srvList but returns an "existing replicas" slice,
// suitable for a BlockState test fixture.
func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+ nextMnt := map[*KeepService]int{}
mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
for _, srv := range bal.srvList(knownBlockID, order) {
- repls = append(repls, Replica{srv.mounts[0], mtime})
+ // round-robin repls onto each srv's mounts
+ n := nextMnt[srv]
+ nextMnt[srv] = (n + 1) % len(srv.mounts)
+
+ repls = append(repls, Replica{srv.mounts[n], mtime})
mtime++
}
return
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index 958cdb5..6fd50f4 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -18,21 +18,31 @@ type Replica struct {
Mtime int64
}
-// BlockState indicates the number of desired replicas (according to
-// the collections we know about) and the replicas actually stored
-// (according to the keepstore indexes we know about).
+// BlockState indicates the desired number of replicas per storage
+// class (according to the collections we know about) and the
+// replicas actually stored (according to the keepstore indexes we
+// know about).
type BlockState struct {
Replicas []Replica
- Desired int
+ Desired map[string]int
}
+var defaultClasses = []string{"default"}
+
func (bs *BlockState) addReplica(r Replica) {
bs.Replicas = append(bs.Replicas, r)
}
-func (bs *BlockState) increaseDesired(n int) {
- if bs.Desired < n {
- bs.Desired = n
+func (bs *BlockState) increaseDesired(classes []string, n int) {
+ if len(classes) == 0 {
+ classes = defaultClasses
+ }
+ for _, class := range classes {
+ if bs.Desired == nil {
+ bs.Desired = map[string]int{class: n}
+ } else if d, ok := bs.Desired[class]; !ok || d < n {
+ bs.Desired[class] = n
+ }
}
}
@@ -88,12 +98,12 @@ func (bsm *BlockStateMap) AddReplicas(mnt *KeepMount, idx []arvados.KeepServiceI
}
// IncreaseDesired updates the map to indicate the desired replication
-// for the given blocks is at least n.
-func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
+// for the given blocks in the given storage classes is at least n.
+func (bsm *BlockStateMap) IncreaseDesired(classes []string, n int, blocks []arvados.SizedDigest) {
bsm.mutex.Lock()
defer bsm.mutex.Unlock()
for _, blkid := range blocks {
- bsm.get(blkid).increaseDesired(n)
+ bsm.get(blkid).increaseDesired(classes, n)
}
}
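
A self-contained sketch of the per-class bookkeeping above (the collections in main are invented, and this version returns the map rather than mutating a BlockState): every collection referencing a block raises the desired count for each of its storage classes, falling back to "default" when none are set, and the map keeps the per-class maximum.

    package main

    import "fmt"

    func increaseDesired(desired map[string]int, classes []string, n int) map[string]int {
        if len(classes) == 0 {
            classes = []string{"default"}
        }
        if desired == nil {
            desired = map[string]int{}
        }
        for _, class := range classes {
            if d, ok := desired[class]; !ok || d < n {
                desired[class] = n
            }
        }
        return desired
    }

    func main() {
        var desired map[string]int
        // collection A: replication 2, no classes -> "default"
        desired = increaseDesired(desired, nil, 2)
        // collection B: replication 1 in class "archive"
        desired = increaseDesired(desired, []string{"archive"}, 1)
        // collection C: replication 3, also "default"
        desired = increaseDesired(desired, []string{"default"}, 3)
        fmt.Println(desired) // map[archive:1 default:3]
    }
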
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index f88cf8e..5437f76 100644
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -16,25 +16,30 @@ import (
// store it locally.
type Pull struct {
arvados.SizedDigest
- Source *KeepService
+ From *KeepService
+ To *KeepMount
}
// MarshalJSON formats a pull request the way keepstore wants to see
// it.
func (p Pull) MarshalJSON() ([]byte, error) {
type KeepstorePullRequest struct {
- Locator string `json:"locator"`
- Servers []string `json:"servers"`
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+ MountUUID string `json:"mount_uuid"`
}
return json.Marshal(KeepstorePullRequest{
- Locator: string(p.SizedDigest[:32]),
- Servers: []string{p.Source.URLBase()}})
+ Locator: string(p.SizedDigest[:32]),
+ Servers: []string{p.From.URLBase()},
+ MountUUID: p.To.KeepMount.UUID,
+ })
}
// Trash is a request to delete a block.
type Trash struct {
arvados.SizedDigest
Mtime int64
+ From *KeepMount
}
// MarshalJSON formats a trash request the way keepstore wants to see
@@ -43,10 +48,13 @@ func (t Trash) MarshalJSON() ([]byte, error) {
type KeepstoreTrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
+ MountUUID string `json:"mount_uuid"`
}
return json.Marshal(KeepstoreTrashRequest{
Locator: string(t.SizedDigest[:32]),
- BlockMtime: t.Mtime})
+ BlockMtime: t.Mtime,
+ MountUUID: t.From.KeepMount.UUID,
+ })
}
// ChangeSet is a set of change requests that will be sent to a
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
index 5eb850d..6421a4d 100644
--- a/services/keep-balance/change_set_test.go
+++ b/services/keep-balance/change_set_test.go
@@ -17,6 +17,9 @@ var _ = check.Suite(&changeSetSuite{})
type changeSetSuite struct{}
func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+ mnt := &KeepMount{
+ KeepMount: arvados.KeepMount{
+ UUID: "zzzzz-mount-abcdefghijklmno"}}
srv := &KeepService{
KeepService: arvados.KeepService{
UUID: "zzzzz-bi6l4-000000000000001",
@@ -27,13 +30,15 @@ func (s *changeSetSuite) TestJSONFormat(c *check.C) {
buf, err := json.Marshal([]Pull{{
SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
- Source: srv}})
+ To: mnt,
+ From: srv}})
c.Check(err, check.IsNil)
- c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"],"mount_uuid":"zzzzz-mount-abcdefghijklmno"}]`)
buf, err = json.Marshal([]Trash{{
SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ From: mnt,
Mtime: 123456789}})
c.Check(err, check.IsNil)
- c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789,"mount_uuid":"zzzzz-mount-abcdefghijklmno"}]`)
}
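
For reference, a standalone sketch that reproduces the wire format asserted in the test above, using local structs with the same JSON tags (the locator, server URL, and mount UUID are the test fixture values): pull and trash requests now carry a mount_uuid so keepstore can target a specific volume.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type pullRequest struct {
        Locator   string   `json:"locator"`
        Servers   []string `json:"servers"`
        MountUUID string   `json:"mount_uuid"`
    }

    type trashRequest struct {
        Locator    string `json:"locator"`
        BlockMtime int64  `json:"block_mtime"`
        MountUUID  string `json:"mount_uuid"`
    }

    func main() {
        pull, _ := json.Marshal([]pullRequest{{
            Locator:   "acbd18db4cc2f85cedef654fccc4a4d8",
            Servers:   []string{"http://keep1.zzzzz.arvadosapi.com:25107"},
            MountUUID: "zzzzz-mount-abcdefghijklmno",
        }})
        trash, _ := json.Marshal([]trashRequest{{
            Locator:    "acbd18db4cc2f85cedef654fccc4a4d8",
            BlockMtime: 123456789,
            MountUUID:  "zzzzz-mount-abcdefghijklmno",
        }})
        fmt.Println(string(pull))
        fmt.Println(string(trash))
    }
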
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 8b37b90..a84a84d 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -547,7 +547,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
pullq.ReplaceQueue(plist)
}
-// TrashRequest consists of a block locator and it's Mtime
+// TrashRequest consists of a block locator and its Mtime
type TrashRequest struct {
Locator string `json:"locator"`
BlockMtime int64 `json:"block_mtime"`
-----------------------------------------------------------------------
hooks/post-receive