[ARVADOS] updated: 87f2f44c70532acc70885c5b7f528ce4d45cc65d

Git user git at public.curoverse.com
Wed May 18 20:08:52 EDT 2016


Summary of changes:
 services/keep-balance/balance.go          | 119 ++++++++++++++++++++----------
 services/keep-balance/block_state.go      |  14 +++-
 services/keep-balance/change_set.go       |   2 +-
 services/keep-balance/example-config.json |   8 +-
 services/keep-balance/integration_test.go |   2 +-
 services/keep-balance/keep_service.go     |   7 ++
 services/keep-balance/main.go             |  48 ++++++++++--
 services/keep-balance/main_test.go        |  25 +++++--
 8 files changed, 165 insertions(+), 60 deletions(-)

       via  87f2f44c70532acc70885c5b7f528ce4d45cc65d (commit)
      from  92fd8da1e911a2ef39fb23d944f124d4264c3322 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 87f2f44c70532acc70885c5b7f528ce4d45cc65d
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed May 18 20:08:47 2016 -0400

    9162: Load list of services from `arv ... list` json.

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index f45e346..38333fa 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -11,11 +11,19 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
 
+// Balancer compares the contents of keepstore servers with the
+// collections stored in Arvados, and issues pull/trash requests
+// needed to get (closer to) the optimal data layout.
+//
+// In the optimal data layout: every data block referenced by a
+// collection is replicated at least as many times as desired by the
+// collection; there are no unreferenced data blocks older than
+// BlobSignatureTTL; and all N existing replicas of a given data block
+// are in the N best positions in rendezvous probe order.
 type Balancer struct {
 	*BlockStateMap
 	KeepServices       map[string]*KeepService
 	DefaultReplication int
-	ServiceTypes       []string
 	Logger             *log.Logger
 	Dumper             *log.Logger
 	MinMtime           int64
@@ -43,23 +51,19 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	return nil
 }
 
-func (bal *Balancer) addKeepService(srv arvados.KeepService) error {
-	for _, t := range bal.ServiceTypes {
-		if t == srv.ServiceType {
-			bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
-			return nil
-		}
-	}
-	bal.Logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
-	return nil
-}
-
-func (bal *Balancer) Logf(f string, args ...interface{}) {
+func (bal *Balancer) logf(f string, args ...interface{}) {
 	if bal.Logger != nil {
 		bal.Logger.Printf(f, args...)
 	}
 }
 
+// CheckSanityEarly checks for configuration and runtime errors that
+// can be detected before GetCurrentState() and ComputeChangeSets()
+// are called.
+//
+// If it returns an error, it is pointless to run GetCurrentState or
+// ComputeChangeSets: after doing so, the statistics would be
+// meaningless and it would be dangerous to run any Commit methods.
 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 	u, err := c.CurrentUser()
 	if err != nil {
@@ -68,13 +72,22 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 	if !u.IsActive || !u.IsAdmin {
 		return fmt.Errorf("Current user (%s) is not an active admin user", u.UUID)
 	}
+	for _, srv := range bal.KeepServices {
+		if srv.ServiceType == "proxy" {
+			return fmt.Errorf("Balancing a proxy service (%s) must be a config error", srv)
+		}
+	}
 	return nil
 }
 
+// CheckSanityLate checks for configuration and runtime errors after
+// GetCurrentState() and ComputeChangeSets() have finished.
+//
+// If it returns an error, it is dangerous to run any Commit methods.
 func (bal *Balancer) CheckSanityLate() error {
 	if bal.errors != nil {
 		for _, err := range bal.errors {
-			bal.Logf("deferred error: %v", err)
+			bal.logf("deferred error: %v", err)
 		}
 		return fmt.Errorf("cannot proceed safely after deferred errors")
 	}
@@ -102,6 +115,28 @@ func (bal *Balancer) CheckSanityLate() error {
 	return nil
 }
 
+func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
+	bal.KeepServices = make(map[string]*KeepService)
+	for _, srv := range srvList.Items {
+		bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+	}
+	return nil
+}
+
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+	bal.KeepServices = make(map[string]*KeepService)
+	return c.EachKeepService(func(srv arvados.KeepService) error {
+		for _, t := range okTypes {
+			if t == srv.ServiceType {
+				bal.KeepServices[srv.UUID] = &KeepService{KeepService: srv}
+				return nil
+			}
+		}
+		bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
+		return nil
+	})
+}
+
 // GetCurrentState determines the current replication state, and the
 // desired replication level, for every block that is either
 // retrievable or referenced.
@@ -116,7 +151,6 @@ func (bal *Balancer) CheckSanityLate() error {
 func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 	defer timeMe(bal.Logger, "GetCurrentState")()
 	bal.BlockStateMap = NewBlockStateMap()
-	bal.KeepServices = make(map[string]*KeepService)
 
 	dd, err := c.DiscoveryDocument()
 	if err != nil {
@@ -128,22 +162,19 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 	errs := make(chan error)
 	wg := sync.WaitGroup{}
 
-	if err := c.EachKeepService(bal.addKeepService); err != nil {
-		return err
-	}
 	for _, srv := range bal.KeepServices {
 		wg.Add(1)
 		go func(srv *KeepService) {
 			defer wg.Done()
-			bal.Logf("%s: retrieve index", srv)
+			bal.logf("%s: retrieve index", srv)
 			idx, err := srv.Index(c, "")
 			if err != nil {
 				errs <- fmt.Errorf("%s: %v", srv, err)
 				return
 			}
-			bal.Logf("%s: add replicas to map", srv)
+			bal.logf("%s: add replicas to map", srv)
 			bal.BlockStateMap.AddReplicas(srv, idx)
-			bal.Logf("%s: done", srv)
+			bal.logf("%s: done", srv)
 		}(srv)
 	}
 
@@ -179,7 +210,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 				}
 				return nil
 			}, func(done, total int) {
-				bal.Logf("collections: %d/%d", done, total)
+				bal.logf("collections: %d/%d", done, total)
 			})
 		close(collQ)
 		if err != nil {
@@ -203,8 +234,10 @@ func (bal *Balancer) setupServiceRoots() {
 
 // ComputeChangeSets compares, for each known block, the current and
 // desired replication states. If it is possible to get closer to the
-// desired state by copying or deleting blocks, it submits those
-// changes to the relevant KeepServices' ChangeSets.
+// desired state by copying or deleting blocks, it adds those changes
+// to the relevant KeepServices' ChangeSets.
+//
+// It does not actually apply any of the computed changes.
 func (bal *Balancer) ComputeChangeSets() {
 	defer timeMe(bal.Logger, "ComputeChangeSets")()
 	bal.setupServiceRoots()
@@ -384,25 +417,32 @@ func (bal *Balancer) getStatistics() (s balancerStats) {
 	return
 }
 
+// PrintStatistics writes statistics about the computed changes to
+// bal.Logger. It should not be called until ComputeChangeSets has
+// finished.
 func (bal *Balancer) PrintStatistics() {
 	s := bal.getStatistics()
-	bal.Logf("===")
-	bal.Logf("%s lost (0=have<want)", s.lost)
-	bal.Logf("%s underreplicated (0<have<want)", s.underrep)
-	bal.Logf("%s just right (have=want)", s.justright)
-	bal.Logf("%s overreplicated (have>want>0)", s.overrep)
-	bal.Logf("%s unreferenced (have>want=0, new)", s.unref)
-	bal.Logf("%s garbage (have>want=0, old)", s.garbage)
-	bal.Logf("===")
-	bal.Logf("%s total commitment (excluding unreferenced)", s.desired)
-	bal.Logf("%s total usage", s.current)
-	bal.Logf("===")
+	bal.logf("===")
+	bal.logf("%s lost (0=have<want)", s.lost)
+	bal.logf("%s underreplicated (0<have<want)", s.underrep)
+	bal.logf("%s just right (have=want)", s.justright)
+	bal.logf("%s overreplicated (have>want>0)", s.overrep)
+	bal.logf("%s unreferenced (have>want=0, new)", s.unref)
+	bal.logf("%s garbage (have>want=0, old)", s.garbage)
+	bal.logf("===")
+	bal.logf("%s total commitment (excluding unreferenced)", s.desired)
+	bal.logf("%s total usage", s.current)
+	bal.logf("===")
 	for _, srv := range bal.KeepServices {
-		bal.Logf("%s: %v\n", srv, srv.ChangeSet)
+		bal.logf("%s: %v\n", srv, &srv.ChangeSet)
 	}
-	bal.Logf("===")
+	bal.logf("===")
 }
 
+// CommitPulls sends the computed lists of pull requests to the
+// keepstore servers. This has the effect of increasing replication of
+// existing blocks that are either underreplicated or poorly
+// distributed according to rendezvous hashing.
 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
 	return bal.commitAsync(c, "send pull list",
 		func(srv *KeepService) error {
@@ -410,6 +450,9 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
 		})
 }
 
+// CommitTrash sends the computed lists of trash requests to the
+// keepstore servers. This has the effect of deleting blocks that are
+// overreplicated or unreferenced.
 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
 	return bal.commitAsync(c, "send trash list",
 		func(srv *KeepService) error {
@@ -437,7 +480,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
 	var lastErr error
 	for _ = range bal.KeepServices {
 		if err := <-errs; err != nil {
-			bal.Logf("%v", err)
+			bal.logf("%v", err)
 			lastErr = err
 		}
 	}
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index 4cca976..eaab002 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -6,21 +6,26 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
 
+// Replica is a file on disk (or equivalent), as reported in a
+// keepstore index response.
 type Replica struct {
 	*KeepService
 	Mtime int64
 }
 
+// BlockState indicates the number of desired replicas (according to
+// the collections we know about) and the replicas found on disk
+// (according to the keepstore indexes we know about).
 type BlockState struct {
 	Replicas []Replica
 	Desired  int
 }
 
-func (bs *BlockState) AddReplica(r Replica) {
+func (bs *BlockState) addReplica(r Replica) {
 	bs.Replicas = append(bs.Replicas, r)
 }
 
-func (bs *BlockState) IncreaseDesired(n int) {
+func (bs *BlockState) increaseDesired(n int) {
 	if bs.Desired < n {
 		bs.Desired = n
 	}
@@ -33,6 +38,7 @@ type BlockStateMap struct {
 	mutex  sync.Mutex
 }
 
+// NewBlockStateMap returns a newly allocated BlockStateMap.
 func NewBlockStateMap() *BlockStateMap {
 	return &BlockStateMap{
 		entries: make(map[arvados.SizedDigest]*BlockState),
@@ -68,7 +74,7 @@ func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServic
 	defer bsm.mutex.Unlock()
 
 	for _, ent := range idx {
-		bsm.get(ent.SizedDigest).AddReplica(Replica{
+		bsm.get(ent.SizedDigest).addReplica(Replica{
 			KeepService: srv,
 			Mtime:       ent.Mtime,
 		})
@@ -82,6 +88,6 @@ func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
 	defer bsm.mutex.Unlock()
 
 	for _, blkid := range blocks {
-		bsm.get(blkid).IncreaseDesired(n)
+		bsm.get(blkid).increaseDesired(n)
 	}
 }
diff --git a/services/keep-balance/change_set.go b/services/keep-balance/change_set.go
index 3edb21e..fa2a5e3 100644
--- a/services/keep-balance/change_set.go
+++ b/services/keep-balance/change_set.go
@@ -68,7 +68,7 @@ func (cs *ChangeSet) AddTrash(t Trash) {
 }
 
 // String implements fmt.Stringer.
-func (cs ChangeSet) String() string {
+func (cs *ChangeSet) String() string {
 	cs.mutex.Lock()
 	defer cs.mutex.Unlock()
 	return fmt.Sprintf("ChangeSet{Pulls:%d, Trashes:%d}", len(cs.Pulls), len(cs.Trashes))
diff --git a/services/keep-balance/example-config.json b/services/keep-balance/example-config.json
index b7cecee..07480a6 100644
--- a/services/keep-balance/example-config.json
+++ b/services/keep-balance/example-config.json
@@ -6,7 +6,13 @@
     },
     "CommitPull": true,
     "CommitTrash": false,
-    "ServiceTypes": [
+    "KeepServiceTypes": [
         "disk"
     ]
+    "KeepServiceList": {
+        "items": [
+            {"uuid":"zzzzz-bi6l4-0123456789abcdef","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107},
+            {"uuid":"zzzzz-bi6l4-123456789abcdef0","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107}
+        ]
+    }
 }
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index d2b2a2d..6fa65e0 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -65,7 +65,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
 			AuthToken: arvadostest.DataManagerToken,
 			Insecure:  true,
 		},
-		ServiceTypes: []string{"disk"},
+		KeepServiceTypes: []string{"disk"},
 	}
 }
 
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index 1a502e0..3d293f7 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -10,25 +10,32 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
 
+// KeepService represents a keepstore server that is being rebalanced.
 type KeepService struct {
 	arvados.KeepService
 	ChangeSet
 }
 
+// String implements fmt.Stringer.
 func (srv *KeepService) String() string {
 	return fmt.Sprintf("%s (%s:%d, %s)", srv.UUID, srv.ServiceHost, srv.ServicePort, srv.ServiceType)
 }
 
 var ksSchemes = map[bool]string{false: "http", true: "https"}
 
+// URLBase returns scheme://host:port for this server.
 func (srv *KeepService) URLBase() string {
 	return fmt.Sprintf("%s://%s:%d", ksSchemes[srv.ServiceSSLFlag], srv.ServiceHost, srv.ServicePort)
 }
 
+// CommitPulls sends the current list of pull requests to the storage
+// server (even if the list is empty).
 func (srv *KeepService) CommitPulls(c *arvados.Client) error {
 	return srv.put(c, "pull", srv.ChangeSet.Pulls)
 }
 
+// CommitTrash sends the current list of trash requests to the storage
+// server (even if the list is empty).
 func (srv *KeepService) CommitTrash(c *arvados.Client) error {
 	return srv.put(c, "trash", srv.ChangeSet.Trashes)
 }
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 0d7e56c..a301f8e 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"encoding/json"
 	"flag"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"os"
@@ -10,11 +11,24 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/x/arvados"
 )
 
+// Config indicates which servers are to be balanced. Config is loaded
+// from a JSON config file.
 type Config struct {
-	Client       arvados.Client
-	ServiceTypes []string
+	// Arvados API endpoint and credentials.
+	Client arvados.Client
+
+	// List of service types (e.g., "disk") to balance.
+	KeepServiceTypes []string
+
+	KeepServiceList arvados.KeepServiceList
 }
 
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "dry run" is a runtime option rather than a config item because it
+// doesn't affect the balancing decisions being made.
+//
+// RunOptions fields are controlled by command line flags.
 type RunOptions struct {
 	CommitPulls bool
 	CommitTrash bool
@@ -31,6 +45,9 @@ func main() {
 	configPath := flag.String("config",
 		os.Getenv("HOME")+"/.config/arvados/keep-balance.json",
 		"configuration file")
+	serviceListPath := flag.String("config.KeepServiceList", "",
+		"list of keep services to balance, as given by `arv keep_service list` "+
+			"(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
 	flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
 		"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
 	flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
@@ -39,10 +56,9 @@ func main() {
 	debugFlag := flag.Bool("debug", false, "enable debug messages")
 	flag.Parse()
 
-	if buf, err := ioutil.ReadFile(*configPath); err != nil {
-		log.Fatalf("Reading %q: %v", *configPath, err)
-	} else if err = json.Unmarshal(buf, &config); err != nil {
-		log.Fatalf("Decoding %q: %v", *configPath, err)
+	mustReadJSON(&config, *configPath)
+	if *serviceListPath != "" {
+		mustReadJSON(&config.KeepServiceList, *serviceListPath)
 	}
 
 	if *debugFlag {
@@ -61,6 +77,14 @@ func main() {
 	}
 }
 
+func mustReadJSON(dst interface{}, path string) {
+	if buf, err := ioutil.ReadFile(path); err != nil {
+		log.Fatalf("Reading %q: %v", path, err)
+	} else if err = json.Unmarshal(buf, dst); err != nil {
+		log.Fatalf("Decoding %q: %v", path, err)
+	}
+}
+
 func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
 	if runOptions.Logger == nil {
 		runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -68,9 +92,19 @@ func Run(config Config, runOptions RunOptions) (bal *Balancer, err error) {
 	bal = &Balancer{
 		Logger:       runOptions.Logger,
 		Dumper:       runOptions.Dumper,
-		ServiceTypes: config.ServiceTypes,
 	}
 
+	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+		err = fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+	} else if len(config.KeepServiceList.Items) > 0 {
+		err = bal.SetKeepServices(config.KeepServiceList)
+	} else {
+		err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+	}
+	if err != nil {
+		return
+	}
+	
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
 	}
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
index d1d13c3..a44a575 100644
--- a/services/keep-balance/main_test.go
+++ b/services/keep-balance/main_test.go
@@ -36,6 +36,8 @@ func (rt *reqTracker) Add(req *http.Request) int {
 	return len(rt.reqs)
 }
 
+// stubServer is an HTTP transport that intercepts and processes all
+// requests using its own handlers.
 type stubServer struct {
 	mux      *http.ServeMux
 	srv      *httptest.Server
@@ -44,7 +46,12 @@ type stubServer struct {
 	logf     func(string, ...interface{})
 }
 
-func (s *stubServer) start() *http.Client {
+// Start initializes the stub server and returns an *http.Client that
+// uses the stub server to handle all requests.
+//
+// A stubServer that has been started should eventually be shut down
+// with Close().
+func (s *stubServer) Start() *http.Client {
 	// Set up a config.Client that forwards all requests to s.mux
 	// via s.srv. Test cases will attach handlers to s.mux to get
 	// the desired responses.
@@ -69,7 +76,8 @@ func (s *stubServer) RoundTrip(req *http.Request) (*http.Response, error) {
 		Body:       ioutil.NopCloser(w.Body)}, nil
 }
 
-func (s *stubServer) stop() {
+// Close releases resources used by the server.
+func (s *stubServer) Close() {
 	s.srv.Close()
 }
 
@@ -160,6 +168,7 @@ type mainSuite struct {
 	config Config
 }
 
+// make a log.Logger that writes to the current test's c.Log().
 func (s *mainSuite) logger(c *check.C) *log.Logger {
 	r, w := io.Pipe()
 	go func() {
@@ -183,16 +192,16 @@ func (s *mainSuite) logger(c *check.C) *log.Logger {
 func (s *mainSuite) SetUpTest(c *check.C) {
 	s.config = Config{
 		Client: arvados.Client{
-			AuthToken:    "xyzzy",
-			APIHost:      "zzzzz.arvadosapi.com",
-			Client:       s.stub.start()},
-		ServiceTypes: []string{"disk"}}
+			AuthToken: "xyzzy",
+			APIHost:   "zzzzz.arvadosapi.com",
+			Client:    s.stub.Start()},
+		KeepServiceTypes: []string{"disk"}}
 	s.stub.serveDiscoveryDoc()
 	s.stub.logf = c.Logf
 }
 
 func (s *mainSuite) TearDownTest(c *check.C) {
-	s.stub.stop()
+	s.stub.Close()
 }
 
 func (s *mainSuite) TestRefuseZeroCollections(c *check.C) {
@@ -217,7 +226,7 @@ func (s *mainSuite) TestServiceTypes(c *check.C) {
 		CommitTrash: true,
 		Logger:      s.logger(c),
 	}
-	s.config.ServiceTypes = []string{"unlisted-type"}
+	s.config.KeepServiceTypes = []string{"unlisted-type"}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveZeroCollections()
 	s.stub.serveFourDiskKeepServices()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list