[ARVADOS] updated: 01aa17fb0f5de5a7e9a287e54ef15008d7af7260
Git user
git at public.curoverse.com
Wed May 18 11:32:13 EDT 2016
Summary of changes:
services/keep-balance/balance.go | 97 ++++++++++++------
services/keep-balance/keep_service.go | 10 +-
services/keep-balance/main.go | 8 +-
services/keep-balance/main_test.go | 182 ++++++++++++++++++++++------------
4 files changed, 197 insertions(+), 100 deletions(-)
via 01aa17fb0f5de5a7e9a287e54ef15008d7af7260 (commit)
via 24815fb6113db774abb81dd543dbc80926bf5bce (commit)
from cdfc5f568046d2f39861cea65b62682bc9ddbce1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 01aa17fb0f5de5a7e9a287e54ef15008d7af7260
Author: Tom Clegg <tom at curoverse.com>
Date: Wed May 18 11:32:08 2016 -0400
9162: Fix up request tracking in tests
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index 5b83316..d3051a5 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -10,7 +10,6 @@ import (
"net/http/httptest"
"strings"
"sync"
- "sync/atomic"
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
@@ -23,7 +22,7 @@ type stubServer struct {
mux *http.ServeMux
srv *httptest.Server
mutex sync.Mutex
- Requests []http.Request
+ Requests reqTracker
logf func(string, ...interface{})
}
@@ -34,7 +33,7 @@ func (s *stubServer) start() *http.Client {
s.mux = http.NewServeMux()
s.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.mutex.Lock()
- s.Requests = append(s.Requests, *r)
+ s.Requests.Add(r)
s.mutex.Unlock()
w.Header().Set("Content-Type", "application/json")
s.mux.ServeHTTP(w, r)
@@ -56,43 +55,61 @@ func (s *stubServer) stop() {
s.srv.Close()
}
-func (s *stubServer) serveStatic(path, data string) *int {
- var count int
+type reqTracker struct {
+ reqs []http.Request
+ sync.Mutex
+}
+
+func (rt *reqTracker) Count() int {
+ rt.Lock()
+ defer rt.Unlock()
+ return len(rt.reqs)
+}
+
+func (rt *reqTracker) Add(req *http.Request) int {
+ rt.Lock()
+ defer rt.Unlock()
+ rt.reqs = append(rt.reqs, *req)
+ return len(rt.reqs)
+}
+
+func (s *stubServer) serveStatic(path, data string) *reqTracker {
+ rt := &reqTracker{}
s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
- count++
+ rt.Add(r)
if r.Body != nil {
ioutil.ReadAll(r.Body)
r.Body.Close()
}
io.WriteString(w, data)
})
- return &count
+ return rt
}
-func (s *stubServer) serveCurrentUserAdmin() *int {
+func (s *stubServer) serveCurrentUserAdmin() *reqTracker {
return s.serveStatic("/arvados/v1/users/current",
`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":true,"is_active":true}`)
}
-func (s *stubServer) serveCurrentUserNotAdmin() *int {
+func (s *stubServer) serveCurrentUserNotAdmin() *reqTracker {
return s.serveStatic("/arvados/v1/users/current",
`{"uuid":"zzzzz-tpzed-000000000000000","is_admin":false,"is_active":true}`)
}
-func (s *stubServer) serveDiscoveryDoc() *int {
+func (s *stubServer) serveDiscoveryDoc() *reqTracker {
return s.serveStatic("/discovery/v1/apis/arvados/v1/rest",
`{"defaultCollectionReplication":2}`)
}
-func (s *stubServer) serveZeroCollections() *int {
+func (s *stubServer) serveZeroCollections() *reqTracker {
return s.serveStatic("/arvados/v1/collections",
`{"items":[],"items_available":0}`)
}
-func (s *stubServer) serveFooBarFileCollections() *int {
- var count int
+func (s *stubServer) serveFooBarFileCollections() *reqTracker {
+ rt := &reqTracker{}
s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
- count++
+ rt.Add(r)
if strings.Contains(r.Form.Get("filters"), `modified_at`) {
io.WriteString(w, `{"items_available":0,"items":[]}`)
} else {
@@ -101,15 +118,15 @@ func (s *stubServer) serveFooBarFileCollections() *int {
{"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
}
})
- return &count
+ return rt
}
-func (s *stubServer) serveZeroKeepServices() *int {
+func (s *stubServer) serveZeroKeepServices() *reqTracker {
return s.serveStatic("/arvados/v1/keep_services",
`{"items":[],"items_available":0}`)
}
-func (s *stubServer) serveFourDiskKeepServices() *int {
+func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
{"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
{"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
@@ -118,23 +135,23 @@ func (s *stubServer) serveFourDiskKeepServices() *int {
{"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
}
-func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *int64 {
- var count int64
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+ rt := &reqTracker{}
s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
- count := atomic.AddInt64(&count, 1)
+ count := rt.Add(r)
if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
}
fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
})
- return &count
+ return rt
}
-func (s *stubServer) serveKeepstoreTrash() *int {
+func (s *stubServer) serveKeepstoreTrash() *reqTracker {
return s.serveStatic("/trash", `{}`)
}
-func (s *stubServer) serveKeepstorePull() *int {
+func (s *stubServer) serveKeepstorePull() *reqTracker {
return s.serveStatic("/pull", `{}`)
}
@@ -185,13 +202,31 @@ func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
Logger: s.logger(c),
}
s.stub.serveCurrentUserAdmin()
- countCollectionsList := s.stub.serveZeroCollections()
- s.stub.serveZeroKeepServices()
- countTrash := s.stub.serveKeepstoreTrash()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ _, err := Run(s.config, opts)
+ c.Check(err, check.ErrorMatches, "received zero collections")
+ c.Check(trashReqs.Count(), check.Equals, 0)
+}
+
+func (s *mainSuite) TestServiceTypes(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ }
+ s.config.ServiceTypes = []string{"unlisted-type"}
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
_, err := Run(s.config, opts)
c.Check(err, check.ErrorMatches, "received zero collections")
- c.Check(*countCollectionsList, check.Equals, 2)
- c.Check(*countTrash, check.Equals, 0)
+ c.Check(indexReqs.Count(), check.Equals, 0)
+ c.Check(trashReqs.Count(), check.Equals, 0)
}
func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
@@ -200,18 +235,15 @@ func (s *mainSuite) TestRefuseNonAdmin(c *check.C) {
CommitTrash: true,
Logger: s.logger(c),
}
- countCurrentUser := s.stub.serveCurrentUserNotAdmin()
- countColl := s.stub.serveZeroCollections()
- countKS := s.stub.serveZeroKeepServices()
- countTrash := s.stub.serveKeepstoreTrash()
- countPull := s.stub.serveKeepstorePull()
+ s.stub.serveCurrentUserNotAdmin()
+ s.stub.serveZeroCollections()
+ s.stub.serveFourDiskKeepServices()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
_, err := Run(s.config, opts)
c.Check(err, check.ErrorMatches, "Current user .* is not .* admin user")
- c.Check(*countCurrentUser, check.Equals, 1)
- c.Check(*countColl, check.Equals, 0)
- c.Check(*countKS, check.Equals, 0)
- c.Check(*countTrash, check.Equals, 0)
- c.Check(*countPull, check.Equals, 0)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
}
func (s *mainSuite) TestDryRun(c *check.C) {
@@ -224,17 +256,14 @@ func (s *mainSuite) TestDryRun(c *check.C) {
s.stub.serveFooBarFileCollections()
s.stub.serveFourDiskKeepServices()
s.stub.serveKeepstoreIndexFoo4Bar1()
- countTrash := s.stub.serveKeepstoreTrash()
- countPull := s.stub.serveKeepstorePull()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
bal, err := Run(s.config, opts)
c.Check(err, check.IsNil)
- c.Check(*countTrash, check.Equals, 0)
- c.Check(*countPull, check.Equals, 0)
- var todo int
- for _, srv := range bal.KeepServices {
- todo += len(srv.Pulls) + len(srv.Trashes)
- }
- c.Check(todo, check.Not(check.Equals), 0)
+ c.Check(trashReqs.Count(), check.Equals, 0)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+ stats := bal.getStatistics()
+ c.Check(stats.pulls, check.Not(check.Equals), 0)
}
func (s *mainSuite) TestCommit(c *check.C) {
@@ -248,12 +277,12 @@ func (s *mainSuite) TestCommit(c *check.C) {
s.stub.serveFooBarFileCollections()
s.stub.serveFourDiskKeepServices()
s.stub.serveKeepstoreIndexFoo4Bar1()
- countTrash := s.stub.serveKeepstoreTrash()
- countPull := s.stub.serveKeepstorePull()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
bal, err := Run(s.config, opts)
c.Check(err, check.IsNil)
- c.Check(*countTrash, check.Equals, 4)
- c.Check(*countPull, check.Equals, 4)
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 4)
stats := bal.getStatistics()
// "foo" block is overreplicated by 2
c.Check(stats.trashes, check.Equals, 2)
commit 24815fb6113db774abb81dd543dbc80926bf5bce
Author: Tom Clegg <tom at curoverse.com>
Date: Wed May 18 10:31:46 2016 -0400
9162: Add dump flag. Fix PUT bug properly.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 957f37f..3cff8e8 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -17,6 +17,7 @@ type Balancer struct {
DefaultReplication int
ServiceTypes []string
Logger *log.Logger
+ Dumper *log.Logger
MinMtime int64
collScanned int
@@ -233,6 +234,18 @@ func (bal *Balancer) ComputeChangeSets() {
wg.Wait()
}
+const (
+ changeNothing = iota
+ changePull
+ changeTrash
+)
+
+var changeName = map[int]string{
+ changeNothing: "leave",
+ changePull: "pull",
+ changeTrash: "trash",
+}
+
// balanceBlock compares current state to desired state for a single
// block, and makes the appropriate ChangeSet calls.
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
@@ -256,7 +269,9 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
// rendezvous position N, we should assume all pulls we have
// requested on rendezvous positions M<N will be successful.)
pulls := 0
+ var changes []string
for _, uuid := range uuids {
+ change := changeNothing
srv := bal.KeepServices[uuid]
// TODO: request a Touch if Mtime is duplicated.
if repl, ok := hasRepl[srv.UUID]; ok {
@@ -274,6 +289,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
SizedDigest: blkid,
Mtime: repl.Mtime,
})
+ change = changeTrash
}
uniqueBestRepl[repl.Mtime] = true
reportedBestRepl++
@@ -289,7 +305,14 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
Source: blk.Replicas[0].KeepService,
})
pulls++
+ change = changePull
}
+ if bal.Dumper != nil {
+ changes = append(changes, changeName[change]+":"+uuid)
+ }
+ }
+ if bal.Dumper != nil {
+ bal.Dumper.Printf("%s have=%d want=%d %v", blkid, len(blk.Replicas), blk.Desired, changes)
}
}
@@ -303,25 +326,30 @@ func (bb blocksNBytes) String() string {
return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
}
-func (bal *Balancer) PrintStatistics() {
- var lost, overrep, unref, garbage, underrep, justright, desired, current blocksNBytes
+type balancerStats struct {
+ lost, overrep, unref, garbage, underrep, justright blocksNBytes
+ desired, current blocksNBytes
+ pulls, trashes int
+}
+
+func (bal *Balancer) getStatistics() (s balancerStats) {
bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
surplus := len(blk.Replicas) - blk.Desired
bytes := blkid.Size()
switch {
case len(blk.Replicas) == 0 && blk.Desired > 0:
- lost.replicas -= surplus
- lost.blocks++
- lost.bytes += bytes * int64(-surplus)
+ s.lost.replicas -= surplus
+ s.lost.blocks++
+ s.lost.bytes += bytes * int64(-surplus)
case len(blk.Replicas) < blk.Desired:
- underrep.replicas -= surplus
- underrep.blocks++
- underrep.bytes += bytes * int64(-surplus)
+ s.underrep.replicas -= surplus
+ s.underrep.blocks++
+ s.underrep.bytes += bytes * int64(-surplus)
case len(blk.Replicas) > 0 && blk.Desired == 0:
- counter := &garbage
+ counter := &s.garbage
for _, r := range blk.Replicas {
if r.Mtime >= bal.MinMtime {
- counter = &unref
+ counter = &s.unref
break
}
}
@@ -329,36 +357,45 @@ func (bal *Balancer) PrintStatistics() {
counter.blocks++
counter.bytes += bytes * int64(surplus)
case len(blk.Replicas) > blk.Desired:
- overrep.replicas += surplus
- overrep.blocks++
- overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
+ s.overrep.replicas += surplus
+ s.overrep.blocks++
+ s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
default:
- justright.replicas += blk.Desired
- justright.blocks++
- justright.bytes += bytes * int64(blk.Desired)
+ s.justright.replicas += blk.Desired
+ s.justright.blocks++
+ s.justright.bytes += bytes * int64(blk.Desired)
}
if blk.Desired > 0 {
- desired.replicas += blk.Desired
- desired.blocks++
- desired.bytes += bytes * int64(blk.Desired)
+ s.desired.replicas += blk.Desired
+ s.desired.blocks++
+ s.desired.bytes += bytes * int64(blk.Desired)
}
if len(blk.Replicas) > 0 {
- current.replicas += len(blk.Replicas)
- current.blocks++
- current.bytes += bytes * int64(len(blk.Replicas))
+ s.current.replicas += len(blk.Replicas)
+ s.current.blocks++
+ s.current.bytes += bytes * int64(len(blk.Replicas))
}
})
+ for _, srv := range bal.KeepServices {
+ s.pulls += len(srv.ChangeSet.Pulls)
+ s.trashes += len(srv.ChangeSet.Trashes)
+ }
+ return
+}
+
+func (bal *Balancer) PrintStatistics() {
+ s := bal.getStatistics()
bal.Logf("===")
- bal.Logf("%s lost (0=have<want)", lost)
- bal.Logf("%s underreplicated (0<have<want)", underrep)
- bal.Logf("%s just right (have=want)", justright)
- bal.Logf("%s overreplicated (have>want>0)", overrep)
- bal.Logf("%s unreferenced (have>want=0, new)", unref)
- bal.Logf("%s garbage (have>want=0, old)", garbage)
+ bal.Logf("%s lost (0=have<want)", s.lost)
+ bal.Logf("%s underreplicated (0<have<want)", s.underrep)
+ bal.Logf("%s just right (have=want)", s.justright)
+ bal.Logf("%s overreplicated (have>want>0)", s.overrep)
+ bal.Logf("%s unreferenced (have>want=0, new)", s.unref)
+ bal.Logf("%s garbage (have>want=0, old)", s.garbage)
bal.Logf("===")
- bal.Logf("%s total commitment (excluding unreferenced)", desired)
- bal.Logf("%s total usage", current)
+ bal.Logf("%s total commitment (excluding unreferenced)", s.desired)
+ bal.Logf("%s total usage", s.current)
bal.Logf("===")
for _, srv := range bal.KeepServices {
bal.Logf("%s: %v\n", srv, srv.ChangeSet)
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index 7500b9f..1a502e0 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -34,24 +34,18 @@ func (srv *KeepService) CommitTrash(c *arvados.Client) error {
}
func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
- errC := make(chan error)
+ errC := make(chan error, 1)
jsonR, jsonW := io.Pipe()
go func() {
enc := json.NewEncoder(jsonW)
errC <- enc.Encode(data)
+ jsonW.Close()
}()
url := srv.URLBase() + "/" + path
req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
if err != nil {
return fmt.Errorf("building request for %s: %v", url, err)
}
-
- // Workaround bug (in keepstore?) -- if the server end closes
- // the connection without setting Connection: Close, our
- // client will hang for a while next time it tries to reuse
- // the connection.
- req.Close = true
-
err = c.DoAndDecode(nil, req)
if encErr := <-errC; encErr != nil {
return fmt.Errorf("encoding data for %s: %v", url, encErr)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 615c47d..0d7e56c 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -18,7 +18,8 @@ type Config struct {
type RunOptions struct {
CommitPulls bool
CommitTrash bool
- Logger *log.Logger
+ Logger *log.Logger
+ Dumper *log.Logger
}
var debugf func(string, ...interface{}) = func(string, ...interface{}) {}
@@ -34,6 +35,7 @@ func main() {
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
debugFlag := flag.Bool("debug", false, "enable debug messages")
flag.Parse()
@@ -51,6 +53,9 @@ func main() {
log.Printf("config is %s", j)
}
}
+ if *dumpFlag {
+ runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+ }
if _, err := Run(config, runOptions); err != nil {
log.Fatal(err)
}
@@ -62,6 +67,7 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
}
bal = &Balancer{
Logger: runOptions.Logger,
+ Dumper: runOptions.Dumper,
ServiceTypes: config.ServiceTypes,
}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index fcb0d35..5b83316 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -60,6 +60,10 @@ func (s *stubServer) serveStatic(path, data string) *int {
var count int
s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
count++
+ if r.Body != nil {
+ ioutil.ReadAll(r.Body)
+ r.Body.Close()
+ }
io.WriteString(w, data)
})
return &count
@@ -85,14 +89,16 @@ func (s *stubServer) serveZeroCollections() *int {
`{"items":[],"items_available":0}`)
}
-func (s *stubServer) serveFooFileCollection() *int {
+func (s *stubServer) serveFooBarFileCollections() *int {
var count int
s.mux.HandleFunc("/arvados/v1/collections", func(w http.ResponseWriter, r *http.Request) {
count++
if strings.Contains(r.Form.Get("filters"), `modified_at`) {
- io.WriteString(w, `{"items":[],"items_available":0}`)
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
} else {
- io.WriteString(w, `{"items":[{"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}],"items_available":1}`)
+ io.WriteString(w, `{"items_available":2,"items":[
+ {"uuid":"zzzzz-4zz18-ehbhgtheo8909or","portable_data_hash":"fa7aeb5140e2848d39b416daeef4ffc5+45","manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n","modified_at":"2014-02-03T17:22:54Z"},
+ {"uuid":"zzzzz-4zz18-znfnqtbbv4spc3w","portable_data_hash":"1f4b0bc7583c2a7f9102c395f4ffc5e3+45","manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\n","modified_at":"2014-02-03T17:22:54Z"}]}`)
}
})
return &count
@@ -103,20 +109,22 @@ func (s *stubServer) serveZeroKeepServices() *int {
`{"items":[],"items_available":0}`)
}
-func (s *stubServer) serveFourKeepServices() *int {
- return s.serveStatic("/arvados/v1/keep_services",
- `{"items":[
- {"uuid":"zzzzz-bi6l4-6zhilxar6r8ey90","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
- {"uuid":"zzzzz-bi6l4-rsnj3c76ndxb7o0","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
- {"uuid":"zzzzz-bi6l4-6zhilxar6r8ey91","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
- {"uuid":"zzzzz-bi6l4-rsnj3c76ndxb7o1","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"}
- ],"items_available":4}`)
+func (s *stubServer) serveFourDiskKeepServices() *int {
+ return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
+ {"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
+ {"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
}
-func (s *stubServer) serveKeepstoreIndexFooBlock() *int64 {
+func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *int64 {
var count int64
s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
count := atomic.AddInt64(&count, 1)
+ if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+ io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+ }
fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
})
return &count
@@ -152,7 +160,7 @@ func (s *mainSuite) logger(c *check.C) *log.Logger {
}
}
}()
- return log.New(w, "", 0)
+ return log.New(w, "", log.LstdFlags)
}
func (s *mainSuite) SetUpTest(c *check.C) {
@@ -213,14 +221,12 @@ func (s *mainSuite) TestDryRun(c *check.C) {
Logger: s.logger(c),
}
s.stub.serveCurrentUserAdmin()
- s.stub.serveFooFileCollection()
- countKS := s.stub.serveFourKeepServices()
- countIndex := s.stub.serveKeepstoreIndexFooBlock()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
countTrash := s.stub.serveKeepstoreTrash()
countPull := s.stub.serveKeepstorePull()
bal, err := Run(s.config, opts)
- c.Check(*countKS, check.Equals, 1)
- c.Check(*countIndex, check.Equals, int64(4))
c.Check(err, check.IsNil)
c.Check(*countTrash, check.Equals, 0)
c.Check(*countPull, check.Equals, 0)
@@ -230,3 +236,28 @@ func (s *mainSuite) TestDryRun(c *check.C) {
}
c.Check(todo, check.Not(check.Equals), 0)
}
+
+func (s *mainSuite) TestCommit(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ Logger: s.logger(c),
+ Dumper: s.logger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveFourDiskKeepServices()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ countTrash := s.stub.serveKeepstoreTrash()
+ countPull := s.stub.serveKeepstorePull()
+ bal, err := Run(s.config, opts)
+ c.Check(err, check.IsNil)
+ c.Check(*countTrash, check.Equals, 4)
+ c.Check(*countPull, check.Equals, 4)
+ stats := bal.getStatistics()
+ // "foo" block is overreplicated by 2
+ c.Check(stats.trashes, check.Equals, 2)
+ // "bar" block is underreplicated by 1, and its only copy is
+ // in a poor rendezvous position
+ c.Check(stats.pulls, check.Equals, 2)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list