[ARVADOS] updated: 87f2f44c70532acc70885c5b7f528ce4d45cc65d
Git user
git at public.curoverse.com
Wed May 18 20:08:52 EDT 2016
Summary of changes:
services/keep-balance/balance.go | 119 ++++++++++++++++++++----------
services/keep-balance/block_state.go | 14 +++-
services/keep-balance/change_set.go | 2 +-
services/keep-balance/example-config.json | 8 +-
services/keep-balance/integration_test.go | 2 +-
services/keep-balance/keep_service.go | 7 ++
services/keep-balance/main.go | 48 ++++++++++--
services/keep-balance/main_test.go | 25 +++++--
8 files changed, 165 insertions(+), 60 deletions(-)
via 87f2f44c70532acc70885c5b7f528ce4d45cc65d (commit)
from 92fd8da1e911a2ef39fb23d944f124d4264c3322 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 87f2f44c70532acc70885c5b7f528ce4d45cc65d
Author: Tom Clegg <tom at curoverse.com>
Date: Wed May 18 20:08:47 2016 -0400
9162: Load list of services from `arv ... list` json.
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index f45e346..38333fa 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -11,11 +11,19 @@ import (
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
type Balancer struct {
*BlockStateMap
KeepServices map[string]*KeepService
DefaultReplication int
- ServiceTypes []string
Logger *log.Logger
Dumper *log.Logger
MinMtime int64
@@ -43,23 +51,19 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
return nil
}
-func (bal *Balancer) addKeepService(srv arvados.KeepService) error {
- for _, t := range bal.ServiceTypes {
- if t == srv.ServiceType {
- bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
- return nil
- }
- }
- bal.Logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
- return nil
-}
-
-func (bal *Balancer) Logf(f string, args ...interface{}) {
+func (bal *Balancer) logf(f string, args ...interface{}) {
if bal.Logger != nil {
bal.Logger.Printf(f, args...)
}
}
+// CheckSanityEarly checks for configuration and runtime errors that
+// can be detected before GetCurrentState() and ComputeChangeSets()
+// are called.
+//
+// If it returns an error, it is pointless to run GetCurrentState or
+// ComputeChangeSets: after doing so, the statistics would be
+// meaningless and it would be dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
u, err := c.CurrentUser()
if err != nil {
@@ -68,13 +72,22 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
if !u.IsActive || !u.IsAdmin {
return fmt.Errorf("Current user (%s) is not an active admin user", u.UUID)
}
+ for _, srv := range bal.KeepServices {
+ if srv.ServiceType == "proxy" {
+ return fmt.Errorf("Balancing a proxy service (%s) must be a config error", srv)
+ }
+ }
return nil
}
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
func (bal *Balancer) CheckSanityLate() error {
if bal.errors != nil {
for _, err := range bal.errors {
- bal.Logf("deferred error: %v", err)
+ bal.logf("deferred error: %v", err)
}
return fmt.Errorf("cannot proceed safely after deferred errors")
}
@@ -102,6 +115,36 @@ func (bal *Balancer) CheckSanityLate() error {
return nil
}
+// SetKeepServices replaces bal.KeepServices with the services listed
+// in srvList. No filtering by service type is done here. It always
+// returns nil.
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ for _, srv := range srvList.Items {
+ bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+ }
+ return nil
+}
+
+// DiscoverKeepServices replaces bal.KeepServices with the services
+// reported by c.EachKeepService whose ServiceType appears in okTypes.
+// Services with any other type are logged and skipped. An error from
+// c.EachKeepService is returned as-is; services accepted before the
+// error remain in bal.KeepServices.
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+ bal.KeepServices = make(map[string]*KeepService)
+ return c.EachKeepService(func(srv arvados.KeepService) error {
+ for _, t := range okTypes {
+ if t == srv.ServiceType {
+ bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+ return nil
+ }
+ }
+ bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+ return nil
+ })
+}
+
// GetCurrentState determines the current replication state, and the
// desired replication level, for every block that is either
// retrievable or referenced.
@@ -116,7 +151,6 @@ func (bal *Balancer) CheckSanityLate() error {
func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
- bal.KeepServices = make(map[string]*KeepService)
dd, err := c.DiscoveryDocument()
if err != nil {
@@ -128,22 +162,19 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
errs := make(chan error)
wg := sync.WaitGroup{}
- if err := c.EachKeepService(bal.addKeepService); err != nil {
- return err
- }
for _, srv := range bal.KeepServices {
wg.Add(1)
go func(srv *KeepService) {
defer wg.Done()
- bal.Logf("%s: retrieve index", srv)
+ bal.logf("%s: retrieve index", srv)
idx, err := srv.Index(c, "")
if err != nil {
errs <- fmt.Errorf("%s: %v", srv, err)
return
}
- bal.Logf("%s: add replicas to map", srv)
+ bal.logf("%s: add replicas to map", srv)
bal.BlockStateMap.AddReplicas(srv, idx)
- bal.Logf("%s: done", srv)
+ bal.logf("%s: done", srv)
}(srv)
}
@@ -179,7 +210,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
}
return nil
}, func(done, total int) {
- bal.Logf("collections: %d/%d", done, total)
+ bal.logf("collections: %d/%d", done, total)
})
close(collQ)
if err != nil {
@@ -203,8 +234,10 @@ func (bal *Balancer) setupServiceRoots() {
// ComputeChangeSets compares, for each known block, the current and
// desired replication states. If it is possible to get closer to the
-// desired state by copying or deleting blocks, it submits those
-// changes to the relevant KeepServices' ChangeSets.
+// desired state by copying or deleting blocks, it adds those changes
+// to the relevant KeepServices' ChangeSets.
+//
+// It does not actually apply any of the computed changes.
func (bal *Balancer) ComputeChangeSets() {
defer timeMe(bal.Logger, "ComputeChangeSets")()
bal.setupServiceRoots()
@@ -384,25 +417,32 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
return
}
+// PrintStatistics writes statistics about the computed changes to
+// bal.Logger. It should not be called until ComputeChangeSets has
+// finished.
func (bal *Balancer) PrintStatistics() {
s := bal.getStatistics()
- bal.Logf("===")
- bal.Logf("%s lost (0=have<want)", s.lost)
- bal.Logf("%s underreplicated (0<have<want)", s.underrep)
- bal.Logf("%s just right (have=want)", s.justright)
- bal.Logf("%s overreplicated (have>want>0)", s.overrep)
- bal.Logf("%s unreferenced (have>want=0, new)", s.unref)
- bal.Logf("%s garbage (have>want=0, old)", s.garbage)
- bal.Logf("===")
- bal.Logf("%s total commitment (excluding unreferenced)", s.desired)
- bal.Logf("%s total usage", s.current)
- bal.Logf("===")
+ bal.logf("===")
+ bal.logf("%s lost (0=have<want)", s.lost)
+ bal.logf("%s underreplicated (0<have<want)", s.underrep)
+ bal.logf("%s just right (have=want)", s.justright)
+ bal.logf("%s overreplicated (have>want>0)", s.overrep)
+ bal.logf("%s unreferenced (have>want=0, new)", s.unref)
+ bal.logf("%s garbage (have>want=0, old)", s.garbage)
+ bal.logf("===")
+ bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+ bal.logf("%s total usage", s.current)
+ bal.logf("===")
for _, srv := range bal.KeepServices {
- bal.Logf("%s: %v\n", srv, srv.ChangeSet)
+ bal.logf("%s: %v\n", srv, &srv.ChangeSet)
}
- bal.Logf("===")
+ bal.logf("===")
}
+// CommitPulls sends the computed lists of pull requests to the
+// keepstore servers. This has the effect of increasing replication of
+// existing blocks that are either underreplicated or poorly
+// distributed according to rendezvous hashing.
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
@@ -410,6 +450,9 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
})
}
+// CommitTrash sends the computed lists of trash requests to the
+// keepstore servers. This has the effect of deleting blocks that are
+// overreplicated or unreferenced.
func (bal *Balancer) CommitTrash(c *arvados.Client) error {
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
@@ -437,7 +480,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
var lastErr error
for _ = range bal.KeepServices {
if err := <-errs; err != nil {
- bal.Logf("%v", err)
+ bal.logf("%v", err)
lastErr = err
}
}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index 4cca976..eaab002 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -6,21 +6,26 @@ import (
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
+// Replica is a file on disk (or equivalent), as reported in a
+// keepstore index response.
type Replica struct {
*KeepService
Mtime int64
}
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas found on disk
+// (according to the keepstore indexes we know about).
type BlockState struct {
Replicas []Replica
Desired int
}
-func (bs *BlockState) AddReplica(r Replica) {
+func (bs *BlockState) addReplica(r Replica) {
bs.Replicas = append(bs.Replicas, r)
}
-func (bs *BlockState) IncreaseDesired(n int) {
+func (bs *BlockState) increaseDesired(n int) {
if bs.Desired < n {
bs.Desired = n
}
@@ -33,6 +38,7 @@ type BlockStateMap struct {
mutex sync.Mutex
}
+// NewBlockStateMap returns a newly allocated BlockStateMap.
func NewBlockStateMap() *BlockStateMap {
return &BlockStateMap{
entries: make(map[arvados.SizedDigest]*BlockState),
@@ -68,7 +74,7 @@ func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServic
defer bsm.mutex.Unlock()
for _, ent := range idx {
- bsm.get(ent.SizedDigest).AddReplica(Replica{
+ bsm.get(ent.SizedDigest).addReplica(Replica{
KeepService: srv,
Mtime: ent.Mtime,
})
@@ -82,6 +88,6 @@ func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
defer bsm.mutex.Unlock()
for _, blkid := range blocks {
- bsm.get(blkid).IncreaseDesired(n)
+ bsm.get(blkid).increaseDesired(n)
}
}
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index 3edb21e..fa2a5e3 100644
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -68,7 +68,7 @@ func (cs *ChangeSet) AddTrash(t Trash) {
}
// String implements fmt.Stringer.
-func (cs ChangeSet) String() string {
+func (cs *ChangeSet) String() string {
cs.mutex.Lock()
defer cs.mutex.Unlock()
return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index b7cecee..07480a6 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -6,7 +6,13 @@
},
"CommitPull": true,
"CommitTrash": false,
- "ServiceTypes": [
+ "KeepServiceTypes": [
"disk"
- ]
+ ],
+ "KeepServiceList": {
+ "items": [
+ {"uuid":"zzzzz-bi6l4-0123456789abcdef","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107},
+ {"uuid":"zzzzz-bi6l4-123456789abcdef0","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107}
+ ]
+ }
}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index d2b2a2d..6fa65e0 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -65,7 +65,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
AuthToken: arvadostest.DataManagerToken,
Insecure: true,
},
- ServiceTypes: []string{"disk"},
+ KeepServiceTypes: []string{"disk"},
}
}
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index 1a502e0..3d293f7 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -10,25 +10,32 @@ import (
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
+// KeepService represents a keepstore server that is being rebalanced.
type KeepService struct {
arvados.KeepService
ChangeSet
}
+// String implements fmt.Stringer.
func (srv *KeepService) String() string {
return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
}
var ksSchemes = map[bool]string{false: "http", true: "https"}
+// URLBase returns scheme://host:port for this server.
func (srv *KeepService) URLBase() string {
return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
}
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
func (srv *KeepService) CommitPulls(c *arvados.Client) error {
return srv.put(c, "pull", srv.ChangeSet.Pulls)
}
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
func (srv *KeepService) CommitTrash(c *arvados.Client) error {
return srv.put(c, "trash", srv.ChangeSet.Trashes)
}
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 0d7e56c..a301f8e 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"flag"
+ "fmt"
"io/ioutil"
"log"
"os"
@@ -10,11 +11,24 @@ import (
"git.curoverse.com/arvados.git/sdk/go/x/arvados"
)
+// Config indicates which servers are to be balanced. Config is loaded
+// from a JSON config file.
type Config struct {
- Client arvados.Client
- ServiceTypes []string
+ // Arvados API endpoint and credentials.
+ Client arvados.Client
+
+ // List of service types (e.g., "disk") to balance.
+ KeepServiceTypes []string
+
+ KeepServiceList arvados.KeepServiceList
}
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "dry run" is a runtime option rather than a config item because it
+// doesn't affect the balancing decisions being made.
+//
+// RunOptions fields are controlled by command line flags.
type RunOptions struct {
CommitPulls bool
CommitTrash bool
@@ -31,6 +45,9 @@ func main() {
configPath := flag.String("config",
os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
"configuration file")
+ serviceListPath := flag.String("config.KeepServiceList", "",
+ "list of keep services to balance, as given by `arv keep_service list` "+
+ "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
@@ -39,10 +56,9 @@ func main() {
debugFlag := flag.Bool("debug", false, "enable debug messages")
flag.Parse()
- if buf, err := ioutil.ReadFile(*configPath); err != nil {
- log.Fatalf("Reading %q: %v", *configPath, err)
- } else if err = json.Unmarshal(buf, &config); err != nil {
- log.Fatalf("Decoding %q: %v", *configPath, err)
+ mustReadJSON(&config, *configPath)
+ if *serviceListPath != "" {
+ mustReadJSON(&config.KeepServiceList, *serviceListPath)
}
if *debugFlag {
@@ -61,6 +77,14 @@ func main() {
}
}
+func mustReadJSON(dst interface{}, path string) {
+ if buf, err := ioutil.ReadFile(path); err != nil {
+ log.Fatalf("Reading %q: %v", path, err)
+ } else if err = json.Unmarshal(buf, dst); err != nil {
+ log.Fatalf("Decoding %q: %v", path, err)
+ }
+}
+
func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
if runOptions.Logger == nil {
runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -68,9 +92,19 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
bal = &Balancer{
Logger: runOptions.Logger,
Dumper: runOptions.Dumper,
- ServiceTypes: config.ServiceTypes,
}
+ if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+ err = fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+ } else if len(config.KeepServiceList.Items) > 0 {
+ err = bal.SetKeepServices(config.KeepServiceList)
+ } else {
+ err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+ }
+ if err != nil {
+ return
+ }
+
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index d1d13c3..a44a575 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -36,6 +36,8 @@ func (rt *reqTracker) Add(req *http.Request) int {
return len(rt.reqs)
}
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers.
type stubServer struct {
mux *http.ServeMux
srv *httptest.Server
@@ -44,7 +46,12 @@ type stubServer struct {
logf func(string, ...interface{})
}
-func (s *stubServer) start() *http.Client {
+// Start initializes the stub server and returns an *http.Client that
+// uses the stub server to handle all requests.
+//
+// A stubServer that has been started should eventually be shut down
+// with Close().
+func (s *stubServer) Start() *http.Client {
// Set up a config.Client that forwards all requests to s.mux
// via s.srv. Test cases will attach handlers to s.mux to get
// the desired responses.
@@ -69,7 +76,8 @@ func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
Body: ioutil.NopCloser(w.Body)}, nil
}
-func (s *stubServer) stop() {
+// Close releases resources used by the server.
+func (s *stubServer) Close() {
s.srv.Close()
}
@@ -160,6 +168,7 @@ type mainSuite struct {
config Config
}
+// make a log.Logger that writes to the current test's c.Log().
func (s *mainSuite) logger(c *check.C) *log.Logger {
r, w := io.Pipe()
go func() {
@@ -183,16 +192,16 @@ func (s *mainSuite) logger(c *check.C) *log.Logger {
func (s *mainSuite) SetUpTest(c *check.C) {
s.config = Config{
Client: arvados.Client{
- AuthToken: "xyzzy",
- APIHost: "zzzzz.arvadosapi.com",
- Client: s.stub.start()},
- ServiceTypes: []string{"disk"}}
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.Start()},
+ KeepServiceTypes: []string{"disk"}}
s.stub.serveDiscoveryDoc()
s.stub.logf = c.Logf
}
func (s *mainSuite) TearDownTest(c *check.C) {
- s.stub.stop()
+ s.stub.Close()
}
func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
@@ -217,7 +226,7 @@ func (s *mainSuite) TestServiceTypes(c *check.C) {
CommitTrash: true,
Logger: s.logger(c),
}
- s.config.ServiceTypes = []string{"unlisted-type"}
+ s.config.KeepServiceTypes = []string{"unlisted-type"}
s.stub.serveCurrentUserAdmin()
s.stub.serveZeroCollections()
s.stub.serveFourDiskKeepServices()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list