[ARVADOS] updated: abe6659e55121a45342bbacd2651c7b51cd7d4f8

Git user git at public.curoverse.com
Mon May 16 23:39:57 EDT 2016


Summary of changes:
 services/keep-balance/balance.go         |  16 ++-
 services/keep-balance/balance_test.go    | 202 ++++++++++++++++++++-----------
 services/keep-balance/change_set.go      |  25 ++++
 services/keep-balance/change_set_test.go |  35 ++++++
 services/keep-balance/keep_service.go    |   6 +
 5 files changed, 207 insertions(+), 77 deletions(-)
 create mode 100644 services/keep-balance/change_set_test.go

       via  abe6659e55121a45342bbacd2651c7b51cd7d4f8 (commit)
      from  fa93bba9f4496b2b52ded5234e190f99a3d4fc58 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit abe6659e55121a45342bbacd2651c7b51cd7d4f8
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon May 16 23:39:36 2016 -0400

    9162: Add tests. Do not pull more replicas just because mtimes are equal

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d0f708a..25e8fef 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -238,6 +238,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	for _, repl := range blk.Replicas {
 		hasRepl[repl.UUID] = repl
 	}
+	// number of replicas already found in positions better than
+	// the position we're contemplating now.
+	reportedBestRepl := 0
 	// To be safe we assume two replicas with the same Mtime are
 	// in fact the same replica being reported more than
 	// once. len(uniqueBestRepl) is the number of distinct
@@ -257,17 +260,20 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 			// delete it if [1] we already have enough
 			// distinct replicas in better rendezvous
 			// positions and [2] this replica's Mtime is
-			// distinct from the better replicas' Mtime.
-			if len(uniqueBestRepl) >= blk.Desired &&
-				!uniqueBestRepl[repl.Mtime] &&
-				repl.Mtime < bal.MinMtime {
+			// distinct from all of the better replicas'
+			// Mtimes.
+			if !srv.ReadOnly &&
+				repl.Mtime < bal.MinMtime &&
+				len(uniqueBestRepl) >= blk.Desired &&
+				!uniqueBestRepl[repl.Mtime] {
 				srv.AddTrash(Trash{
 					SizedDigest: blkid,
 					Mtime:       repl.Mtime,
 				})
 			}
 			uniqueBestRepl[repl.Mtime] = true
-		} else if pulls+len(uniqueBestRepl) < blk.Desired &&
+			reportedBestRepl++
+		} else if pulls+reportedBestRepl < blk.Desired &&
 			len(blk.Replicas) > 0 &&
 			!srv.ReadOnly {
 			// This service doesn't have a replica. We
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 7d90302..1facb7c 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"strconv"
 	"testing"
+	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 
@@ -24,6 +25,7 @@ type balancerSuite struct {
 	srvs            []*KeepService
 	blks            map[string]tester
 	knownRendezvous [][]int
+	signatureTTL    int64
 }
 
 const (
@@ -31,11 +33,15 @@ const (
 	known0 = 0
 )
 
+type slots []int
+
 type tester struct {
 	known       int
-	blk         *BlockState
-	shouldPull  []*KeepService
-	shouldTrash []*KeepService
+	desired     int
+	current     slots
+	timestamps  []int64
+	shouldPull  slots
+	shouldTrash slots
 }
 
 type serviceByUUID []*KeepService
@@ -45,18 +51,6 @@ func (s serviceByUUID) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s serviceByUUID) Less(i, j int) bool { return s[i].UUID < s[j].UUID }
 
 func (bal *balancerSuite) SetUpSuite(c *check.C) {
-	bal.srvs = make([]*KeepService, 16)
-	bal.KeepServices = make(map[string]*KeepService)
-	for i := range bal.srvs {
-		srv := &KeepService{
-			KeepService: arvados.KeepService{
-				UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
-			},
-		}
-		bal.srvs[i] = srv
-		bal.KeepServices[srv.UUID] = srv
-	}
-
 	bal.knownRendezvous = nil
 	for _, str := range []string{
 		"3eab2d5fc9681074",
@@ -64,99 +58,163 @@ func (bal *balancerSuite) SetUpSuite(c *check.C) {
 		"c5b4e023f8a7d691",
 		"9d81c02e76a3bf54",
 	} {
-		var probes []int
+		var slots []int
 		for _, c := range []byte(str) {
 			pos, _ := strconv.ParseUint(string(c), 16, 4)
-			probes = append(probes, int(pos))
+			slots = append(slots, int(pos))
 		}
-		bal.knownRendezvous = append(bal.knownRendezvous, probes)
+		bal.knownRendezvous = append(bal.knownRendezvous, slots)
 	}
+
+	bal.signatureTTL = 3600
 }
 
 func (bal *balancerSuite) SetUpTest(c *check.C) {
-	// Clear any changes from previous test case
-	for _, srv := range bal.srvs {
-		srv.ChangeSet = ChangeSet{}
+	bal.srvs = make([]*KeepService, 16)
+	bal.KeepServices = make(map[string]*KeepService)
+	for i := range bal.srvs {
+		srv := &KeepService{
+			KeepService: arvados.KeepService{
+				UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
+			},
+		}
+		bal.srvs[i] = srv
+		bal.KeepServices[srv.UUID] = srv
 	}
+
+	bal.MinMtime = time.Now().Unix() - bal.signatureTTL
 }
 
 func (bal *balancerSuite) TestPerfect(c *check.C) {
 	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 1},
 		shouldPull:  nil,
-		shouldTrash: nil,
-		known:       known0,
-		blk: &BlockState{
-			Desired:  2,
-			Replicas: bal.replList(known0, 0, 1)}})
+		shouldTrash: nil})
 }
 
 func (bal *balancerSuite) TestDecreaseRepl(c *check.C) {
 	bal.try(c, tester{
-		shouldPull:  nil,
-		shouldTrash: bal.srvList(known0, 2),
-		known:       known0,
-		blk: &BlockState{
-			Desired:  2,
-			Replicas: bal.replList(known0, 0, 2, 1)}})
+		desired:     2,
+		current:     slots{0, 2, 1},
+		shouldTrash: slots{2}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplToZero(c *check.C) {
+	bal.try(c, tester{
+		desired:     0,
+		current:     slots{0, 1, 3},
+		shouldTrash: slots{0, 1, 3}})
 }
 
 func (bal *balancerSuite) TestIncreaseRepl(c *check.C) {
 	bal.try(c, tester{
-		shouldPull:  bal.srvList(known0, 2, 3),
-		shouldTrash: nil,
-		known:       known0,
-		blk: &BlockState{
-			Desired:  4,
-			Replicas: bal.replList(known0, 0, 1)}})
+		desired:    4,
+		current:    slots{0, 1},
+		shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestSkipReadonly(c *check.C) {
+	bal.srvList(0, slots{3})[0].ReadOnly = true
+	bal.try(c, tester{
+		desired:    4,
+		current:    slots{0, 1},
+		shouldPull: slots{2, 4}})
 }
 
 func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
 	bal.try(c, tester{
-		shouldPull:  bal.srvList(known0, 1),
-		shouldTrash: nil,
-		known:       known0,
-		blk: &BlockState{
-			Desired:  2,
-			Replicas: bal.replList(known0, 2, 0)}})
+		desired:    2,
+		current:    slots{2, 0},
+		shouldPull: slots{1}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{2, 7},
+		shouldPull: slots{0, 1}})
+	// unbalanced + excessive replication => pull + trash
 	bal.try(c, tester{
-		shouldPull:  bal.srvList(known0, 0, 1),
-		shouldTrash: nil,
-		known:       known0,
-		blk: &BlockState{
-			Desired:  2,
-			Replicas: bal.replList(known0, 2, 7)}})
+		desired:     1,
+		current:     slots{2, 7},
+		shouldPull:  slots{0},
+		shouldTrash: slots{7}})
 }
 
 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
 	// For purposes of increasing replication, we assume identical
 	// replicas are distinct.
-	repls := bal.replList(known0, 0, 1)
-	repls[1].Mtime = repls[0].Mtime
 	bal.try(c, tester{
-		shouldPull:  bal.srvList(known0, 2, 3),
-		shouldTrash: nil,
-		known:       known0,
-		blk: &BlockState{
-			Desired:  4,
-			Replicas: repls}})
+		desired:    4,
+		current:    slots{0, 1},
+		timestamps: []int64{12345678, 12345678},
+		shouldPull: slots{2, 3}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplTimestampCollision(c *check.C) {
+	// For purposes of decreasing replication, we assume identical
+	// replicas are NOT distinct.
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{12345678, 12345678, 12345678}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{12345678, 10000000, 10000000}})
+}
+
+func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
+	oldTime := bal.MinMtime - 3600
+	newTime := bal.MinMtime + 3600
+	// The excess replica is too new to delete.
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{oldTime, newTime, newTime+1}})
+	// The best replicas are too new to delete, but the excess
+	// replica is old enough.
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 1, 2},
+		timestamps: []int64{newTime, newTime+1, oldTime},
+		shouldTrash: slots{2}})
 }
 
+// Clear all servers' changesets, balance a single block, and verify
+// the appropriate changes for that block have been added to the
+// changesets.
 func (bal *balancerSuite) try(c *check.C, t tester) {
 	bal.setupServiceRoots()
-	bal.balanceBlock(knownBlkid(t.known), t.blk)
-
-	var didPull, didTrash []*KeepService
+	blk := &BlockState{
+		Desired:  t.desired,
+		Replicas: bal.replList(t.known, t.current)}
+	for i, t := range t.timestamps {
+		blk.Replicas[i].Mtime = t
+	}
 	for _, srv := range bal.srvs {
-		if len(srv.Pulls) > 0 {
-			didPull = append(didPull, srv)
+		srv.ChangeSet = ChangeSet{}
+	}
+	bal.balanceBlock(knownBlkid(t.known), blk)
+
+	var didPull, didTrash slots
+	for i, srv := range bal.srvs {
+		var slot int
+		for probeOrder, srvNum := range bal.knownRendezvous[t.known] {
+			if srvNum == i {
+				slot = probeOrder
+			}
+		}
+		for _, pull := range srv.Pulls {
+			didPull = append(didPull, slot)
+			c.Check(pull.SizedDigest, check.Equals, knownBlkid(t.known))
 		}
-		if len(srv.Trashes) > 0 {
-			didTrash = append(didTrash, srv)
+		for _, trash := range srv.Trashes {
+			didTrash = append(didTrash, slot)
+			c.Check(trash.SizedDigest, check.Equals, knownBlkid(t.known))
 		}
 	}
 
-	for _, list := range [][]*KeepService{didPull, didTrash, t.shouldPull, t.shouldTrash} {
-		sort.Sort(serviceByUUID(list))
+	for _, list := range []slots{didPull, didTrash, t.shouldPull, t.shouldTrash} {
+		sort.Sort(sort.IntSlice(list))
 	}
 	c.Check(didPull, check.DeepEquals, t.shouldPull)
 	c.Check(didTrash, check.DeepEquals, t.shouldTrash)
@@ -166,8 +224,8 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
 // then selected by idx. For example, srvList(3, 0, 1, 4) returns the
 // the first-, second-, and fifth-best servers for storing
 // bal.knownBlkid(3).
-func (bal *balancerSuite) srvList(knownBlockID int, idx ...int) (srvs []*KeepService) {
-	for _, i := range idx {
+func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
+	for _, i := range order {
 		srvs = append(srvs, bal.srvs[bal.knownRendezvous[knownBlockID][i]])
 	}
 	return
@@ -175,9 +233,9 @@ func (bal *balancerSuite) srvList(knownBlockID int, idx ...int) (srvs []*KeepSer
 
 // replList is like srvList but returns an "existing replicas" slice,
 // suitable for a BlockState test fixture.
-func (bal *balancerSuite) replList(knownBlockID int, idx ...int) (repls []Replica) {
-	mtime := int64(1461618503)
-	for _, srv := range bal.srvList(knownBlockID, idx...) {
+func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
+	mtime := time.Now().Unix() - bal.signatureTTL - 86400
+	for _, srv := range bal.srvList(knownBlockID, order) {
 		repls = append(repls, Replica{srv, mtime})
 		mtime++
 	}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index 6b79b18..3edb21e 100644
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"sync"
 
@@ -14,12 +15,36 @@ type Pull struct {
 	Source *KeepService
 }
 
+// MarshalJSON formats a pull request the way keepstore wants to see
+// it.
+func (p Pull) MarshalJSON() ([]byte, error) {
+	type KeepstorePullRequest struct {
+		Locator string   `json:"locator"`
+		Servers []string `json:"servers"`
+	}
+	return json.Marshal(KeepstorePullRequest{
+		Locator: string(p.SizedDigest[:32]),
+		Servers: []string{p.Source.URLBase()}})
+}
+
 // Trash is a request to delete a block.
 type Trash struct {
 	arvados.SizedDigest
 	Mtime int64
 }
 
+// MarshalJSON formats a trash request the way keepstore wants to see
+// it, i.e., as a bare locator with no +size hint.
+func (t Trash) MarshalJSON() ([]byte, error) {
+	type KeepstoreTrashRequest struct {
+		Locator    string `json:"locator"`
+		BlockMtime int64  `json:"block_mtime"`
+	}
+	return json.Marshal(KeepstoreTrashRequest{
+		Locator:    string(t.SizedDigest[:32]),
+		BlockMtime: t.Mtime})
+}
+
 // ChangeSet is a set of change requests that will be sent to a
 // keepstore server.
 type ChangeSet struct {
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
new file mode 100644
index 0000000..50a2715
--- /dev/null
+++ b/services/keep-balance/change_set_test.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+	"encoding/json"
+
+	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&changeSetSuite{})
+
+type changeSetSuite struct{}
+
+func (s *changeSetSuite) TestJSONFormat(c *check.C) {
+	srv := &KeepService{
+		KeepService: arvados.KeepService{
+			UUID:           "zzzzz-bi6l4-000000000000001",
+			ServiceType:    "disk",
+			ServiceSSLFlag: false,
+			ServiceHost:    "keep1.zzzzz.arvadosapi.com",
+			ServicePort:    25107}}
+
+	buf, err := json.Marshal([]Pull{{
+		SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+		Source:      srv}})
+	c.Check(err, check.IsNil)
+	c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","servers":["http://keep1.zzzzz.arvadosapi.com:25107"]}]`)
+
+	buf, err = json.Marshal([]Trash{{
+		SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
+		Mtime:      123456789}})
+	c.Check(err, check.IsNil)
+	c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
+}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index f4db28e..e158728 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -13,3 +13,9 @@ type KeepService struct {
 func (srv KeepService) String() string {
 	return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
 }
+
+var ksSchemes = map[bool]string{false: "http", true: "https"}
+
+func (srv KeepService) URLBase() string {
+	return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list