[ARVADOS] created: f6ad92b7fb0049c062c7f9465d186f690fb39ba4

Git user git at public.curoverse.com
Mon Jul 4 17:30:08 EDT 2016


        at  f6ad92b7fb0049c062c7f9465d186f690fb39ba4 (commit)


commit f6ad92b7fb0049c062c7f9465d186f690fb39ba4
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Jul 3 09:54:22 2016 -0400

    9437: Do not clear trash lists between runs when the list of keep services has not changed.

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d22074e..ca57c15 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ import (
 	"math"
 	"os"
 	"runtime"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -50,11 +51,17 @@ type Balancer struct {
 }
 
 // Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
 //
-//   err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+	nextRunOptions = runOptions
+
 	bal.Dumper = runOptions.Dumper
 	bal.Logger = runOptions.Logger
 	if bal.Logger == nil {
@@ -75,10 +82,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
 	}
-	if runOptions.CommitTrash {
+	rs := bal.rendezvousState()
+	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+		if runOptions.SafeRendezvousState != "" {
+			bal.logf("notice: KeepServices list has changed since last run")
+		}
+		bal.logf("clearing existing trash lists to avoid racing with deletions based on a different rendezvous order")
 		if err = bal.ClearTrashLists(&config.Client); err != nil {
 			return
 		}
+		// The current rendezvous state becomes "safe" (i.e.,
+		// OK to compute changes for that state without
+		// clearing existing trash lists) only now, after we
+		// succeed in clearing existing trash lists.
+		nextRunOptions.SafeRendezvousState = rs
 	}
 	if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
 		return
@@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 	return nil
 }
 
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+	srvs := make([]string, 0, len(bal.KeepServices))
+	for _, srv := range bal.KeepServices {
+		srvs = append(srvs, srv.String())
+	}
+	sort.Strings(srvs)
+	return strings.Join(srvs, "; ")
+}
+
 // ClearTrashLists sends an empty trash list to each keep
 // service. Calling this before GetCurrentState avoids races.
 //
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index a138d91..edc88aa 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -236,7 +236,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	err := (&Balancer{}).Run(s.config, opts)
+	_, err := (&Balancer{}).Run(s.config, opts)
 	c.Check(err, check.ErrorMatches, "received zero collections")
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -254,7 +254,7 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
 	s.stub.serveFourDiskKeepServices()
 	indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
-	err := (&Balancer{}).Run(s.config, opts)
+	_, err := (&Balancer{}).Run(s.config, opts)
 	c.Check(err, check.IsNil)
 	c.Check(indexReqs.Count(), check.Equals, 0)
 	c.Check(trashReqs.Count(), check.Equals, 0)
@@ -271,7 +271,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	s.stub.serveFourDiskKeepServices()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	err := (&Balancer{}).Run(s.config, opts)
+	_, err := (&Balancer{}).Run(s.config, opts)
 	c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
 	c.Check(trashReqs.Count(), check.Equals, 0)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -289,7 +289,7 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	err := (&Balancer{}).Run(s.config, opts)
+	_, err := (&Balancer{}).Run(s.config, opts)
 	c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -308,7 +308,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	var bal Balancer
-	err := bal.Run(s.config, opts)
+	_, err := bal.Run(s.config, opts)
 	c.Check(err, check.IsNil)
 	c.Check(trashReqs.Count(), check.Equals, 0)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -332,7 +332,7 @@ func (s *runSuite) TestCommit(c *check.C) {
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	var bal Balancer
-	err := bal.Run(s.config, opts)
+	_, err := bal.Run(s.config, opts)
 	c.Check(err, check.IsNil)
 	c.Check(trashReqs.Count(), check.Equals, 8)
 	c.Check(pullReqs.Count(), check.Equals, 4)
@@ -362,13 +362,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	s.config.RunPeriod = arvados.Duration(time.Millisecond)
 	go RunForever(s.config, opts, stop)
 
-	// Each run should send 4 clear trash lists + 4 pull lists + 4
-	// trash lists. We should complete four runs in much less than
+	// Each run should send 4 pull lists + 4 trash lists. The
+	// first run should also send 4 empty trash lists at
+	// startup. We should complete all four runs in much less than
 	// a second.
 	for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
 		time.Sleep(time.Millisecond)
 	}
 	stop <- true
 	c.Check(pullReqs.Count() >= 16, check.Equals, true)
-	c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+	c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
 }
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index b090614..0793889 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -78,8 +78,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 			CommitTrash: true,
 			Logger:      log.New(logBuf, "", log.LstdFlags),
 		}
-		err := (&Balancer{}).Run(s.config, opts)
+		nextOpts, err := (&Balancer{}).Run(s.config, opts)
 		c.Check(err, check.IsNil)
+		c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
+		c.Check(nextOpts.CommitPulls, check.Equals, true)
 		if iter == 0 {
 			c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
 			c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 364bb3f..da4fb62 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -51,6 +51,12 @@ type RunOptions struct {
 	CommitTrash bool
 	Logger      *log.Logger
 	Dumper      *log.Logger
+
+	// SafeRendezvousState from the most recent balance operation,
+	// or "" if unknown. If this changes from one run to the next,
+	// we need to watch out for races. See
+	// (*Balancer)ClearTrashLists.
+	SafeRendezvousState string
 }
 
 var debugf = func(string, ...interface{}) {}
@@ -98,7 +104,7 @@ func main() {
 	if err != nil {
 		// (don't run)
 	} else if runOptions.Once {
-		err = (&Balancer{}).Run(config, runOptions)
+		_, err = (&Balancer{}).Run(config, runOptions)
 	} else {
 		err = RunForever(config, runOptions, nil)
 	}
@@ -138,7 +144,9 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
 			logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
 		}
 
-		err := (&Balancer{}).Run(config, runOptions)
+		bal := &Balancer{}
+		var err error
+		runOptions, err = bal.Run(config, runOptions)
 		if err != nil {
 			logger.Print("run failed: ", err)
 		} else {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list