[ARVADOS] updated: abe6659e55121a45342bbacd2651c7b51cd7d4f8
Git user
git at public.curoverse.com
Mon May 16 23:39:57 EDT 2016
Summary of changes:
services/keep-balance/balance.go | 16 ++-
services/keep-balance/balance_test.go | 202 ++++++++++++++++++++-----------
services/keep-balance/change_set.go | 25 ++++
services/keep-balance/change_set_test.go | 35 ++++++
services/keep-balance/keep_service.go | 6 +
5 files changed, 207 insertions(+), 77 deletions(-)
create mode 100644 services/keep-balance/change_set_test.go
via abe6659e55121a45342bbacd2651c7b51cd7d4f8 (commit)
from fa93bba9f4496b2b52ded5234e190f99a3d4fc58 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit abe6659e55121a45342bbacd2651c7b51cd7d4f8
Author: Tom Clegg <tom at curoverse.com>
Date: Mon May 16 23:39:36 2016 -0400
9162: Add tests. Do not pull more replicas just because mtimes are equal
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d0f708a..25e8fef 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -238,6 +238,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
for _, repl := range blk.Replicas {
hasRepl[repl.UUID] = repl
}
+ // number of replicas already found in positions better than
+ // the position we're contemplating now.
+ reportedBestRepl := 0
// To be safe we assume two replicas with the same Mtime are
// in fact the same replica being reported more than
// once. len(uniqueBestRepl) is the number of distinct
@@ -257,17 +260,20 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
// delete it if [1] we already have enough
// distinct replicas in better rendezvous
// positions and [2] this replica's Mtime is
- // distinct from the better replicas' Mtime.
- if len(uniqueBestRepl) >= blk.Desired &&
- !uniqueBestRepl[repl.Mtime] &&
- repl.Mtime < bal.MinMtime {
+ // distinct from all better replicas' Mtimes.
+ // Also require !srv.ReadOnly && repl.Mtime < bal.MinMtime.
+ if !srv.ReadOnly &&
+ repl.Mtime < bal.MinMtime &&
+ len(uniqueBestRepl) >= blk.Desired &&
+ !uniqueBestRepl[repl.Mtime] {
srv.AddTrash(Trash{
SizedDigest: blkid,
Mtime: repl.Mtime,
})
}
uniqueBestRepl[repl.Mtime] = true
- } else if pulls+len(uniqueBestRepl) < blk.Desired &&
+ reportedBestRepl++
+ } else if pulls+reportedBestRepl < blk.Desired &&
len(blk.Replicas) > 0 &&
!srv.ReadOnly {
// This service doesn't have a replica. We
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 7d90302..1facb7c 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -6,6 +6,7 @@ import (
"sort"
"strconv"
"testing"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
@@ -24,6 +25,7 @@ type balancerSuite struct {
srvs []*KeepService
blks map[string]tester
knownRendezvous [][]int
+ signatureTTL int64
}
const (
@@ -31,11 +33,15 @@ const (
known0 = 0
)
+type slots []int
+
type tester struct {
known int
- blk *BlockState
- shouldPull []*KeepService
- shouldTrash []*KeepService
+ desired int
+ current slots
+ timestamps []int64
+ shouldPull slots
+ shouldTrash slots
}
type serviceByUUID []*KeepService
@@ -45,18 +51,6 @@ func (s serviceByUUID) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serviceByUUID) Less(i, j int) bool { return s[i].UUID < s[j].UUID }
func (bal *balancerSuite) SetUpSuite(c *check.C) {
- bal.srvs = make([]*KeepService, 16)
- bal.KeepServices = make(map[string]*KeepService)
- for i := range bal.srvs {
- srv := &KeepService{
- KeepService: arvados.KeepService{
- UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
- },
- }
- bal.srvs[i] = srv
- bal.KeepServices[srv.UUID] = srv
- }
-
bal.knownRendezvous = nil
for _, str := range []string{
"3eab2d5fc9681074",
@@ -64,99 +58,163 @@ func (bal *balancerSuite) SetUpSuite(c *check.C) {
"c5b4e023f8a7d691",
"9d81c02e76a3bf54",
} {
- var probes []int
+ var slots []int
for _, c := range []byte(str) {
pos, _ := strconv.ParseUint(string(c), 16, 4)
- probes = append(probes, int(pos))
+ slots = append(slots, int(pos))
}
- bal.knownRendezvous = append(bal.knownRendezvous, probes)
+ bal.knownRendezvous = append(bal.knownRendezvous, slots)
}
+
+ bal.signatureTTL = 3600
}
func (bal *balancerSuite) SetUpTest(c *check.C) {
- // Clear any changes from previous test case
- for _, srv := range bal.srvs {
- srv.ChangeSet = ChangeSet{}
+ bal.srvs = make([]*KeepService, 16)
+ bal.KeepServices = make(map[string]*KeepService)
+ for i := range bal.srvs {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
+ },
+ }
+ bal.srvs[i] = srv
+ bal.KeepServices[srv.UUID] = srv
}
+
+ bal.MinMtime = time.Now().Unix() - bal.signatureTTL
}
func (bal *balancerSuite) TestPerfect(c *check.C) {
bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1},
shouldPull: nil,
- shouldTrash: nil,
- known: known0,
- blk: &BlockState{
- Desired: 2,
- Replicas: bal.replList(known0, 0, 1)}})
+ shouldTrash: nil})
}
func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
bal.try(c, tester{
- shouldPull: nil,
- shouldTrash: bal.srvList(known0, 2),
- known: known0,
- blk: &BlockState{
- Desired: 2,
- Replicas: bal.replList(known0, 0, 2, 1)}})
+ desired: 2,
+ current: slots{0, 2, 1},
+ shouldTrash: slots{2}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+ bal.try(c, tester{
+ desired: 0,
+ current: slots{0, 1, 3},
+ shouldTrash: slots{0, 1, 3}})
}
func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
bal.try(c, tester{
- shouldPull: bal.srvList(known0, 2, 3),
- shouldTrash: nil,
- known: known0,
- blk: &BlockState{
- Desired: 4,
- Replicas: bal.replList(known0, 0, 1)}})
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+ bal.srvList(0, slots{3})[0].ReadOnly = true
+ bal.try(c, tester{
+ desired: 4,
+ current: slots{0, 1},
+ shouldPull: slots{2, 4}})
}
func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
bal.try(c, tester{
- shouldPull: bal.srvList(known0, 1),
- shouldTrash: nil,
- known: known0,
- blk: &BlockState{
- Desired: 2,
- Replicas: bal.replList(known0, 2, 0)}})
+ desired: 2,
+ current: slots{2, 0},
+ shouldPull: slots{1}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{2, 7},
+ shouldPull: slots{0, 1}})
+ // unbalanced + excessive replication => pull + trash
bal.try(c, tester{
- shouldPull: bal.srvList(known0, 0, 1),
- shouldTrash: nil,
- known: known0,
- blk: &BlockState{
- Desired: 2,
- Replicas: bal.replList(known0, 2, 7)}})
+ desired: 1,
+ current: slots{2, 7},
+ shouldPull: slots{0},
+ shouldTrash: slots{7}})
}
func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
// For purposes of increasing replication, we assume identical
// replicas are distinct.
- repls := bal.replList(known0, 0, 1)
- repls[1].Mtime = repls[0].Mtime
bal.try(c, tester{
- shouldPull: bal.srvList(known0, 2, 3),
- shouldTrash: nil,
- known: known0,
- blk: &BlockState{
- Desired: 4,
- Replicas: repls}})
+ desired: 4,
+ current: slots{0, 1},
+ timestamps: []int64{12345678, 12345678},
+ shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+ // For purposes of decreasing replication, we assume identical
+ // replicas are NOT distinct.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 12345678, 12345678}})
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{12345678, 10000000, 10000000}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+ oldTime := bal.MinMtime - 3600
+ newTime := bal.MinMtime + 3600
+ // The excess replica is too new to delete.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{oldTime, newTime, newTime+1}})
+ // The best replicas are too new to delete, but the excess
+ // replica is old enough.
+ bal.try(c, tester{
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{newTime, newTime+1, oldTime},
+ shouldTrash: slots{2}})
}
+// Clear all servers' changesets, balance a single block, and verify
+// the appropriate changes for that block have been added to the
+// changesets.
func (bal *balancerSuite) try(c *check.C, t tester) {
bal.setupServiceRoots()
- bal.balanceBlock(knownBlkid(t.known), t.blk)
-
- var didPull, didTrash []*KeepService
+ blk := &BlockState{
+ Desired: t.desired,
+ Replicas: bal.replList(t.known, t.current)}
+ for i, t := range t.timestamps {
+ blk.Replicas[i].Mtime = t
+ }
for _, srv := range bal.srvs {
- if len(srv.Pulls) > 0 {
- didPull = append(didPull, srv)
+ srv.ChangeSet = ChangeSet{}
+ }
+ bal.balanceBlock(knownBlkid(t.known), blk)
+
+ var didPull, didTrash slots
+ for i, srv := range bal.srvs {
+ var slot int
+ for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
+ if srvNum == i {
+ slot = probeOrder
+ }
+ }
+ for _, pull := range srv.Pulls {
+ didPull = append(didPull, slot)
+ c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
}
- if len(srv.Trashes) > 0 {
- didTrash = append(didTrash, srv)
+ for _, trash := range srv.Trashes {
+ didTrash = append(didTrash, slot)
+ c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
}
}
- for _, list := range [][]*KeepService{didPull, didTrash, t.shouldPull, t.shouldTrash} {
- sort.Sort(serviceByUUID(list))
+ for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
+ sort.Sort(sort.IntSlice(list))
}
c.Check(didPull, check.DeepEquals, t.shouldPull)
c.Check(didTrash, check.DeepEquals, t.shouldTrash)
@@ -166,8 +224,8 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
// then selected by order. For example, srvList(3, slots{0, 1, 4})
// returns the first-, second-, and fifth-best servers for storing
// knownBlkid(3).
-func (bal *balancerSuite) srvList(knownBlockID int, idx ...int) (srvs []*KeepService) {
- for _, i := range idx {
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+ for _, i := range order {
srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
}
return
@@ -175,9 +233,9 @@ func (bal *balancerSuite) srvList(knownBlockID int, idx ...int) (srvs []*KeepSer
// replList is like srvList but returns an "existing replicas" slice,
// suitable for a BlockState test fixture.
-func (bal *balancerSuite) replList(knownBlockID int, idx ...int) (repls []Replica) {
- mtime := int64(1461618503)
- for _, srv := range bal.srvList(knownBlockID, idx...) {
+func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+ mtime := time.Now().Unix() - bal.signatureTTL - 86400
+ for _, srv := range bal.srvList(knownBlockID, order) {
repls = append(repls, Replica{srv, mtime})
mtime++
}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index 6b79b18..3edb21e 100644
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -1,6 +1,7 @@
package main
import (
+ "encoding/json"
"fmt"
"sync"
@@ -14,12 +15,36 @@ type Pull struct {
Source *KeepService
}
+// MarshalJSON formats a pull request the way keepstore wants to see
+// it: a bare locator (no +size hint) plus the source server's URL.
+func (p Pull) MarshalJSON() ([]byte, error) {
+ type KeepstorePullRequest struct {
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+ }
+ return json.Marshal(KeepstorePullRequest{
+ Locator: string(p.SizedDigest[:32]),
+ Servers: []string{p.Source.URLBase()}})
+}
+
// Trash is a request to delete a block.
type Trash struct {
arvados.SizedDigest
Mtime int64
}
+// MarshalJSON formats a trash request the way keepstore wants to see
+// it, i.e., as a bare locator with no +size hint.
+func (t Trash) MarshalJSON() ([]byte, error) {
+ type KeepstoreTrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+ }
+ return json.Marshal(KeepstoreTrashRequest{
+ Locator: string(t.SizedDigest[:32]),
+ BlockMtime: t.Mtime})
+}
+
// ChangeSet is a set of change requests that will be sent to a
// keepstore server.
type ChangeSet struct {
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644
index 0000000..50a2715
--- /dev/null
+++ b/services/keep-balance/change_set_test.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "encoding/json"
+
+ "git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&changeSetSuite{})
+
+type changeSetSuite struct{}
+
+func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+ srv := &KeepService{
+ KeepService: arvados.KeepService{
+ UUID: "zzzzz-bi6l4-000000000000001",
+ ServiceType: "disk",
+ ServiceSSLFlag: false,
+ ServiceHost: "keep1.zzzzz.arvadosapi.com",
+ ServicePort: 25107}}
+
+ buf, err := json.Marshal([]Pull{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Source: srv}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)
+
+ buf, err = json.Marshal([]Trash{{
+ SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+ Mtime: 123456789}})
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index f4db28e..e158728 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -13,3 +13,9 @@ type KeepService struct {
func (srv KeepService) String() string {
return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
}
+
+var ksSchemes = map[bool]string{false: "http", true: "https"}
+
+func (srv KeepService) URLBase() string {
+ return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list