[ARVADOS] created: 1.2.0-144-ga23722793
Git user
git at public.curoverse.com
Mon Oct 8 16:47:03 EDT 2018
at a23722793aa13e0e8dd37aa91e16111dba452ba0 (commit)
commit a23722793aa13e0e8dd37aa91e16111dba452ba0
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Oct 8 16:43:40 2018 -0400
14285: Export stats as prometheus metrics.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/doc/install/install-keep-balance.html.textile.liquid b/doc/install/install-keep-balance.html.textile.liquid
index 3a8dce078..043f3ebfd 100644
--- a/doc/install/install-keep-balance.html.textile.liquid
+++ b/doc/install/install-keep-balance.html.textile.liquid
@@ -75,7 +75,8 @@ h3. Create a keep-balance configuration file
On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the token you generated above. Follow this YAML format:
<notextile>
-<pre><code>Client:
+<pre><code>Listen: :9005
+Client:
APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
KeepServiceTypes:
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index c723be7d1..e2e9907d5 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -161,6 +161,7 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
type NodeProfile struct {
Controller SystemServiceInstance `json:"arvados-controller"`
Health SystemServiceInstance `json:"arvados-health"`
+ Keepbalance SystemServiceInstance `json:"keep-balance"`
Keepproxy SystemServiceInstance `json:"keepproxy"`
Keepstore SystemServiceInstance `json:"keepstore"`
Keepweb SystemServiceInstance `json:"keep-web"`
@@ -178,6 +179,7 @@ const (
ServiceNameNodemanager ServiceName = "arvados-node-manager"
ServiceNameWorkbench ServiceName = "arvados-workbench"
ServiceNameWebsocket ServiceName = "arvados-ws"
+ ServiceNameKeepbalance ServiceName = "keep-balance"
ServiceNameKeepweb ServiceName = "keep-web"
ServiceNameKeepproxy ServiceName = "keepproxy"
ServiceNameKeepstore ServiceName = "keepstore"
@@ -192,6 +194,7 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
ServiceNameNodemanager: np.Nodemanager.Listen,
ServiceNameWorkbench: np.Workbench.Listen,
ServiceNameWebsocket: np.Websocket.Listen,
+ ServiceNameKeepbalance: np.Keepbalance.Listen,
ServiceNameKeepweb: np.Keepweb.Listen,
ServiceNameKeepproxy: np.Keepproxy.Listen,
ServiceNameKeepstore: np.Keepstore.Listen,
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d86234a93..333a4fbde 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -10,7 +10,6 @@ import (
"fmt"
"log"
"math"
- "os"
"runtime"
"sort"
"strings"
@@ -19,20 +18,9 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/Sirupsen/logrus"
)
-// CheckConfig returns an error if anything is wrong with the given
-// config and runOptions.
-func CheckConfig(config Config, runOptions RunOptions) error {
- if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
- return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
- }
- if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
- return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
- }
- return nil
-}
-
// Balancer compares the contents of keepstore servers with the
// collections stored in Arvados, and issues pull/trash requests
// needed to get (closer to) the optimal data layout.
@@ -43,11 +31,13 @@ func CheckConfig(config Config, runOptions RunOptions) error {
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
type Balancer struct {
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
+ Metrics *metrics
+
*BlockStateMap
KeepServices map[string]*KeepService
DefaultReplication int
- Logger *log.Logger
- Dumper *log.Logger
MinMtime int64
classes []string
@@ -72,13 +62,7 @@ type Balancer struct {
func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
- bal.Dumper = runOptions.Dumper
- bal.Logger = runOptions.Logger
- if bal.Logger == nil {
- bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
- }
-
- defer timeMe(bal.Logger, "Run")()
+ defer bal.time("sweep", "wall clock time to run one full sweep")()
if len(config.KeepServiceList.Items) > 0 {
err = bal.SetKeepServices(config.KeepServiceList)
@@ -269,7 +253,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
- defer timeMe(bal.Logger, "GetCurrentState")()
+ defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
dd, err := c.DiscoveryDocument()
@@ -413,7 +397,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
func (bal *Balancer) ComputeChangeSets() {
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
- defer timeMe(bal.Logger, "ComputeChangeSets")()
+ defer bal.time("changeset_compute", "wall clock time to compute changesets")()
bal.setupLookupTables()
type balanceTask struct {
@@ -893,6 +877,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
s.trashes += len(srv.ChangeSet.Trashes)
}
bal.stats = s
+ bal.Metrics.UpdateStats(s)
}
// PrintStatistics writes statistics about the computed changes to
@@ -986,6 +971,7 @@ func (bal *Balancer) CheckSanityLate() error {
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+ defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
return srv.CommitPulls(c)
@@ -996,6 +982,7 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+ defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
return srv.CommitTrash(c)
@@ -1009,7 +996,6 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
var err error
defer func() { errs <- err }()
label := fmt.Sprintf("%s: %v", srv, label)
- defer timeMe(bal.Logger, label)()
err = f(srv)
if err != nil {
err = fmt.Errorf("%s: %v", label, err)
@@ -1033,6 +1019,17 @@ func (bal *Balancer) logf(f string, args ...interface{}) {
}
}
+func (bal *Balancer) time(name, help string) func() {
+ observer := bal.Metrics.DurationObserver(name+"_seconds", help)
+ t0 := time.Now()
+ bal.Logger.Printf("%s: start", name)
+ return func() {
+ dur := time.Since(t0)
+ observer.Observe(dur.Seconds())
+ bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
+ }
+}
+
// Rendezvous hash sort function. Less efficient than sorting on
// precomputed rendezvous hashes, but also rarely used.
func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 28776abc4..26aee213d 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"strings"
@@ -17,6 +16,7 @@ import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
@@ -282,7 +282,7 @@ type runSuite struct {
}
// make a logrus.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *log.Logger {
+func (s *runSuite) logger(c *check.C) *logrus.Logger {
r, w := io.Pipe()
go func() {
buf := make([]byte, 10000)
@@ -299,7 +299,9 @@ func (s *runSuite) logger(c *check.C) *log.Logger {
}
}
}()
- return log.New(w, "", log.LstdFlags)
+ logger := logrus.New()
+ logger.Out = w
+ return logger
}
func (s *runSuite) SetUpTest(c *check.C) {
@@ -308,7 +310,9 @@ func (s *runSuite) SetUpTest(c *check.C) {
AuthToken: "xyzzy",
APIHost: "zzzzz.arvadosapi.com",
Client: s.stub.Start()},
- KeepServiceTypes: []string{"disk"}}
+ KeepServiceTypes: []string{"disk"},
+ RunPeriod: arvados.Duration(time.Second),
+ }
s.stub.serveDiscoveryDoc()
s.stub.logf = c.Logf
}
@@ -330,7 +334,9 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -349,7 +355,9 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
s.stub.serveKeepstoreMounts()
indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.IsNil)
c.Check(indexReqs.Count(), check.Equals, 0)
c.Check(trashReqs.Count(), check.Equals, 0)
@@ -367,7 +375,9 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
s.stub.serveKeepstoreMounts()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -386,7 +396,9 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
@@ -405,8 +417,9 @@ func (s *runSuite) TestDryRun(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- var bal Balancer
- _, err := bal.Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ bal, err := srv.Run()
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -419,6 +432,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
}
func (s *runSuite) TestCommit(c *check.C) {
+ s.config.Listen = ":"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
@@ -432,8 +446,9 @@ func (s *runSuite) TestCommit(c *check.C) {
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- var bal Balancer
- _, err := bal.Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ bal, err := srv.Run()
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
@@ -442,9 +457,15 @@ func (s *runSuite) TestCommit(c *check.C) {
// "bar" block is underreplicated by 1, and its only copy is
// in a poor rendezvous position
c.Check(bal.stats.pulls, check.Equals, 2)
+
+ metrics := s.getMetrics(c, srv)
+ c.Check(metrics, check.Matches, `(?ms).*\nkeep_total_bytes 15\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count 1\n.*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
+ s.config.Listen = ":"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
@@ -461,7 +482,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
stop := make(chan interface{})
s.config.RunPeriod = arvados.Duration(time.Millisecond)
- go RunForever(s.config, opts, stop)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+
+ done := make(chan bool)
+ go func() {
+ srv.RunForever(stop)
+ close(done)
+ }()
// Each run should send 4 pull lists + 4 trash lists. The
// first run should also send 4 empty trash lists at
@@ -471,6 +499,16 @@ func (s *runSuite) TestRunForever(c *check.C) {
time.Sleep(time.Millisecond)
}
stop <- true
+ <-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+ c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+}
+
+func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
+ resp, err := http.Get("http://" + srv.listening + "/metrics")
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, check.IsNil)
+ return string(buf)
}
diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go
index 9fc47623e..5280b40c9 100644
--- a/services/keep-balance/integration_test.go
+++ b/services/keep-balance/integration_test.go
@@ -6,7 +6,6 @@ package main
import (
"bytes"
- "log"
"os"
"strings"
"testing"
@@ -16,6 +15,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
@@ -67,6 +67,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
Insecure: true,
},
KeepServiceTypes: []string{"disk"},
+ RunPeriod: arvados.Duration(time.Second),
}
}
@@ -74,12 +75,19 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
var logBuf *bytes.Buffer
for iter := 0; iter < 20; iter++ {
logBuf := &bytes.Buffer{}
+ logger := logrus.New()
+ logger.Out = logBuf
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: log.New(logBuf, "", log.LstdFlags),
+ Logger: logger,
}
- nextOpts, err := (&Balancer{}).Run(s.config, opts)
+
+ bal := &Balancer{
+ Logger: logger,
+ Metrics: newMetrics(),
+ }
+ nextOpts, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index 90235cbf3..eb741fa7e 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -17,11 +17,16 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
)
var version = "dev"
-const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+const (
+ defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+ rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+)
// Config specifies site configuration, like API credentials and the
// choice of which servers are to be balanced.
@@ -36,6 +41,9 @@ type Config struct {
KeepServiceList arvados.KeepServiceList
+ // address, address:port, or :port for management interface
+ Listen string
+
// How often to check
RunPeriod arvados.Duration
@@ -62,8 +70,8 @@ type RunOptions struct {
Once bool
CommitPulls bool
CommitTrash bool
- Logger *log.Logger
- Dumper *log.Logger
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
// SafeRendezvousState from the most recent balance operation,
// or "" if unknown. If this changes from one run to the next,
@@ -130,15 +138,17 @@ func main() {
}
}
if *dumpFlag {
- runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+ runOptions.Dumper = logrus.New()
+ runOptions.Dumper.Out = os.Stdout
+ runOptions.Dumper.Formatter = &logrus.TextFormatter{}
}
- err := CheckConfig(cfg, runOptions)
+ srv, err := NewServer(cfg, runOptions)
if err != nil {
// (don't run)
} else if runOptions.Once {
- _, err = (&Balancer{}).Run(cfg, runOptions)
+ _, err = srv.Run()
} else {
- err = RunForever(cfg, runOptions, nil)
+ err = srv.RunForever(nil)
}
if err != nil {
log.Fatal(err)
@@ -151,32 +161,96 @@ func mustReadConfig(dst interface{}, path string) {
}
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+type Server struct {
+ config Config
+ runOptions RunOptions
+ metrics *metrics
+ listening string // for tests
+
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
+}
+
+// NewServer returns a new Server that runs Balancers using the given
+// config and runOptions.
+func NewServer(config Config, runOptions RunOptions) (*Server, error) {
+ if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+ return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+ }
+ if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+ return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+ }
+
if runOptions.Logger == nil {
- runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+ log := logrus.New()
+ log.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ log.Out = os.Stderr
+ runOptions.Logger = log
+ }
+
+ srv := &Server{
+ config: config,
+ runOptions: runOptions,
+ metrics: newMetrics(),
+ Logger: runOptions.Logger,
+ Dumper: runOptions.Dumper,
}
- logger := runOptions.Logger
+ return srv, srv.start()
+}
+
+func (srv *Server) start() error {
+ if srv.config.Listen == "" {
+ return nil
+ }
+ server := &httpserver.Server{
+ Server: http.Server{
+ Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
+ },
+ Addr: srv.config.Listen,
+ }
+ err := server.Start()
+ if err != nil {
+ return err
+ }
+ srv.Logger.Printf("listening at %s", server.Addr)
+ srv.listening = server.Addr
+ return nil
+}
+
+func (srv *Server) Run() (*Balancer, error) {
+ bal := &Balancer{
+ Logger: srv.Logger,
+ Dumper: srv.Dumper,
+ Metrics: srv.metrics,
+ }
+ var err error
+ srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+ return bal, err
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func (srv *Server) RunForever(stop <-chan interface{}) error {
+ logger := srv.runOptions.Logger
- ticker := time.NewTicker(time.Duration(config.RunPeriod))
+ ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
// The unbuffered channel here means we only hear SIGUSR1 if
// it arrives while we're waiting in select{}.
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
- logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
for {
- if !runOptions.CommitPulls && !runOptions.CommitTrash {
+ if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- bal := &Balancer{}
- var err error
- runOptions, err = bal.Run(config, runOptions)
+ _, err := srv.Run()
if err != nil {
logger.Print("run failed: ", err)
} else {
@@ -195,7 +269,7 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
// run too soon after the Nth run is triggered
// by SIGUSR1.
ticker.Stop()
- ticker = time.NewTicker(time.Duration(config.RunPeriod))
+ ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
}
logger.Print("starting next run")
}
diff --git a/services/keep-balance/metrics.go b/services/keep-balance/metrics.go
new file mode 100644
index 000000000..96ee66ce5
--- /dev/null
+++ b/services/keep-balance/metrics.go
@@ -0,0 +1,112 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type observer interface{ Observe(float64) }
+type setter interface{ Set(float64) }
+
+type metrics struct {
+ reg *prometheus.Registry
+ statsGauges map[string]setter
+ observers map[string]observer
+ setupOnce sync.Once
+ mtx sync.Mutex
+}
+
+func newMetrics() *metrics {
+ return &metrics{
+ reg: prometheus.NewRegistry(),
+ statsGauges: map[string]setter{},
+ observers: map[string]observer{},
+ }
+}
+
+func (m *metrics) DurationObserver(name, help string) observer {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ if obs, ok := m.observers[name]; ok {
+ return obs
+ }
+ summary := prometheus.NewSummary(prometheus.SummaryOpts{
+ Name: name,
+ Subsystem: "keepbalance",
+ Help: help,
+ })
+ m.reg.MustRegister(summary)
+ m.observers[name] = summary
+ return summary
+}
+
+// UpdateStats updates prometheus metrics using the given
+// balancerStats. It creates and registers the needed gauges on its
+// first invocation.
+func (m *metrics) UpdateStats(s balancerStats) {
+ type gauge struct {
+ Value interface{}
+ Help string
+ }
+ s2g := map[string]gauge{
+ "total": {s.current, "current backend storage usage"},
+ "garbage": {s.garbage, "garbage (unreferenced, old)"},
+ "transient": {s.unref, "transient (unreferenced, new)"},
+ "overreplicated": {s.overrep, "overreplicated"},
+ "underreplicated": {s.underrep, "underreplicated"},
+ "lost": {s.lost, "lost"},
+ }
+ m.setupOnce.Do(func() {
+ // Register gauge(s) for each balancerStats field.
+ addGauge := func(name, help string) {
+ g := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: name,
+ Subsystem: "keep",
+ Help: help,
+ })
+ m.reg.MustRegister(g)
+ m.statsGauges[name] = g
+ }
+ for name, gauge := range s2g {
+ switch gauge.Value.(type) {
+ case blocksNBytes:
+ for _, sub := range []string{"blocks", "bytes", "replicas"} {
+ addGauge(name+"_"+sub, sub+" of "+gauge.Help)
+ }
+ case int, int64:
+ addGauge(name, gauge.Help)
+ default:
+ panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+ }
+ }
+ })
+ // Set gauges to values from s.
+ for name, gauge := range s2g {
+ switch val := gauge.Value.(type) {
+ case blocksNBytes:
+ m.statsGauges[name+"_blocks"].Set(float64(val.blocks))
+ m.statsGauges[name+"_bytes"].Set(float64(val.bytes))
+ m.statsGauges[name+"_replicas"].Set(float64(val.replicas))
+ case int:
+ m.statsGauges[name].Set(float64(val))
+ case int64:
+ m.statsGauges[name].Set(float64(val))
+ default:
+ panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+ }
+ }
+}
+
+func (m *metrics) Handler(log promhttp.Logger) http.Handler {
+ return promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{
+ ErrorLog: log,
+ })
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
deleted file mode 100644
index 06d727dfc..000000000
--- a/services/keep-balance/time_me.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "log"
- "time"
-)
-
-func timeMe(logger *log.Logger, label string) func() {
- t0 := time.Now()
- logger.Printf("%s: start", label)
- return func() {
- logger.Printf("%s: took %v", label, time.Since(t0))
- }
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list