[ARVADOS] updated: 177bfc7d6088040164b1242247e13a7c5bc45f0c

Git user git at public.curoverse.com
Fri May 20 11:22:09 EDT 2016


Summary of changes:
 sdk/go/x/arvados/collection.go            | 30 +++++++++-----
 services/keep-balance/balance_test.go     |  8 ++--
 services/keep-balance/block_state.go      |  2 +-
 services/keep-balance/change_set_test.go  |  2 +-
 services/keep-balance/example-config.json |  3 +-
 services/keep-balance/main.go             | 67 +++++++++++++++++++++++++++++--
 6 files changed, 89 insertions(+), 23 deletions(-)

       via  177bfc7d6088040164b1242247e13a7c5bc45f0c (commit)
       via  4ed795f09ea2c8a1c3d11faf564183ebf7d70a86 (commit)
      from  a0c3e9caf12ba9873c28aae990c1d2abf480f34b (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 177bfc7d6088040164b1242247e13a7c5bc45f0c
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri May 20 11:22:06 2016 -0400

    9162: Add -once flag and RunPeriod config

diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 1facb7c..4d2699d 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -169,13 +169,13 @@ func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
 	bal.try(c, tester{
 		desired:    2,
 		current:    slots{0, 1, 2},
-		timestamps: []int64{oldTime, newTime, newTime+1}})
+		timestamps: []int64{oldTime, newTime, newTime + 1}})
 	// The best replicas are too new to delete, but the excess
 	// replica is old enough.
 	bal.try(c, tester{
-		desired:    2,
-		current:    slots{0, 1, 2},
-		timestamps: []int64{newTime, newTime+1, oldTime},
+		desired:     2,
+		current:     slots{0, 1, 2},
+		timestamps:  []int64{newTime, newTime + 1, oldTime},
 		shouldTrash: slots{2}})
 }
 
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index eaab002..0a4cb12 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -35,7 +35,7 @@ func (bs *BlockState) increaseDesired(n int) {
 // map[arvados.SizedDigest]*BlockState.
 type BlockStateMap struct {
 	entries map[arvados.SizedDigest]*BlockState
-	mutex  sync.Mutex
+	mutex   sync.Mutex
 }
 
 // NewBlockStateMap returns a newly allocated BlockStateMap.
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
index 50a2715..2056db9 100644
--- a/services/keep-balance/change_set_test.go
+++ b/services/keep-balance/change_set_test.go
@@ -29,7 +29,7 @@ func (s *changeSetSuite) TestJSONFormat(c *check.C) {
 
 	buf, err = json.Marshal([]Trash{{
 		SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
-		Mtime:      123456789}})
+		Mtime:       123456789}})
 	c.Check(err, check.IsNil)
 	c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
 }
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index 07480a6..4f47219 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -4,8 +4,7 @@
         "AuthToken": "zzzzzz",
         "Insecure": false
     },
-    "CommitPull": true,
-    "CommitTrash": false,
+    "RunPeriod": "600s",
     "KeepServiceTypes": [
         "disk"
     ]
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6b11311..c756d4b 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -7,6 +7,9 @@ import (
 	"io/ioutil"
 	"log"
 	"os"
+	"os/signal"
+	"syscall"
+	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
@@ -21,6 +24,9 @@ type Config struct {
 	KeepServiceTypes []string
 
 	KeepServiceList arvados.KeepServiceList
+
+	// How often to check
+	RunPeriod duration
 }
 
 // RunOptions controls runtime behavior. The flags/options that belong
@@ -30,6 +36,7 @@ type Config struct {
 //
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
+	Once        bool
 	CommitPulls bool
 	CommitTrash bool
 	Logger      *log.Logger
@@ -48,6 +55,8 @@ func main() {
 	serviceListPath := flag.String("config.KeepServiceList", "",
 		"`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
 			"(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
+	once := flag.Bool("once", false,
+		"balance once and then exit, instead of balancing on a timer (config[\"RunPeriod\"]) and SIGUSR1")
 	flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
 		"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
 	flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
@@ -60,6 +69,9 @@ func main() {
 	if *serviceListPath != "" {
 		mustReadJSON(&config.KeepServiceList, *serviceListPath)
 	}
+	if !*once && config.RunPeriod == duration(0) {
+		log.Fatal("You must either use the -once flag or specify a RunPeriod in your config file")
+	}
 
 	if *debugFlag {
 		debugf = log.Printf
@@ -72,7 +84,13 @@ func main() {
 	if *dumpFlag {
 		runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
 	}
-	if _, err := Run(config, runOptions); err != nil {
+	var err error
+	if *once {
+		_, err = Run(config, runOptions)
+	} else {
+		err = RunForever(config, runOptions)
+	}
+	if err != nil {
 		log.Fatal(err)
 	}
 }
@@ -85,13 +103,39 @@ func mustReadJSON(dst interface{}, path string) {
 	}
 }
 
+func RunForever(config Config, runOptions RunOptions) error {
+	ticker := time.NewTicker(time.Duration(config.RunPeriod))
+
+	// The unbuffered channel here means we only hear SIGUSR1 if
+	// it arrives while we're waiting in select{}.
+	sigUSR1 := make(chan os.Signal)
+	signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+	log.Printf("starting in service mode, running every %v and on SIGUSR1", config.RunPeriod)
+
+	for {
+		_, err := Run(config, runOptions)
+		if err != nil {
+			log.Print("run failed: ", err)
+		} else {
+			log.Print("run succeeded")
+		}
+
+		select {
+		case <-ticker.C:
+		case <-sigUSR1:
+		}
+		log.Print("waking up")
+	}
+}
+
 func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
 	if runOptions.Logger == nil {
 		runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
 	}
 	bal = &Balancer{
-		Logger:       runOptions.Logger,
-		Dumper:       runOptions.Dumper,
+		Logger: runOptions.Logger,
+		Dumper: runOptions.Dumper,
 	}
 
 	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
@@ -104,7 +148,7 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
 	if err != nil {
 		return
 	}
-	
+
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
 	}
@@ -128,3 +172,18 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
 	}
 	return
 }
+
+type duration time.Duration
+
+func (d *duration) UnmarshalJSON(data []byte) error {
+	if data[0] == '"' {
+		dur, err := time.ParseDuration(string(data[1 : len(data)-1]))
+		*d = duration(dur)
+		return err
+	}
+	return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+}
+
+func (d duration) String() string {
+	return time.Duration(d).String()
+}

commit 4ed795f09ea2c8a1c3d11faf564183ebf7d70a86
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri May 20 09:48:12 2016 -0400

    9162: Re-fetch total expected collections for sanity check.

diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
index b43cb98..b247d9c 100644
--- a/sdk/go/x/arvados/collection.go
+++ b/sdk/go/x/arvados/collection.go
@@ -61,6 +61,13 @@ type CollectionList struct {
 	Limit          int          `json:"limit"`
 }
 
+func (c *Client) countCollections(params ResourceListParams) (int, error) {
+	var page CollectionList
+	var zero int
+	err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
+	return page.ItemsAvailable, err
+}
+
 // EachCollection calls f once for every readable
 // collection. EachCollection stops if it encounters an error, such as
 // f returning a non-nil error.
@@ -73,15 +80,9 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
 		progress = func(_, _ int) {}
 	}
 
-	var expectCount int
-	{
-		var page CollectionList
-		var zero int
-		err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
-		if err != nil {
-			return err
-		}
-		expectCount = page.ItemsAvailable
+	expectCount, err := c.countCollections(ResourceListParams{})
+	if err != nil {
+		return err
 	}
 
 	limit := 1000
@@ -133,8 +134,15 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
 		}}
 	}
 	progress(callCount, expectCount)
-	if callCount < expectCount {
-		return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+
+	if checkCount, err := c.countCollections(ResourceListParams{Filters: []Filter{{
+		Attr:     "modified_at",
+		Operator: ">=",
+		Operand:  filterTime}}}); err != nil {
+		return err
+	} else if callCount < checkCount {
+		return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, expectCount)
 	}
+
 	return nil
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list