[ARVADOS] updated: 1.3.0-225-gf50aff88c
Git user
git at public.curoverse.com
Wed Feb 13 17:59:35 EST 2019
Summary of changes:
services/keepstore/azure_blob_volume.go | 2 +-
services/keepstore/config.go | 10 +-
services/keepstore/handler_test.go | 7 +-
services/keepstore/handlers.go | 22 +-
services/keepstore/keepstore.go | 7 +-
services/keepstore/metrics.go | 345 +++++++++++++++++++-------------
services/keepstore/mounts_test.go | 8 +-
services/keepstore/pipe_adapters.go | 2 +-
services/keepstore/proxy_remote_test.go | 8 +-
services/keepstore/pull_worker_test.go | 3 +-
services/keepstore/s3_volume.go | 2 +-
services/keepstore/s3_volume_test.go | 13 +-
services/keepstore/stats_ticker.go | 44 ++--
services/keepstore/volume.go | 9 +-
services/keepstore/volume_test.go | 2 +-
services/keepstore/volume_unix.go | 100 +++++----
services/keepstore/volume_unix_test.go | 5 +-
17 files changed, 348 insertions(+), 241 deletions(-)
via f50aff88ccf1ce6e590a3fe98689eabef4ad292a (commit)
via 0caf177b78c5cf41f6a3a6f699bcebdee6aafb7f (commit)
via 3a3d67ccee068a85aa3b79c5abd40170223071e3 (commit)
from f0553505e32ee00999d1d680da14260a9a0f6b99 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit f50aff88ccf1ce6e590a3fe98689eabef4ad292a
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Wed Feb 13 19:58:45 2019 -0300
13937: Adds a facility for volume drivers to register their own counters.
Currently used only by the unix volume driver; the other drivers are still pending.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
index 63e406c93..80194eb82 100644
--- a/services/keepstore/metrics.go
+++ b/services/keepstore/metrics.go
@@ -88,6 +88,7 @@ func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
}
type volumeMetricsVecs struct {
+ reg *prometheus.Registry
BytesFree *prometheus.GaugeVec
BytesUsed *prometheus.GaugeVec
Errors *prometheus.CounterVec
@@ -102,21 +103,26 @@ type volumeMetricsVecs struct {
}
type volumeMetrics struct {
- BytesFree prometheus.Gauge
- BytesUsed prometheus.Gauge
- Errors prometheus.Counter
- Ops prometheus.Counter
- CompareOps prometheus.Counter
- GetOps prometheus.Counter
- PutOps prometheus.Counter
- TouchOps prometheus.Counter
- InBytes prometheus.Counter
- OutBytes prometheus.Counter
- ErrorCodes *prometheus.CounterVec
+ reg *prometheus.Registry
+ lbls []string
+ internalCounters map[string]*prometheus.CounterVec
+ BytesFree prometheus.Gauge
+ BytesUsed prometheus.Gauge
+ Errors prometheus.Counter
+ Ops prometheus.Counter
+ CompareOps prometheus.Counter
+ GetOps prometheus.Counter
+ PutOps prometheus.Counter
+ TouchOps prometheus.Counter
+ InBytes prometheus.Counter
+ OutBytes prometheus.Counter
+ ErrorCodes *prometheus.CounterVec
}
func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
- m := &volumeMetricsVecs{}
+ m := &volumeMetricsVecs{
+ reg: reg,
+ }
m.BytesFree = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "arvados",
@@ -234,16 +240,19 @@ func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
lbls := []string{lbl, mnt, dev}
curried := &volumeMetrics{
- BytesFree: m.BytesFree.WithLabelValues(lbls...),
- BytesUsed: m.BytesUsed.WithLabelValues(lbls...),
- Errors: m.Errors.WithLabelValues(lbls...),
- Ops: m.Ops.WithLabelValues(lbls...),
- CompareOps: m.CompareOps.WithLabelValues(lbls...),
- GetOps: m.GetOps.WithLabelValues(lbls...),
- PutOps: m.PutOps.WithLabelValues(lbls...),
- TouchOps: m.TouchOps.WithLabelValues(lbls...),
- InBytes: m.InBytes.WithLabelValues(lbls...),
- OutBytes: m.OutBytes.WithLabelValues(lbls...),
+ reg: m.reg,
+ lbls: lbls,
+ internalCounters: make(map[string]*prometheus.CounterVec),
+ BytesFree: m.BytesFree.WithLabelValues(lbls...),
+ BytesUsed: m.BytesUsed.WithLabelValues(lbls...),
+ Errors: m.Errors.WithLabelValues(lbls...),
+ Ops: m.Ops.WithLabelValues(lbls...),
+ CompareOps: m.CompareOps.WithLabelValues(lbls...),
+ GetOps: m.GetOps.WithLabelValues(lbls...),
+ PutOps: m.PutOps.WithLabelValues(lbls...),
+ TouchOps: m.TouchOps.WithLabelValues(lbls...),
+ InBytes: m.InBytes.WithLabelValues(lbls...),
+ OutBytes: m.OutBytes.WithLabelValues(lbls...),
ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
"label": lbl,
"mount_point": mnt,
@@ -252,3 +261,23 @@ func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volum
}
return curried
}
+
+// getInternalCounter returns the driver-specific counter called name,
+// curried with this volume's label values. The name argument should
+// include the driver prefix (e.g. "unix_open_ops").
+//
+// The underlying CounterVec is created and registered on first use.
+// Every curried volumeMetrics has its own internalCounters map but all
+// of them share one registry, so another volume of the same driver may
+// already have registered a CounterVec with this name. In that case the
+// existing collector is reused; calling MustRegister a second time would
+// panic with AlreadyRegisteredError. Any other registration error (for
+// example a different help string for the same name) still panics.
+func (m *volumeMetrics) getInternalCounter(name string, help string) prometheus.Counter {
+	counterVec, ok := m.internalCounters[name]
+	if !ok {
+		counterVec = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      name,
+				Help:      help,
+			},
+			[]string{"label", "mount_point", "device_number"},
+		)
+		if err := m.reg.Register(counterVec); err != nil {
+			are, isDup := err.(prometheus.AlreadyRegisteredError)
+			if !isDup {
+				panic(err)
+			}
+			existing, isVec := are.ExistingCollector.(*prometheus.CounterVec)
+			if !isVec {
+				panic(err)
+			}
+			// Share the vec registered by a sibling volume; label
+			// values below keep this volume's series distinct.
+			counterVec = existing
+		}
+		m.internalCounters[name] = counterVec
+	}
+	return counterVec.WithLabelValues(m.lbls...)
+}
diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go
index e4a5865a4..69ed6d2ff 100644
--- a/services/keepstore/pipe_adapters.go
+++ b/services/keepstore/pipe_adapters.go
@@ -39,7 +39,7 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (i
}
}
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
// from buf into the pipe. If ctx is done before all data is copied,
// putWithPipe closes the pipe with an error, and returns early with
// an error.
diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 7f52b744d..a9f24744b 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -48,7 +48,9 @@ func (s *statsTicker) TickErr(err error, errType string) {
if err == nil {
return
}
- s.errors.Inc()
+ if s.errors != nil {
+ s.errors.Inc()
+ }
s.Tick(&s.Errors)
s.lock.Lock()
@@ -57,17 +59,23 @@ func (s *statsTicker) TickErr(err error, errType string) {
}
s.ErrorCodes[errType]++
s.lock.Unlock()
- s.errCounters.WithLabelValues(errType).Inc()
+ if s.errCounters != nil {
+ s.errCounters.WithLabelValues(errType).Inc()
+ }
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
- s.inBytes.Add(float64(n))
+ if s.inBytes != nil {
+ s.inBytes.Add(float64(n))
+ }
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
- s.outBytes.Add(float64(n))
+ if s.outBytes != nil {
+ s.outBytes.Add(float64(n))
+ }
atomic.AddUint64(&s.OutBytes, n)
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5313059ea..8d61f9619 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -21,6 +21,8 @@ import (
"sync/atomic"
"syscall"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type unixVolumeAdder struct {
@@ -234,7 +236,7 @@ func (v *UnixVolume) Start(m *volumeMetrics) error {
if err == nil {
// Set up prometheus metrics
v.metrics = m
- v.os.stats.statsTicker.setup(m)
+ v.os.stats.setup(v.metrics)
// Periodically update free/used volume space
go func() {
for {
@@ -269,6 +271,7 @@ func (v *UnixVolume) Touch(loc string) error {
}
defer v.unlockfile(f)
ts := syscall.NsecToTimespec(time.Now().UnixNano())
+ v.os.stats.utimesOps.Inc()
v.os.stats.Tick(&v.os.stats.UtimesOps)
err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
v.os.stats.TickErr(err)
@@ -462,6 +465,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
return err
}
defer rootdir.Close()
+ v.os.stats.readdirOps.Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
names, err := rootdir.Readdirnames(1)
@@ -484,6 +488,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
lastErr = err
continue
}
+ v.os.stats.readdirOps.Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
fileInfo, err := blockdir.Readdir(1)
@@ -572,6 +577,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
return MethodDisabledError
}
+ v.os.stats.readdirOps.Inc()
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(loc))
if err != nil {
@@ -718,6 +724,7 @@ func (v *UnixVolume) unlock() {
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func (v *UnixVolume) lockfile(f *os.File) error {
+ v.os.stats.flockOps.Inc()
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
v.os.stats.TickErr(err)
@@ -822,6 +829,27 @@ type unixStats struct {
RenameOps uint64
UnlinkOps uint64
ReaddirOps uint64
+ // Prometheus metrics -- Above ad-hoc counters will be eventually removed
+ openOps prometheus.Counter
+ statOps prometheus.Counter
+ flockOps prometheus.Counter
+ utimesOps prometheus.Counter
+ createOps prometheus.Counter
+ renameOps prometheus.Counter
+ unlinkOps prometheus.Counter
+ readdirOps prometheus.Counter
+}
+
+func (s *unixStats) setup(m *volumeMetrics) {
+ s.statsTicker.setup(m)
+ s.openOps = m.getInternalCounter("unix_open_ops", "Number of backend open operations")
+ s.statOps = m.getInternalCounter("unix_stat_ops", "Number of backend stat operations")
+ s.flockOps = m.getInternalCounter("unix_flock_ops", "Number of backend flock operations")
+ s.utimesOps = m.getInternalCounter("unix_utimes_ops", "Number of backend utimes operations")
+ s.createOps = m.getInternalCounter("unix_create_ops", "Number of backend create operations")
+ s.renameOps = m.getInternalCounter("unix_rename_ops", "Number of backend rename operations")
+ s.unlinkOps = m.getInternalCounter("unix_unlink_ops", "Number of backend unlink operations")
+ s.readdirOps = m.getInternalCounter("unix_readdir_ops", "Number of backend readdir operations")
}
func (s *unixStats) TickErr(err error) {
@@ -836,6 +864,7 @@ type osWithStats struct {
}
func (o *osWithStats) Open(name string) (*os.File, error) {
+ o.stats.openOps.Inc()
o.stats.Tick(&o.stats.OpenOps)
f, err := os.Open(name)
o.stats.TickErr(err)
@@ -843,6 +872,7 @@ func (o *osWithStats) Open(name string) (*os.File, error) {
}
func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+ o.stats.openOps.Inc()
o.stats.Tick(&o.stats.OpenOps)
f, err := os.OpenFile(name, flag, perm)
o.stats.TickErr(err)
@@ -850,6 +880,7 @@ func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.Fil
}
func (o *osWithStats) Remove(path string) error {
+ o.stats.unlinkOps.Inc()
o.stats.Tick(&o.stats.UnlinkOps)
err := os.Remove(path)
o.stats.TickErr(err)
@@ -857,6 +888,7 @@ func (o *osWithStats) Remove(path string) error {
}
func (o *osWithStats) Rename(a, b string) error {
+ o.stats.renameOps.Inc()
o.stats.Tick(&o.stats.RenameOps)
err := os.Rename(a, b)
o.stats.TickErr(err)
@@ -864,6 +896,10 @@ func (o *osWithStats) Rename(a, b string) error {
}
func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+ // Avoid segfaulting when called from vol.Status() on theConfig.Start()
+ if o.stats.statOps != nil {
+ o.stats.statOps.Inc()
+ }
o.stats.Tick(&o.stats.StatOps)
fi, err := os.Stat(path)
o.stats.TickErr(err)
@@ -871,6 +907,7 @@ func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
}
func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+ o.stats.createOps.Inc()
o.stats.Tick(&o.stats.CreateOps)
f, err := ioutil.TempFile(dir, base)
o.stats.TickErr(err)
commit 0caf177b78c5cf41f6a3a6f699bcebdee6aafb7f
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Wed Feb 13 17:10:41 2019 -0300
13937: Moves statsTicker metrics init to its own func.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 6742dbaa9..7f52b744d 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -17,15 +17,22 @@ type statsTicker struct {
OutBytes uint64
// Prometheus metrics
- PromErrors prometheus.Counter
- PromInBytes prometheus.Counter
- PromOutBytes prometheus.Counter
- PromErrorCodes *prometheus.CounterVec
+ errors prometheus.Counter
+ inBytes prometheus.Counter
+ outBytes prometheus.Counter
+ errCounters *prometheus.CounterVec
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
}
+func (s *statsTicker) setup(m *volumeMetrics) {
+ s.errors = m.Errors
+ s.errCounters = m.ErrorCodes
+ s.inBytes = m.InBytes
+ s.outBytes = m.OutBytes
+}
+
// Tick increments each of the given counters by 1 using
// atomic.AddUint64.
func (s *statsTicker) Tick(counters ...*uint64) {
@@ -41,7 +48,7 @@ func (s *statsTicker) TickErr(err error, errType string) {
if err == nil {
return
}
- s.PromErrors.Inc()
+ s.errors.Inc()
s.Tick(&s.Errors)
s.lock.Lock()
@@ -50,17 +57,17 @@ func (s *statsTicker) TickErr(err error, errType string) {
}
s.ErrorCodes[errType]++
s.lock.Unlock()
- s.PromErrorCodes.WithLabelValues(errType).Inc()
+ s.errCounters.WithLabelValues(errType).Inc()
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
- s.PromInBytes.Add(float64(n))
+ s.inBytes.Add(float64(n))
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
- s.PromOutBytes.Add(float64(n))
+ s.outBytes.Add(float64(n))
atomic.AddUint64(&s.OutBytes, n)
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 10cd6398c..5313059ea 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -28,7 +28,7 @@ type unixVolumeAdder struct {
}
// String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
return "-"
}
@@ -234,10 +234,7 @@ func (v *UnixVolume) Start(m *volumeMetrics) error {
if err == nil {
// Set up prometheus metrics
v.metrics = m
- v.os.stats.PromErrors = v.metrics.Errors
- v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
- v.os.stats.PromInBytes = v.metrics.InBytes
- v.os.stats.PromOutBytes = v.metrics.OutBytes
+ v.os.stats.statsTicker.setup(m)
// Periodically update free/used volume space
go func() {
for {
@@ -365,7 +362,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
return putWithPipe(ctx, loc, block, v)
}
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
if v.ReadOnly {
return MethodDisabledError
commit 3a3d67ccee068a85aa3b79c5abd40170223071e3
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Wed Feb 13 16:33:53 2019 -0300
13937: Refactors approach to pass volume metrics as curried vecs (WIP)
Pending: driver-specific metrics
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 5da2055b7..ab199d991 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -147,7 +147,7 @@ func (v *AzureBlobVolume) Type() string {
}
// Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(m *volumeMetrics) error {
if v.ContainerName == "" {
return errors.New("no container name given")
}
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
index 2bd989de3..43b309916 100644
--- a/services/keepstore/config.go
+++ b/services/keepstore/config.go
@@ -13,6 +13,7 @@ import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
// Start should be called exactly once: after setting all public
// fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
if cfg.Debug {
log.Level = logrus.DebugLevel
cfg.debugLogf = log.Printf
@@ -143,8 +144,13 @@ func (cfg *Config) Start() error {
return fmt.Errorf("no volumes found")
}
}
+ vm := newVolumeMetricsVecs(reg)
for _, v := range cfg.Volumes {
- if err := v.Start(); err != nil {
+ metrics := vm.curryWith(
+ v.String(),
+ v.Status().MountPoint,
+ fmt.Sprintf("%d", v.Status().DeviceNum))
+ if err := v.Start(metrics); err != nil {
return fmt.Errorf("volume %s: %s", v, err)
}
log.Printf("Using volume %v (writable=%v)", v, v.Writable())
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index c37a4d112..cbfc0bcda 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -28,6 +28,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
)
var testCluster = &arvados.Cluster{
@@ -827,7 +828,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
@@ -839,7 +840,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
@@ -979,7 +980,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
ok <- struct{}{}
}()
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index e4f025d6b..7da9f69ad 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -32,17 +32,16 @@ type router struct {
limiter httpserver.RequestCounter
cluster *arvados.Cluster
remoteProxy remoteProxy
- registry *prometheus.Registry
- metrics nodeMetrics
+ metrics *nodeMetrics
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
rtr := &router{
- Router: mux.NewRouter(),
- cluster: cluster,
- registry: prometheus.NewRegistry(),
+ Router: mux.NewRouter(),
+ cluster: cluster,
+ metrics: &nodeMetrics{reg: reg},
}
rtr.HandleFunc(
@@ -89,13 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
- rtr.metrics = nodeMetrics{
- reg: rtr.registry,
- rc: rtr.limiter,
- }
- rtr.metrics.setup()
+ rtr.metrics.setupBufferPoolMetrics(bufs)
+ rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+ rtr.metrics.setupRequestMetrics(rtr.limiter)
- instrumented := httpserver.Instrument(rtr.registry, nil,
+ instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 6ae414bf9..fb1e1ea54 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -18,6 +18,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
)
var version = "dev"
@@ -120,7 +121,9 @@ func main() {
log.Printf("keepstore %s started", version)
- err = theConfig.Start()
+ metricsRegistry := prometheus.NewRegistry()
+
+ err = theConfig.Start(metricsRegistry)
if err != nil {
log.Fatal(err)
}
@@ -173,7 +176,7 @@ func main() {
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter(cluster)
+ router := MakeRESTRouter(cluster, metricsRegistry)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
index f0815ae4a..63e406c93 100644
--- a/services/keepstore/metrics.go
+++ b/services/keepstore/metrics.go
@@ -13,10 +13,9 @@ import (
type nodeMetrics struct {
reg *prometheus.Registry
- rc httpserver.RequestCounter
}
-func (m *nodeMetrics) setup() {
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
@@ -24,7 +23,7 @@ func (m *nodeMetrics) setup() {
Name: "bufferpool_bytes_allocated",
Help: "Number of bytes allocated to buffers",
},
- func() float64 { return float64(bufs.Alloc()) },
+ func() float64 { return float64(b.Alloc()) },
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
@@ -33,7 +32,7 @@ func (m *nodeMetrics) setup() {
Name: "bufferpool_buffers_max",
Help: "Maximum number of buffers allowed",
},
- func() float64 { return float64(bufs.Cap()) },
+ func() float64 { return float64(b.Cap()) },
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
@@ -42,173 +41,214 @@ func (m *nodeMetrics) setup() {
Name: "bufferpool_buffers_in_use",
Help: "Number of buffers in use",
},
- func() float64 { return float64(bufs.Len()) },
+ func() float64 { return float64(b.Len()) },
))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "pull_queue_in_progress",
- Help: "Number of pull requests in progress",
+ Name: fmt.Sprintf("%s_queue_in_progress", qName),
+ Help: fmt.Sprintf("Number of %s requests in progress", qName),
},
- func() float64 { return float64(getWorkQueueStatus(pullq).InProgress) },
+ func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "pull_queue_queued",
- Help: "Number of queued pull requests",
+ Name: fmt.Sprintf("%s_queue_queued", qName),
+ Help: fmt.Sprintf("Number of queued %s requests", qName),
},
- func() float64 { return float64(getWorkQueueStatus(pullq).Queued) },
+ func() float64 { return float64(getWorkQueueStatus(q).Queued) },
))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "trash_queue_in_progress",
- Help: "Number of trash requests in progress",
+ Name: "requests_current",
+ Help: "Number of requests in progress",
},
- func() float64 { return float64(getWorkQueueStatus(trashq).InProgress) },
+ func() float64 { return float64(rc.Current()) },
))
m.reg.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "trash_queue_queued",
- Help: "Number of queued trash requests",
+ Name: "requests_max",
+ Help: "Maximum number of concurrent requests",
},
- func() float64 { return float64(getWorkQueueStatus(trashq).Queued) },
+ func() float64 { return float64(rc.Max()) },
))
- m.reg.MustRegister(prometheus.NewGaugeFunc(
+}
+
+type volumeMetricsVecs struct {
+ BytesFree *prometheus.GaugeVec
+ BytesUsed *prometheus.GaugeVec
+ Errors *prometheus.CounterVec
+ Ops *prometheus.CounterVec
+ CompareOps *prometheus.CounterVec
+ GetOps *prometheus.CounterVec
+ PutOps *prometheus.CounterVec
+ TouchOps *prometheus.CounterVec
+ InBytes *prometheus.CounterVec
+ OutBytes *prometheus.CounterVec
+ ErrorCodes *prometheus.CounterVec
+}
+
+type volumeMetrics struct {
+ BytesFree prometheus.Gauge
+ BytesUsed prometheus.Gauge
+ Errors prometheus.Counter
+ Ops prometheus.Counter
+ CompareOps prometheus.Counter
+ GetOps prometheus.Counter
+ PutOps prometheus.Counter
+ TouchOps prometheus.Counter
+ InBytes prometheus.Counter
+ OutBytes prometheus.Counter
+ ErrorCodes *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+ m := &volumeMetricsVecs{}
+ m.BytesFree = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "requests_current",
- Help: "Number of requests in progress",
+ Name: "volume_bytes_free",
+ Help: "Number of free bytes on the volume",
},
- func() float64 { return float64(m.rc.Current()) },
- ))
- m.reg.MustRegister(prometheus.NewGaugeFunc(
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.BytesFree)
+ m.BytesUsed = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "keepstore",
- Name: "requests_max",
- Help: "Maximum number of concurrent requests",
+ Name: "volume_bytes_used",
+ Help: "Number of used bytes on the volume",
},
- func() float64 { return float64(m.rc.Max()) },
- ))
- // Register individual volume's metrics
- vols := KeepVM.AllReadable()
- for _, vol := range vols {
- labels := prometheus.Labels{
- "label": vol.String(),
- "mount_point": vol.Status().MountPoint,
- "device_number": fmt.Sprintf("%d", vol.Status().DeviceNum),
- }
- if vol, ok := vol.(InternalMetricser); ok {
- // Per-driver internal metrics
- vol.SetupInternalMetrics(m.reg, labels)
- }
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_free",
- Help: "Number of free bytes on the volume",
- ConstLabels: labels,
- },
- func() float64 { return float64(vol.Status().BytesFree) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_bytes_used",
- Help: "Number of used bytes on the volume",
- ConstLabels: labels,
- },
- func() float64 { return float64(vol.Status().BytesUsed) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_errors",
- Help: "Number of I/O errors",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).Errors) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_ops",
- Help: "Number of I/O operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).Ops) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_compare_ops",
- Help: "Number of I/O compare operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).CompareOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_get_ops",
- Help: "Number of I/O get operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).GetOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_put_ops",
- Help: "Number of I/O put operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).PutOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_touch_ops",
- Help: "Number of I/O touch operations",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).TouchOps) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_input_bytes",
- Help: "Number of input bytes",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).InBytes) },
- ))
- m.reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: "volume_io_output_bytes",
- Help: "Number of output bytes",
- ConstLabels: labels,
- },
- func() float64 { return float64(KeepVM.VolumeStats(vol).OutBytes) },
- ))
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.BytesUsed)
+ m.Errors = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_errors",
+ Help: "Number of volume I/O errors",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.Errors)
+ m.Ops = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_ops",
+ Help: "Number of volume I/O operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.Ops)
+ m.CompareOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_compare_ops",
+ Help: "Number of volume I/O compare operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.CompareOps)
+ m.GetOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_get_ops",
+ Help: "Number of volume I/O get operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.GetOps)
+ m.PutOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_put_ops",
+ Help: "Number of volume I/O put operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.PutOps)
+ m.TouchOps = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_touch_ops",
+ Help: "Number of volume I/O touch operations",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.TouchOps)
+ m.InBytes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_in_bytes",
+ Help: "Number of input bytes",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.InBytes)
+ m.OutBytes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_out_bytes",
+ Help: "Number of output bytes",
+ },
+ []string{"label", "mount_point", "device_number"},
+ )
+ reg.MustRegister(m.OutBytes)
+ m.ErrorCodes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_error_codes",
+ Help: "Number of I/O errors by error code",
+ },
+ []string{"label", "mount_point", "device_number", "error_code"},
+ )
+ reg.MustRegister(m.ErrorCodes)
+
+ return m
+}
+
+func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
+ lbls := []string{lbl, mnt, dev}
+ curried := &volumeMetrics{
+ BytesFree: m.BytesFree.WithLabelValues(lbls...),
+ BytesUsed: m.BytesUsed.WithLabelValues(lbls...),
+ Errors: m.Errors.WithLabelValues(lbls...),
+ Ops: m.Ops.WithLabelValues(lbls...),
+ CompareOps: m.CompareOps.WithLabelValues(lbls...),
+ GetOps: m.GetOps.WithLabelValues(lbls...),
+ PutOps: m.PutOps.WithLabelValues(lbls...),
+ TouchOps: m.TouchOps.WithLabelValues(lbls...),
+ InBytes: m.InBytes.WithLabelValues(lbls...),
+ OutBytes: m.OutBytes.WithLabelValues(lbls...),
+ ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
+ "label": lbl,
+ "mount_point": mnt,
+ "device_number": dev,
+ }),
}
+ return curried
}
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
index 588bb4299..ac30c369a 100644
--- a/services/keepstore/mounts_test.go
+++ b/services/keepstore/mounts_test.go
@@ -12,6 +12,7 @@ import (
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.ManagementToken = arvadostest.ManagementToken
- theConfig.Start()
- s.rtr = MakeRESTRouter(testCluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(testCluster, r)
}
func (s *MountsSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
func (s *MountsSuite) TestMounts(c *check.C) {
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
index 6e720b849..6c22d1d32 100644
--- a/services/keepstore/proxy_remote_test.go
+++ b/services/keepstore/proxy_remote_test.go
@@ -20,6 +20,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.blobSigningKey = []byte(knownKey)
- theConfig.Start()
- s.rtr = MakeRESTRouter(s.cluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(s.cluster, r)
}
func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
s.remoteAPI.Close()
s.remoteKeepproxy.Close()
}
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 7b5077c1a..8e667e048 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -14,6 +14,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
. "gopkg.in/check.v1"
)
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
pullq = nil
teardown()
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
var firstPullList = []byte(`[
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index fb978fe2b..f281b363e 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -198,7 +198,7 @@ func (*S3Volume) Type() string {
// Start populates private fields and verifies the configuration is
// valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(m *volumeMetrics) error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 10c71125d..e88efffe4 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -20,6 +20,7 @@ import (
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -170,7 +171,9 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
vol := *v.S3Volume
vol.Endpoint = srv.URL
v = &TestableS3Volume{S3Volume: &vol}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+ v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+ v.Start(metrics)
ctx, cancel := context.WithCancel(context.Background())
@@ -430,7 +433,9 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
server: srv,
serverClock: clock,
}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+ v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+ v.Start(metrics)
err = v.bucket.PutBucket(s3.ACL("private"))
c.Assert(err, check.IsNil)
return v
@@ -448,7 +453,7 @@ Volumes:
c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
}
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(m *volumeMetrics) error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
defer os.Remove(tmp.Name())
@@ -459,7 +464,7 @@ func (v *TestableS3Volume) Start() error {
v.S3Volume.AccessKeyFile = tmp.Name()
v.S3Volume.SecretKeyFile = tmp.Name()
- v.c.Assert(v.S3Volume.Start(), check.IsNil)
+ v.c.Assert(v.S3Volume.Start(m), check.IsNil)
return nil
}
diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 36fbcf98a..6742dbaa9 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -5,7 +5,6 @@
package main
import (
- "fmt"
"sync"
"sync/atomic"
@@ -17,32 +16,16 @@ type statsTicker struct {
InBytes uint64
OutBytes uint64
+ // Prometheus metrics
+ PromErrors prometheus.Counter
+ PromInBytes prometheus.Counter
+ PromOutBytes prometheus.Counter
+ PromErrorCodes *prometheus.CounterVec
+
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
}
-func (s *statsTicker) setupPrometheus(drv string, reg *prometheus.Registry, lbl prometheus.Labels) {
- metrics := map[string][]interface{}{
- "errors": []interface{}{string("errors"), s.Errors},
- "in_bytes": []interface{}{string("input bytes"), s.InBytes},
- "out_bytes": []interface{}{string("output bytes"), s.OutBytes},
- }
- for mName, data := range metrics {
- mHelp := data[0].(string)
- mVal := data[1].(uint64)
- reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: fmt.Sprintf("%s_%s", drv, mName),
- Help: fmt.Sprintf("Number of %s backend %s", drv, mHelp),
- ConstLabels: lbl,
- },
- func() float64 { return float64(mVal) },
- ))
- }
-}
-
// Tick increments each of the given counters by 1 using
// atomic.AddUint64.
func (s *statsTicker) Tick(counters ...*uint64) {
@@ -58,6 +41,7 @@ func (s *statsTicker) TickErr(err error, errType string) {
if err == nil {
return
}
+ s.PromErrors.Inc()
s.Tick(&s.Errors)
s.lock.Lock()
@@ -66,14 +50,17 @@ func (s *statsTicker) TickErr(err error, errType string) {
}
s.ErrorCodes[errType]++
s.lock.Unlock()
+ s.PromErrorCodes.WithLabelValues(errType).Inc()
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
+ s.PromInBytes.Add(float64(n))
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
+ s.PromOutBytes.Add(float64(n))
atomic.AddUint64(&s.OutBytes, n)
}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 963804639..5c6d1f51a 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -14,7 +14,6 @@ import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/prometheus/client_golang/prometheus"
)
type BlockWriter interface {
@@ -40,7 +39,7 @@ type Volume interface {
// Do whatever private setup tasks and configuration checks
// are needed. Return non-nil if the volume is unusable (e.g.,
// invalid config).
- Start() error
+ Start(m *volumeMetrics) error
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
@@ -416,9 +415,3 @@ type ioStats struct {
type InternalStatser interface {
InternalStats() interface{}
}
-
-// InternalMetricser provides an interface for volume drivers to register their
-// own specific metrics.
-type InternalMetricser interface {
- SetupInternalMetrics(*prometheus.Registry, prometheus.Labels)
-}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 43ddd090c..72666638d 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -211,7 +211,7 @@ func (v *MockVolume) Type() string {
return "Mock"
}
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(m *volumeMetrics) error {
return nil
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index a80bb7bf4..10cd6398c 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -21,8 +21,6 @@ import (
"sync/atomic"
"syscall"
"time"
-
- "github.com/prometheus/client_golang/prometheus"
)
type unixVolumeAdder struct {
@@ -120,6 +118,8 @@ type UnixVolume struct {
locker sync.Locker
os osWithStats
+
+ metrics *volumeMetrics
}
// DeviceID returns a globally unique ID for the volume's root
@@ -220,7 +220,7 @@ func (v *UnixVolume) Type() string {
}
// Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(m *volumeMetrics) error {
if v.Serialize {
v.locker = &sync.Mutex{}
}
@@ -231,11 +231,29 @@ func (v *UnixVolume) Start() error {
v.DirectoryReplication = 1
}
_, err := v.os.Stat(v.Root)
+ if err == nil {
+ // Set up prometheus metrics
+ v.metrics = m
+ v.os.stats.PromErrors = v.metrics.Errors
+ v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
+ v.os.stats.PromInBytes = v.metrics.InBytes
+ v.os.stats.PromOutBytes = v.metrics.OutBytes
+ // Periodically update free/used volume space
+ go func() {
+ for {
+ v.metrics.BytesFree.Set(float64(v.Status().BytesFree))
+ v.metrics.BytesUsed.Set(float64(v.Status().BytesUsed))
+ time.Sleep(10 * time.Second)
+ }
+ }()
+ }
return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
+ v.metrics.Ops.Inc()
+ v.metrics.TouchOps.Inc()
if v.ReadOnly {
return MethodDisabledError
}
@@ -301,6 +319,8 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ v.metrics.Ops.Inc()
+ v.metrics.GetOps.Inc()
return getWithPipe(ctx, loc, buf, v)
}
@@ -324,6 +344,8 @@ func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) err
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+ v.metrics.Ops.Inc()
+ v.metrics.CompareOps.Inc()
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
@@ -338,6 +360,8 @@ func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) err
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+ v.metrics.Ops.Inc()
+ v.metrics.PutOps.Inc()
return putWithPipe(ctx, loc, block, v)
}
@@ -791,42 +815,6 @@ func (v *UnixVolume) EmptyTrash() {
log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
-// SetupInternalMetrics registers driver stats to Prometheus.
-// Implements InternalMetricser interface.
-func (v *UnixVolume) SetupInternalMetrics(reg *prometheus.Registry, lbl prometheus.Labels) {
- v.os.stats.setupPrometheus(reg, lbl)
-}
-
-func (s *unixStats) setupPrometheus(reg *prometheus.Registry, lbl prometheus.Labels) {
- // Common backend metrics
- s.statsTicker.setupPrometheus("unix", reg, lbl)
- // Driver-specific backend metrics
- metrics := map[string][]interface{}{
- "open_ops": []interface{}{string("open operations"), s.OpenOps},
- "stat_ops": []interface{}{string("stat operations"), s.StatOps},
- "flock_ops": []interface{}{string("flock operations"), s.FlockOps},
- "utimes_ops": []interface{}{string("utimes operations"), s.UtimesOps},
- "create_ops": []interface{}{string("create operations"), s.CreateOps},
- "rename_ops": []interface{}{string("rename operations"), s.RenameOps},
- "unlink_ops": []interface{}{string("unlink operations"), s.UnlinkOps},
- "readdir_ops": []interface{}{string("readdir operations"), s.ReaddirOps},
- }
- for mName, data := range metrics {
- mHelp := data[0].(string)
- mVal := data[1].(uint64)
- reg.Register(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "keepstore",
- Name: fmt.Sprintf("unix_%s", mName),
- Help: fmt.Sprintf("Number of unix backend %s", mHelp),
- ConstLabels: lbl,
- },
- func() float64 { return float64(mVal) },
- ))
- }
-}
-
type unixStats struct {
statsTicker
OpenOps uint64
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 7f1cd2196..05c7a93ae 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -20,6 +20,7 @@ import (
"time"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -115,7 +116,9 @@ func TestReplicationDefault1(t *testing.T) {
Root: "/",
ReadOnly: true,
}
- if err := v.Start(); err != nil {
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith(
+ v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+ if err := v.Start(metrics); err != nil {
t.Error(err)
}
if got := v.Replication(); got != 1 {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list