[ARVADOS] created: f6ad92b7fb0049c062c7f9465d186f690fb39ba4
Git user
git at public.curoverse.com
Mon Jul 4 17:30:08 EDT 2016
at f6ad92b7fb0049c062c7f9465d186f690fb39ba4 (commit)
commit f6ad92b7fb0049c062c7f9465d186f690fb39ba4
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Jul 3 09:54:22 2016 -0400
9437: Do not clear trash lists between runs when the list of keep services has not changed.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d22074e..ca57c15 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ import (
"math"
"os"
"runtime"
+ "sort"
"strings"
"sync"
"time"
@@ -50,11 +51,17 @@ type Balancer struct {
}
// Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
//
-// err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+// runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+ nextRunOptions = runOptions
+
bal.Dumper = runOptions.Dumper
bal.Logger = runOptions.Logger
if bal.Logger == nil {
@@ -75,10 +82,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
- if runOptions.CommitTrash {
+ rs := bal.rendezvousState()
+ if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+ if runOptions.SafeRendezvousState != "" {
+ bal.logf("notice: KeepServices list has changed since last run")
+ }
+ bal.logf("clearing existing trash lists to avoid racing with deletions based on a different rendezvous order")
if err = bal.ClearTrashLists(&config.Client); err != nil {
return
}
+ // The current rendezvous state becomes "safe" (i.e.,
+ // OK to compute changes for that state without
+ // clearing existing trash lists) only now, after we
+ // succeed in clearing existing trash lists.
+ nextRunOptions.SafeRendezvousState = rs
}
if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
@@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
return nil
}
+// rendezvousState returns a fingerprint of the current set of keep
+// services: each service's String() form, sorted and joined by "; ".
+func (bal *Balancer) rendezvousState() string {
+ srvs := make([]string, 0, len(bal.KeepServices))
+ for _, srv := range bal.KeepServices {
+ srvs = append(srvs, srv.String())
+ }
+ sort.Strings(srvs)
+ return strings.Join(srvs, "; ")
+}
+
// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index a138d91..edc88aa 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -236,7 +236,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -254,7 +254,7 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
s.stub.serveFourDiskKeepServices()
indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(indexReqs.Count(), check.Equals, 0)
c.Check(trashReqs.Count(), check.Equals, 0)
@@ -271,7 +271,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
s.stub.serveFourDiskKeepServices()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -289,7 +289,7 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -308,7 +308,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
var bal Balancer
- err := bal.Run(s.config, opts)
+ _, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -332,7 +332,7 @@ func (s *runSuite) TestCommit(c *check.C) {
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
var bal Balancer
- err := bal.Run(s.config, opts)
+ _, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
@@ -362,13 +362,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
s.config.RunPeriod = arvados.Duration(time.Millisecond)
go RunForever(s.config, opts, stop)
- // Each run should send 4 clear trash lists + 4 pull lists + 4
- // trash lists. We should complete four runs in much less than
+ // Each run should send 4 pull lists + 4 trash lists. The
+ // first run should also send 4 empty trash lists at
+ // startup. We should complete all four runs in much less than
// a second.
for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
time.Sleep(time.Millisecond)
}
stop <- true
c.Check(pullReqs.Count() >= 16, check.Equals, true)
- c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+ c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index b090614..0793889 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -78,8 +78,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
CommitTrash: true,
Logger: log.New(logBuf, "", log.LstdFlags),
}
- err := (&Balancer{}).Run(s.config, opts)
+ nextOpts, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.IsNil)
+ c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
+ c.Check(nextOpts.CommitPulls, check.Equals, true)
if iter == 0 {
c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 364bb3f..da4fb62 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -51,6 +51,12 @@ type RunOptions struct {
CommitTrash bool
Logger *log.Logger
Dumper *log.Logger
+
+ // SafeRendezvousState from the most recent balance operation,
+ // or "" if unknown. If this changes from one run to the next,
+ // we need to watch out for races. See
+ // (*Balancer).ClearTrashLists.
+ SafeRendezvousState string
}
var debugf = func(string, ...interface{}) {}
@@ -98,7 +104,7 @@ func main() {
if err != nil {
// (don't run)
} else if runOptions.Once {
- err = (&Balancer{}).Run(config, runOptions)
+ _, err = (&Balancer{}).Run(config, runOptions)
} else {
err = RunForever(config, runOptions, nil)
}
@@ -138,7 +144,9 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- err := (&Balancer{}).Run(config, runOptions)
+ bal := &Balancer{}
+ var err error
+ runOptions, err = bal.Run(config, runOptions)
if err != nil {
logger.Print("run failed: ", err)
} else {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list