[ARVADOS] updated: 177bfc7d6088040164b1242247e13a7c5bc45f0c
Git user
git at public.curoverse.com
Fri May 20 11:22:09 EDT 2016
Summary of changes:
sdk/go/x/arvados/collection.go | 30 +++++++++-----
services/keep-balance/balance_test.go | 8 ++--
services/keep-balance/block_state.go | 2 +-
services/keep-balance/change_set_test.go | 2 +-
services/keep-balance/example-config.json | 3 +-
services/keep-balance/main.go | 67 +++++++++++++++++++++++++++++--
6 files changed, 89 insertions(+), 23 deletions(-)
via 177bfc7d6088040164b1242247e13a7c5bc45f0c (commit)
via 4ed795f09ea2c8a1c3d11faf564183ebf7d70a86 (commit)
from a0c3e9caf12ba9873c28aae990c1d2abf480f34b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 177bfc7d6088040164b1242247e13a7c5bc45f0c
Author: Tom Clegg <tom at curoverse.com>
Date: Fri May 20 11:22:06 2016 -0400
9162: Add -once flag and RunPeriod config
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 1facb7c..4d2699d 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -169,13 +169,13 @@ func (bal *balancerSuite) TestDecreaseReplBlockTooNew(c *check.C) {
bal.try(c, tester{
desired: 2,
current: slots{0, 1, 2},
- timestamps: []int64{oldTime, newTime, newTime+1}})
+ timestamps: []int64{oldTime, newTime, newTime + 1}})
// The best replicas are too new to delete, but the excess
// replica is old enough.
bal.try(c, tester{
- desired: 2,
- current: slots{0, 1, 2},
- timestamps: []int64{newTime, newTime+1, oldTime},
+ desired: 2,
+ current: slots{0, 1, 2},
+ timestamps: []int64{newTime, newTime + 1, oldTime},
shouldTrash: slots{2}})
}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index eaab002..0a4cb12 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -35,7 +35,7 @@ func (bs *BlockState) increaseDesired(n int) {
// map[arvados.SizedDigest]*BlockState.
type BlockStateMap struct {
entries map[arvados.SizedDigest]*BlockState
- mutex sync.Mutex
+ mutex sync.Mutex
}
// NewBlockStateMap returns a newly allocated BlockStateMap.
diff --git a/services/keep-balance/change_set_test.go b/services/keep-balance/change_set_test.go
index 50a2715..2056db9 100644
--- a/services/keep-balance/change_set_test.go
+++ b/services/keep-balance/change_set_test.go
@@ -29,7 +29,7 @@ func (s *changeSetSuite) TestJSONFormat(c *check.C) {
buf, err = json.Marshal([]Trash{{
SizedDigest: arvados.SizedDigest("acbd18db4cc2f85cedef654fccc4a4d8+3"),
- Mtime: 123456789}})
+ Mtime: 123456789}})
c.Check(err, check.IsNil)
c.Check(string(buf), check.Equals, `[{"locator":"acbd18db4cc2f85cedef654fccc4a4d8","block_mtime":123456789}]`)
}
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index 07480a6..4f47219 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -4,8 +4,7 @@
"AuthToken": "zzzzzz",
"Insecure": false
},
- "CommitPull": true,
- "CommitTrash": false,
+ "RunPeriod": "600s",
"KeepServiceTypes": [
"disk"
]
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 6b11311..c756d4b 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -7,6 +7,9 @@ import (
"io/ioutil"
"log"
"os"
+ "os/signal"
+ "syscall"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
@@ -21,6 +24,9 @@ type Config struct {
KeepServiceTypes []string
KeepServiceList arvados.KeepServiceList
+
+ // How often to start a balancing run when the -once flag is not given, written as a duration string such as "600s"
+ RunPeriod duration
}
// RunOptions controls runtime behavior. The flags/options that belong
@@ -30,6 +36,7 @@ type Config struct {
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
+ Once bool
CommitPulls bool
CommitTrash bool
Logger *log.Logger
@@ -48,6 +55,8 @@ func main() {
serviceListPath := flag.String("config.KeepServiceList", "",
"`path` of json file with list of keep services to balance, as given by \"arv keep_service list\" "+
"(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
+ once := flag.Bool("once", false,
+ "balance once and then exit, instead of balancing on a timer (config[\"RunPeriod\"]) and SIGUSR1")
flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
@@ -60,6 +69,9 @@ func main() {
if *serviceListPath != "" {
mustReadJSON(&config.KeepServiceList, *serviceListPath)
}
+ if !*once && config.RunPeriod == duration(0) {
+ log.Fatal("You must either use the -once flag or specify a RunPeriod in your config file")
+ }
if *debugFlag {
debugf = log.Printf
@@ -72,7 +84,13 @@ func main() {
if *dumpFlag {
runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
}
- if _, err := Run(config, runOptions); err != nil {
+ var err error
+ if *once {
+ _, err = Run(config, runOptions)
+ } else {
+ err = RunForever(config, runOptions)
+ }
+ if err != nil {
log.Fatal(err)
}
}
@@ -85,13 +103,39 @@ func mustReadJSON(dst interface{}, path string) {
}
}
+// RunForever runs the balancer in service mode. It calls Run
+// immediately, then again every time config.RunPeriod elapses or the
+// process receives SIGUSR1. The outcome of each run is logged; a
+// failed run does not stop the loop.
+//
+// It returns an error, without running anything, if config.RunPeriod
+// is not positive. Otherwise it never returns.
+func RunForever(config Config, runOptions RunOptions) error {
+	period := time.Duration(config.RunPeriod)
+	if period <= 0 {
+		// time.NewTicker panics when given a non-positive interval.
+		return fmt.Errorf("RunPeriod must be positive, got %v", config.RunPeriod)
+	}
+	ticker := time.NewTicker(period)
+
+	// The unbuffered channel here means we only hear SIGUSR1 if
+	// it arrives while we're waiting in select{}. A signal sent
+	// while a run is in progress is dropped instead of queueing
+	// another run.
+	sigUSR1 := make(chan os.Signal)
+	signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+	log.Printf("starting in service mode, running every %v and on SIGUSR1", config.RunPeriod)
+
+	for {
+		if _, err := Run(config, runOptions); err != nil {
+			log.Print("run failed: ", err)
+		} else {
+			log.Print("run succeeded")
+		}
+
+		// Wait for the next tick or a manual SIGUSR1 trigger.
+		select {
+		case <-ticker.C:
+		case <-sigUSR1:
+		}
+		log.Print("waking up")
+	}
+}
+
func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
if runOptions.Logger == nil {
runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
bal = &Balancer{
- Logger: runOptions.Logger,
- Dumper: runOptions.Dumper,
+ Logger: runOptions.Logger,
+ Dumper: runOptions.Dumper,
}
if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
@@ -104,7 +148,7 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
if err != nil {
return
}
-
+
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
@@ -128,3 +172,18 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
}
return
}
+
+// duration is a time.Duration that is written in JSON as a string
+// in time.ParseDuration syntax, such as "600s" or "1h30m".
+type duration time.Duration
+
+// UnmarshalJSON sets d from a JSON string holding a
+// time.ParseDuration-style value. Any other input (a bare number,
+// null, empty or unterminated data) is rejected with an error, and d
+// is left unchanged whenever an error is returned.
+func (d *duration) UnmarshalJSON(data []byte) error {
+	// Require both quotes up front so the slice expression below
+	// cannot go out of range on empty or truncated input.
+	if len(data) < 2 || data[0] != '"' || data[len(data)-1] != '"' {
+		return fmt.Errorf("duration must be given as a string like \"600s\" or \"1h30m\"")
+	}
+	dur, err := time.ParseDuration(string(data[1 : len(data)-1]))
+	if err != nil {
+		return err
+	}
+	*d = duration(dur)
+	return nil
+}
+
+// String formats d exactly as time.Duration does, e.g. "10m0s".
+func (d duration) String() string {
+	return time.Duration(d).String()
+}
commit 4ed795f09ea2c8a1c3d11faf564183ebf7d70a86
Author: Tom Clegg <tom at curoverse.com>
Date: Fri May 20 09:48:12 2016 -0400
9162: Re-fetch total expected collections for sanity check.
diff --git a/sdk/go/x/arvados/collection.go b/sdk/go/x/arvados/collection.go
index b43cb98..b247d9c 100644
--- a/sdk/go/x/arvados/collection.go
+++ b/sdk/go/x/arvados/collection.go
@@ -61,6 +61,13 @@ type CollectionList struct {
Limit int `json:"limit"`
}
+// countCollections asks the server how many collections match
+// params and returns the ItemsAvailable count from the response.
+// Any Limit set in params is overridden with zero, so the request
+// asks only for the count; the other fields of params (such as
+// Filters) are sent as given.
+func (c *Client) countCollections(params ResourceListParams) (int, error) {
+	var page CollectionList
+	var zero int
+	// params was passed by value, so overriding Limit here does
+	// not affect the caller's copy.
+	params.Limit = &zero
+	if err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params); err != nil {
+		return 0, err
+	}
+	return page.ItemsAvailable, nil
+}
+
// EachCollection calls f once for every readable
// collection. EachCollection stops if it encounters an error, such as
// f returning a non-nil error.
@@ -73,15 +80,9 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
progress = func(_, _ int) {}
}
- var expectCount int
- {
- var page CollectionList
- var zero int
- err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, ResourceListParams{Limit: &zero})
- if err != nil {
- return err
- }
- expectCount = page.ItemsAvailable
+ expectCount, err := c.countCollections(ResourceListParams{})
+ if err != nil {
+ return err
}
limit := 1000
@@ -133,8 +134,15 @@ func (c *Client) EachCollection(f func(Collection) error, progress func(done, to
}}
}
progress(callCount, expectCount)
- if callCount < expectCount {
- return fmt.Errorf("Retrieved only %d collections, expected %d", callCount, expectCount)
+
+ if checkCount, err := c.countCollections(ResourceListParams{Filters: []Filter{{
+ Attr: "modified_at",
+ Operator: ">=",
+ Operand: filterTime}}}); err != nil {
+ return err
+ } else if callCount < checkCount {
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, expectCount)
}
+
return nil
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list