[ARVADOS] created: 1.2.0-144-ga23722793

Git user git at public.curoverse.com
Mon Oct 8 16:47:03 EDT 2018


        at  a23722793aa13e0e8dd37aa91e16111dba452ba0 (commit)


commit a23722793aa13e0e8dd37aa91e16111dba452ba0
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Oct 8 16:43:40 2018 -0400

    14285: Export stats as prometheus metrics.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/doc/install/install-keep-balance.html.textile.liquid b/doc/install/install-keep-balance.html.textile.liquid
index 3a8dce078..043f3ebfd 100644
--- a/doc/install/install-keep-balance.html.textile.liquid
+++ b/doc/install/install-keep-balance.html.textile.liquid
@@ -75,7 +75,8 @@ h3. Create a keep-balance configuration file
 On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the token you generated above.  Follow this YAML format:
 
 <notextile>
-<pre><code>Client:
+<pre><code>Listen: :9005
+Client:
   APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
   AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
 KeepServiceTypes:
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index c723be7d1..e2e9907d5 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -161,6 +161,7 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 type NodeProfile struct {
 	Controller  SystemServiceInstance `json:"arvados-controller"`
 	Health      SystemServiceInstance `json:"arvados-health"`
+	Keepbalance SystemServiceInstance `json:"keep-balance"`
 	Keepproxy   SystemServiceInstance `json:"keepproxy"`
 	Keepstore   SystemServiceInstance `json:"keepstore"`
 	Keepweb     SystemServiceInstance `json:"keep-web"`
@@ -178,6 +179,7 @@ const (
 	ServiceNameNodemanager ServiceName = "arvados-node-manager"
 	ServiceNameWorkbench   ServiceName = "arvados-workbench"
 	ServiceNameWebsocket   ServiceName = "arvados-ws"
+	ServiceNameKeepbalance ServiceName = "keep-balance"
 	ServiceNameKeepweb     ServiceName = "keep-web"
 	ServiceNameKeepproxy   ServiceName = "keepproxy"
 	ServiceNameKeepstore   ServiceName = "keepstore"
@@ -192,6 +194,7 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
 		ServiceNameNodemanager: np.Nodemanager.Listen,
 		ServiceNameWorkbench:   np.Workbench.Listen,
 		ServiceNameWebsocket:   np.Websocket.Listen,
+		ServiceNameKeepbalance: np.Keepbalance.Listen,
 		ServiceNameKeepweb:     np.Keepweb.Listen,
 		ServiceNameKeepproxy:   np.Keepproxy.Listen,
 		ServiceNameKeepstore:   np.Keepstore.Listen,
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d86234a93..333a4fbde 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -10,7 +10,6 @@ import (
 	"fmt"
 	"log"
 	"math"
-	"os"
 	"runtime"
 	"sort"
 	"strings"
@@ -19,20 +18,9 @@ import (
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"github.com/Sirupsen/logrus"
 )
 
-// CheckConfig returns an error if anything is wrong with the given
-// config and runOptions.
-func CheckConfig(config Config, runOptions RunOptions) error {
-	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-		return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-	}
-	if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-		return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-	}
-	return nil
-}
-
 // Balancer compares the contents of keepstore servers with the
 // collections stored in Arvados, and issues pull/trash requests
 // needed to get (closer to) the optimal data layout.
@@ -43,11 +31,13 @@ func CheckConfig(config Config, runOptions RunOptions) error {
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+	Logger  *logrus.Logger
+	Dumper  *logrus.Logger
+	Metrics *metrics
+
 	*BlockStateMap
 	KeepServices       map[string]*KeepService
 	DefaultReplication int
-	Logger             *log.Logger
-	Dumper             *log.Logger
 	MinMtime           int64
 
 	classes       []string
@@ -72,13 +62,7 @@ type Balancer struct {
 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
 	nextRunOptions = runOptions
 
-	bal.Dumper = runOptions.Dumper
-	bal.Logger = runOptions.Logger
-	if bal.Logger == nil {
-		bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
-	}
-
-	defer timeMe(bal.Logger, "Run")()
+	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
 	if len(config.KeepServiceList.Items) > 0 {
 		err = bal.SetKeepServices(config.KeepServiceList)
@@ -269,7 +253,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-	defer timeMe(bal.Logger, "GetCurrentState")()
+	defer bal.time("get_state", "wall clock time to get current state")()
 	bal.BlockStateMap = NewBlockStateMap()
 
 	dd, err := c.DiscoveryDocument()
@@ -413,7 +397,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 func (bal *Balancer) ComputeChangeSets() {
 	// This just calls balanceBlock() once for each block, using a
 	// pool of worker goroutines.
-	defer timeMe(bal.Logger, "ComputeChangeSets")()
+	defer bal.time("changeset_compute", "wall clock time to compute changesets")()
 	bal.setupLookupTables()
 
 	type balanceTask struct {
@@ -893,6 +877,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
 		s.trashes += len(srv.ChangeSet.Trashes)
 	}
 	bal.stats = s
+	bal.Metrics.UpdateStats(s)
 }
 
 // PrintStatistics writes statistics about the computed changes to
@@ -986,6 +971,7 @@ func (bal *Balancer) CheckSanityLate() error {
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+	defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
 	return bal.commitAsync(c, "send pull list",
 		func(srv *KeepService) error {
 			return srv.CommitPulls(c)
@@ -996,6 +982,7 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+	defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
 	return bal.commitAsync(c, "send trash list",
 		func(srv *KeepService) error {
 			return srv.CommitTrash(c)
@@ -1009,7 +996,6 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
 			var err error
 			defer func() { errs <- err }()
 			label := fmt.Sprintf("%s: %v", srv, label)
-			defer timeMe(bal.Logger, label)()
 			err = f(srv)
 			if err != nil {
 				err = fmt.Errorf("%s: %v", label, err)
@@ -1033,6 +1019,27 @@ func (bal *Balancer) logf(f string, args ...interface{}) {
 	}
 }
 
+// time logs "<name>: start" and returns a func that, when called,
+// observes the wall clock time elapsed since then (in seconds) on
+// the "<name>_seconds" duration metric and logs it. Intended use:
+//
+//	defer bal.time("sweep", "help text")()
+//
+// bal.Metrics must be non-nil: DurationObserver locks its mutex.
+// NOTE(review): bal.Logger presumably must be non-nil too -- confirm.
+func (bal *Balancer) time(name, help string) func() {
+	// Obtain the observer before reading the clock, so the lookup
+	// (and first-time registration) is not part of the measurement.
+	observer := bal.Metrics.DurationObserver(name+"_seconds", help)
+	t0 := time.Now()
+	bal.Logger.Printf("%s: start", name)
+	return func() {
+		dur := time.Since(t0)
+		observer.Observe(dur.Seconds())
+		bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
+	}
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 28776abc4..26aee213d 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -9,7 +9,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"net/http/httptest"
 	"strings"
@@ -17,6 +16,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
 
 	check "gopkg.in/check.v1"
 )
@@ -282,7 +282,7 @@ type runSuite struct {
 }
 
 // make a log.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *log.Logger {
+func (s *runSuite) logger(c *check.C) *logrus.Logger {
 	r, w := io.Pipe()
 	go func() {
 		buf := make([]byte, 10000)
@@ -299,7 +299,9 @@ func (s *runSuite) logger(c *check.C) *log.Logger {
 			}
 		}
 	}()
-	return log.New(w, "", log.LstdFlags)
+	logger := logrus.New()
+	logger.Out = w
+	return logger
 }
 
 func (s *runSuite) SetUpTest(c *check.C) {
@@ -308,7 +310,9 @@ func (s *runSuite) SetUpTest(c *check.C) {
 			AuthToken: "xyzzy",
 			APIHost:   "zzzzz.arvadosapi.com",
 			Client:    s.stub.Start()},
-		KeepServiceTypes: []string{"disk"}}
+		KeepServiceTypes: []string{"disk"},
+		RunPeriod:        arvados.Duration(time.Second),
+	}
 	s.stub.serveDiscoveryDoc()
 	s.stub.logf = c.Logf
 }
@@ -330,7 +334,9 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	_, err := (&Balancer{}).Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	_, err = srv.Run()
 	c.Check(err, check.ErrorMatches, "received zero collections")
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -349,7 +355,9 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
 	s.stub.serveKeepstoreMounts()
 	indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
-	_, err := (&Balancer{}).Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	_, err = srv.Run()
 	c.Check(err, check.IsNil)
 	c.Check(indexReqs.Count(), check.Equals, 0)
 	c.Check(trashReqs.Count(), check.Equals, 0)
@@ -367,7 +375,9 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	s.stub.serveKeepstoreMounts()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	_, err := (&Balancer{}).Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	_, err = srv.Run()
 	c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
 	c.Check(trashReqs.Count(), check.Equals, 0)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -386,7 +396,9 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	_, err := (&Balancer{}).Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	_, err = srv.Run()
 	c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
 	c.Check(trashReqs.Count(), check.Equals, 4)
 	c.Check(pullReqs.Count(), check.Equals, 0)
@@ -405,8 +417,9 @@ func (s *runSuite) TestDryRun(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	var bal Balancer
-	_, err := bal.Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	bal, err := srv.Run()
 	c.Check(err, check.IsNil)
 	for _, req := range collReqs.reqs {
 		c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -419,6 +432,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
+	s.config.Listen = ":"
 	opts := RunOptions{
 		CommitPulls: true,
 		CommitTrash: true,
@@ -432,8 +446,9 @@ func (s *runSuite) TestCommit(c *check.C) {
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
-	var bal Balancer
-	_, err := bal.Run(s.config, opts)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	bal, err := srv.Run()
 	c.Check(err, check.IsNil)
 	c.Check(trashReqs.Count(), check.Equals, 8)
 	c.Check(pullReqs.Count(), check.Equals, 4)
@@ -442,9 +457,15 @@ func (s *runSuite) TestCommit(c *check.C) {
 	// "bar" block is underreplicated by 1, and its only copy is
 	// in a poor rendezvous position
 	c.Check(bal.stats.pulls, check.Equals, 2)
+
+	metrics := s.getMetrics(c, srv)
+	c.Check(metrics, check.Matches, `(?ms).*\nkeep_total_bytes 15\n.*`)
+	c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+	c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count 1\n.*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
+	s.config.Listen = ":"
 	opts := RunOptions{
 		CommitPulls: true,
 		CommitTrash: true,
@@ -461,7 +482,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
 
 	stop := make(chan interface{})
 	s.config.RunPeriod = arvados.Duration(time.Millisecond)
-	go RunForever(s.config, opts, stop)
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+
+	done := make(chan bool)
+	go func() {
+		srv.RunForever(stop)
+		close(done)
+	}()
 
 	// Each run should send 4 pull lists + 4 trash lists. The
 	// first run should also send 4 empty trash lists at
@@ -471,6 +499,20 @@ func (s *runSuite) TestRunForever(c *check.C) {
 		time.Sleep(time.Millisecond)
 	}
 	stop <- true
+	<-done
 	c.Check(pullReqs.Count() >= 16, check.Equals, true)
 	c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+	c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+}
+
+// getMetrics fetches the /metrics page from srv's management
+// listener and returns the response body as a string. srv must have
+// been created with a non-empty config.Listen, otherwise
+// srv.listening is empty and the request fails the test.
+func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
+	resp, err := http.Get("http://" + srv.listening + "/metrics")
+	c.Assert(err, check.IsNil)
+	defer resp.Body.Close()
+	buf, err := ioutil.ReadAll(resp.Body)
+	c.Check(err, check.IsNil)
+	return string(buf)
 }
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index 9fc47623e..5280b40c9 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -6,7 +6,6 @@ package main
 
 import (
 	"bytes"
-	"log"
 	"os"
 	"strings"
 	"testing"
@@ -16,6 +15,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"github.com/Sirupsen/logrus"
 
 	check "gopkg.in/check.v1"
 )
@@ -67,6 +67,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
 			Insecure:  true,
 		},
 		KeepServiceTypes: []string{"disk"},
+		RunPeriod:        arvados.Duration(time.Second),
 	}
 }
 
@@ -74,12 +75,19 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 	var logBuf *bytes.Buffer
 	for iter := 0; iter < 20; iter++ {
 		logBuf := &bytes.Buffer{}
+		logger := logrus.New()
+		logger.Out = logBuf
 		opts := RunOptions{
 			CommitPulls: true,
 			CommitTrash: true,
-			Logger:      log.New(logBuf, "", log.LstdFlags),
+			Logger:      logger,
 		}
-		nextOpts, err := (&Balancer{}).Run(s.config, opts)
+
+		bal := &Balancer{
+			Logger:  logger,
+			Metrics: newMetrics(),
+		}
+		nextOpts, err := bal.Run(s.config, opts)
 		c.Check(err, check.IsNil)
 		c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
 		c.Check(nextOpts.CommitPulls, check.Equals, true)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 90235cbf3..eb741fa7e 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -17,11 +17,16 @@ import (
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/config"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	"github.com/Sirupsen/logrus"
 )
 
 var version = "dev"
 
-const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+const (
+	defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+	rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
+)
 
 // Config specifies site configuration, like API credentials and the
 // choice of which servers are to be balanced.
@@ -36,6 +41,9 @@ type Config struct {
 
 	KeepServiceList arvados.KeepServiceList
 
+	// address, address:port, or :port for management interface
+	Listen string
+
 	// How often to check
 	RunPeriod arvados.Duration
 
@@ -62,8 +70,8 @@ type RunOptions struct {
 	Once        bool
 	CommitPulls bool
 	CommitTrash bool
-	Logger      *log.Logger
-	Dumper      *log.Logger
+	Logger      *logrus.Logger
+	Dumper      *logrus.Logger
 
 	// SafeRendezvousState from the most recent balance operation,
 	// or "" if unknown. If this changes from one run to the next,
@@ -130,15 +138,17 @@ func main() {
 		}
 	}
 	if *dumpFlag {
-		runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+		runOptions.Dumper = logrus.New()
+		runOptions.Dumper.Out = os.Stdout
+		runOptions.Dumper.Formatter = &logrus.TextFormatter{}
 	}
-	err := CheckConfig(cfg, runOptions)
+	srv, err := NewServer(cfg, runOptions)
 	if err != nil {
 		// (don't run)
 	} else if runOptions.Once {
-		_, err = (&Balancer{}).Run(cfg, runOptions)
+		_, err = srv.Run()
 	} else {
-		err = RunForever(cfg, runOptions, nil)
+		err = srv.RunForever(nil)
 	}
 	if err != nil {
 		log.Fatal(err)
@@ -151,32 +161,96 @@ func mustReadConfig(dst interface{}, path string) {
 	}
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+type Server struct {
+	config     Config
+	runOptions RunOptions
+	metrics    *metrics
+	listening  string // for tests
+
+	Logger *logrus.Logger
+	Dumper *logrus.Logger
+}
+
+// NewServer returns a new Server that runs Balancers using the given
+// config and runOptions.
+func NewServer(config Config, runOptions RunOptions) (*Server, error) {
+	if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+		return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+	}
+	if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+		return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+	}
+
 	if runOptions.Logger == nil {
-		runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+		log := logrus.New()
+		log.Formatter = &logrus.JSONFormatter{
+			TimestampFormat: rfc3339NanoFixed,
+		}
+		log.Out = os.Stderr
+		runOptions.Logger = log
+	}
+
+	srv := &Server{
+		config:     config,
+		runOptions: runOptions,
+		metrics:    newMetrics(),
+		Logger:     runOptions.Logger,
+		Dumper:     runOptions.Dumper,
 	}
-	logger := runOptions.Logger
+	return srv, srv.start()
+}
+
+func (srv *Server) start() error {
+	if srv.config.Listen == "" {
+		return nil
+	}
+	server := &httpserver.Server{
+		Server: http.Server{
+			Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
+		},
+		Addr: srv.config.Listen,
+	}
+	err := server.Start()
+	if err != nil {
+		return err
+	}
+	srv.Logger.Printf("listening at %s", server.Addr)
+	srv.listening = server.Addr
+	return nil
+}
+
+func (srv *Server) Run() (*Balancer, error) {
+	bal := &Balancer{
+		Logger:  srv.Logger,
+		Dumper:  srv.Dumper,
+		Metrics: srv.metrics,
+	}
+	var err error
+	srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+	return bal, err
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func (srv *Server) RunForever(stop <-chan interface{}) error {
+	logger := srv.runOptions.Logger
 
-	ticker := time.NewTicker(time.Duration(config.RunPeriod))
+	ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
 
 	// The unbuffered channel here means we only hear SIGUSR1 if
 	// it arrives while we're waiting in select{}.
 	sigUSR1 := make(chan os.Signal)
 	signal.Notify(sigUSR1, syscall.SIGUSR1)
 
-	logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+	logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
 
 	for {
-		if !runOptions.CommitPulls && !runOptions.CommitTrash {
+		if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
 			logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
 			logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
 		}
 
-		bal := &Balancer{}
-		var err error
-		runOptions, err = bal.Run(config, runOptions)
+		_, err := srv.Run()
 		if err != nil {
 			logger.Print("run failed: ", err)
 		} else {
@@ -195,7 +269,7 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
 			// run too soon after the Nth run is triggered
 			// by SIGUSR1.
 			ticker.Stop()
-			ticker = time.NewTicker(time.Duration(config.RunPeriod))
+			ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
 		}
 		logger.Print("starting next run")
 	}
diff --git a/services/keep-balance/metrics.go b/services/keep-balance/metrics.go
new file mode 100644
index 000000000..96ee66ce5
--- /dev/null
+++ b/services/keep-balance/metrics.go
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+// observer is the one method of a prometheus summary that callers
+// of DurationObserver need.
+type observer interface{ Observe(float64) }
+// setter is the one method of a prometheus gauge that UpdateStats needs.
+type setter interface{ Set(float64) }
+
+// metrics bundles a private prometheus registry with the collectors
+// that have been registered in it.
+type metrics struct {
+	reg         *prometheus.Registry // registry served by Handler
+	statsGauges map[string]setter    // populated once, by the first UpdateStats call
+	observers   map[string]observer  // keyed by the name given to DurationObserver
+	setupOnce   sync.Once            // guards gauge registration in UpdateStats
+	mtx         sync.Mutex           // held by DurationObserver while it uses observers
+}
+
+// newMetrics returns a metrics with a new registry and empty
+// collector maps.
+func newMetrics() *metrics {
+	return &metrics{
+		reg:         prometheus.NewRegistry(),
+		statsGauges: map[string]setter{},
+		observers:   map[string]observer{},
+	}
+}
+
+// DurationObserver returns the observer registered under the given
+// name, creating and registering a new summary (in the
+// "keepbalance" subsystem, with the given help text) the first time
+// a name is requested. Later calls with the same name return the
+// same observer; their help argument is not used.
+func (m *metrics) DurationObserver(name, help string) observer {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	if obs, ok := m.observers[name]; ok {
+		return obs
+	}
+	summary := prometheus.NewSummary(prometheus.SummaryOpts{
+		Name:      name,
+		Subsystem: "keepbalance",
+		Help:      help,
+	})
+	m.reg.MustRegister(summary)
+	m.observers[name] = summary
+	return summary
+}
+
+// UpdateStats updates prometheus metrics using the given
+// balancerStats. It creates and registers the needed gauges on its
+// first invocation.
+//
+// NOTE(review): statsGauges is read here without taking mtx --
+// confirm UpdateStats is never called concurrently.
+func (m *metrics) UpdateStats(s balancerStats) {
+	type gauge struct {
+		Value interface{}
+		Help  string
+	}
+	// Gauge name (or name prefix, for blocksNBytes values) mapped
+	// to the value to publish and the gauge's help text.
+	s2g := map[string]gauge{
+		"total":           {s.current, "current backend storage usage"},
+		"garbage":         {s.garbage, "garbage (unreferenced, old)"},
+		"transient":       {s.unref, "transient (unreferenced, new)"},
+		"overreplicated":  {s.overrep, "overreplicated"},
+		"underreplicated": {s.underrep, "underreplicated"},
+		"lost":            {s.lost, "lost"},
+	}
+	m.setupOnce.Do(func() {
+		// Register gauge(s) for each balancerStats field.
+		addGauge := func(name, help string) {
+			g := prometheus.NewGauge(prometheus.GaugeOpts{
+				Name:      name,
+				Subsystem: "keep",
+				Help:      help,
+			})
+			m.reg.MustRegister(g)
+			m.statsGauges[name] = g
+		}
+		for name, gauge := range s2g {
+			switch gauge.Value.(type) {
+			case blocksNBytes:
+				// One gauge per sub-field: <name>_blocks,
+				// <name>_bytes and <name>_replicas.
+				for _, sub := range []string{"blocks", "bytes", "replicas"} {
+					addGauge(name+"_"+sub, sub+" of "+gauge.Help)
+				}
+			case int, int64:
+				addGauge(name, gauge.Help)
+			default:
+				panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+			}
+		}
+	})
+	// Set gauges to values from s.
+	for name, gauge := range s2g {
+		switch val := gauge.Value.(type) {
+		case blocksNBytes:
+			m.statsGauges[name+"_blocks"].Set(float64(val.blocks))
+			m.statsGauges[name+"_bytes"].Set(float64(val.bytes))
+			m.statsGauges[name+"_replicas"].Set(float64(val.replicas))
+		case int:
+			m.statsGauges[name].Set(float64(val))
+		case int64:
+			m.statsGauges[name].Set(float64(val))
+		default:
+			panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+		}
+	}
+}
+
+// Handler returns an http.Handler that serves the metrics in m's
+// registry, using log as the promhttp error log.
+func (m *metrics) Handler(log promhttp.Logger) http.Handler {
+	return promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{
+		ErrorLog: log,
+	})
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
deleted file mode 100644
index 06d727dfc..000000000
--- a/services/keep-balance/time_me.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-	"log"
-	"time"
-)
-
-func timeMe(logger *log.Logger, label string) func() {
-	t0 := time.Now()
-	logger.Printf("%s: start", label)
-	return func() {
-		logger.Printf("%s: took %v", label, time.Since(t0))
-	}
-}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list