[ARVADOS] updated: 1.3.0-225-gf50aff88c

Git user git at public.curoverse.com
Wed Feb 13 17:59:35 EST 2019


Summary of changes:
 services/keepstore/azure_blob_volume.go |   2 +-
 services/keepstore/config.go            |  10 +-
 services/keepstore/handler_test.go      |   7 +-
 services/keepstore/handlers.go          |  22 +-
 services/keepstore/keepstore.go         |   7 +-
 services/keepstore/metrics.go           | 345 +++++++++++++++++++-------------
 services/keepstore/mounts_test.go       |   8 +-
 services/keepstore/pipe_adapters.go     |   2 +-
 services/keepstore/proxy_remote_test.go |   8 +-
 services/keepstore/pull_worker_test.go  |   3 +-
 services/keepstore/s3_volume.go         |   2 +-
 services/keepstore/s3_volume_test.go    |  13 +-
 services/keepstore/stats_ticker.go      |  44 ++--
 services/keepstore/volume.go            |   9 +-
 services/keepstore/volume_test.go       |   2 +-
 services/keepstore/volume_unix.go       | 100 +++++----
 services/keepstore/volume_unix_test.go  |   5 +-
 17 files changed, 348 insertions(+), 241 deletions(-)

       via  f50aff88ccf1ce6e590a3fe98689eabef4ad292a (commit)
       via  0caf177b78c5cf41f6a3a6f699bcebdee6aafb7f (commit)
       via  3a3d67ccee068a85aa3b79c5abd40170223071e3 (commit)
      from  f0553505e32ee00999d1d680da14260a9a0f6b99 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit f50aff88ccf1ce6e590a3fe98689eabef4ad292a
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Wed Feb 13 19:58:45 2019 -0300

    13937: Adds facility for drivers to register their own counters.
    
    Used by the unix volume driver; support in the other volume drivers is pending.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
index 63e406c93..80194eb82 100644
--- a/services/keepstore/metrics.go
+++ b/services/keepstore/metrics.go
@@ -88,6 +88,7 @@ func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
 }
 
 type volumeMetricsVecs struct {
+	reg        *prometheus.Registry
 	BytesFree  *prometheus.GaugeVec
 	BytesUsed  *prometheus.GaugeVec
 	Errors     *prometheus.CounterVec
@@ -102,21 +103,26 @@ type volumeMetricsVecs struct {
 }
 
 type volumeMetrics struct {
-	BytesFree  prometheus.Gauge
-	BytesUsed  prometheus.Gauge
-	Errors     prometheus.Counter
-	Ops        prometheus.Counter
-	CompareOps prometheus.Counter
-	GetOps     prometheus.Counter
-	PutOps     prometheus.Counter
-	TouchOps   prometheus.Counter
-	InBytes    prometheus.Counter
-	OutBytes   prometheus.Counter
-	ErrorCodes *prometheus.CounterVec
+	reg              *prometheus.Registry
+	lbls             []string
+	internalCounters map[string]*prometheus.CounterVec
+	BytesFree        prometheus.Gauge
+	BytesUsed        prometheus.Gauge
+	Errors           prometheus.Counter
+	Ops              prometheus.Counter
+	CompareOps       prometheus.Counter
+	GetOps           prometheus.Counter
+	PutOps           prometheus.Counter
+	TouchOps         prometheus.Counter
+	InBytes          prometheus.Counter
+	OutBytes         prometheus.Counter
+	ErrorCodes       *prometheus.CounterVec
 }
 
 func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
-	m := &volumeMetricsVecs{}
+	m := &volumeMetricsVecs{
+		reg: reg,
+	}
 	m.BytesFree = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
@@ -234,16 +240,19 @@ func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
 func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
 	lbls := []string{lbl, mnt, dev}
 	curried := &volumeMetrics{
-		BytesFree:  m.BytesFree.WithLabelValues(lbls...),
-		BytesUsed:  m.BytesUsed.WithLabelValues(lbls...),
-		Errors:     m.Errors.WithLabelValues(lbls...),
-		Ops:        m.Ops.WithLabelValues(lbls...),
-		CompareOps: m.CompareOps.WithLabelValues(lbls...),
-		GetOps:     m.GetOps.WithLabelValues(lbls...),
-		PutOps:     m.PutOps.WithLabelValues(lbls...),
-		TouchOps:   m.TouchOps.WithLabelValues(lbls...),
-		InBytes:    m.InBytes.WithLabelValues(lbls...),
-		OutBytes:   m.OutBytes.WithLabelValues(lbls...),
+		reg:              m.reg,
+		lbls:             lbls,
+		internalCounters: make(map[string]*prometheus.CounterVec),
+		BytesFree:        m.BytesFree.WithLabelValues(lbls...),
+		BytesUsed:        m.BytesUsed.WithLabelValues(lbls...),
+		Errors:           m.Errors.WithLabelValues(lbls...),
+		Ops:              m.Ops.WithLabelValues(lbls...),
+		CompareOps:       m.CompareOps.WithLabelValues(lbls...),
+		GetOps:           m.GetOps.WithLabelValues(lbls...),
+		PutOps:           m.PutOps.WithLabelValues(lbls...),
+		TouchOps:         m.TouchOps.WithLabelValues(lbls...),
+		InBytes:          m.InBytes.WithLabelValues(lbls...),
+		OutBytes:         m.OutBytes.WithLabelValues(lbls...),
 		ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
 			"label":         lbl,
 			"mount_point":   mnt,
@@ -252,3 +261,23 @@ func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volum
 	}
 	return curried
 }
+
+// Returns a driver specific counter, creating it when needed. The 'name' argument
+// should include the driver prefix.
+func (m *volumeMetrics) getInternalCounter(name string, help string) prometheus.Counter {
+	counterVec, ok := m.internalCounters[name]
+	if !ok {
+		counterVec = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      name,
+				Help:      help,
+			},
+			[]string{"label", "mount_point", "device_number"},
+		)
+		m.reg.MustRegister(counterVec)
+		m.internalCounters[name] = counterVec
+	}
+	return counterVec.WithLabelValues(m.lbls...)
+}
diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go
index e4a5865a4..69ed6d2ff 100644
--- a/services/keepstore/pipe_adapters.go
+++ b/services/keepstore/pipe_adapters.go
@@ -39,7 +39,7 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (i
 	}
 }
 
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 7f52b744d..a9f24744b 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -48,7 +48,9 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	if err == nil {
 		return
 	}
-	s.errors.Inc()
+	if s.errors != nil {
+		s.errors.Inc()
+	}
 	s.Tick(&s.Errors)
 
 	s.lock.Lock()
@@ -57,17 +59,23 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	}
 	s.ErrorCodes[errType]++
 	s.lock.Unlock()
-	s.errCounters.WithLabelValues(errType).Inc()
+	if s.errCounters != nil {
+		s.errCounters.WithLabelValues(errType).Inc()
+	}
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
-	s.inBytes.Add(float64(n))
+	if s.inBytes != nil {
+		s.inBytes.Add(float64(n))
+	}
 	atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
-	s.outBytes.Add(float64(n))
+	if s.outBytes != nil {
+		s.outBytes.Add(float64(n))
+	}
 	atomic.AddUint64(&s.OutBytes, n)
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5313059ea..8d61f9619 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -21,6 +21,8 @@ import (
 	"sync/atomic"
 	"syscall"
 	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -234,7 +236,7 @@ func (v *UnixVolume) Start(m *volumeMetrics) error {
 	if err == nil {
 		// Set up prometheus metrics
 		v.metrics = m
-		v.os.stats.statsTicker.setup(m)
+		v.os.stats.setup(v.metrics)
 		// Periodically update free/used volume space
 		go func() {
 			for {
@@ -269,6 +271,7 @@ func (v *UnixVolume) Touch(loc string) error {
 	}
 	defer v.unlockfile(f)
 	ts := syscall.NsecToTimespec(time.Now().UnixNano())
+	v.os.stats.utimesOps.Inc()
 	v.os.stats.Tick(&v.os.stats.UtimesOps)
 	err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 	v.os.stats.TickErr(err)
@@ -462,6 +465,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 		return err
 	}
 	defer rootdir.Close()
+	v.os.stats.readdirOps.Inc()
 	v.os.stats.Tick(&v.os.stats.ReaddirOps)
 	for {
 		names, err := rootdir.Readdirnames(1)
@@ -484,6 +488,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 			lastErr = err
 			continue
 		}
+		v.os.stats.readdirOps.Inc()
 		v.os.stats.Tick(&v.os.stats.ReaddirOps)
 		for {
 			fileInfo, err := blockdir.Readdir(1)
@@ -572,6 +577,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 		return MethodDisabledError
 	}
 
+	v.os.stats.readdirOps.Inc()
 	v.os.stats.Tick(&v.os.stats.ReaddirOps)
 	files, err := ioutil.ReadDir(v.blockDir(loc))
 	if err != nil {
@@ -718,6 +724,7 @@ func (v *UnixVolume) unlock() {
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func (v *UnixVolume) lockfile(f *os.File) error {
+	v.os.stats.flockOps.Inc()
 	v.os.stats.Tick(&v.os.stats.FlockOps)
 	err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
 	v.os.stats.TickErr(err)
@@ -822,6 +829,27 @@ type unixStats struct {
 	RenameOps  uint64
 	UnlinkOps  uint64
 	ReaddirOps uint64
+	// Prometheus metrics -- Above ad-hoc counters will be eventually removed
+	openOps    prometheus.Counter
+	statOps    prometheus.Counter
+	flockOps   prometheus.Counter
+	utimesOps  prometheus.Counter
+	createOps  prometheus.Counter
+	renameOps  prometheus.Counter
+	unlinkOps  prometheus.Counter
+	readdirOps prometheus.Counter
+}
+
+func (s *unixStats) setup(m *volumeMetrics) {
+	s.statsTicker.setup(m)
+	s.openOps = m.getInternalCounter("unix_open_ops", "Number of backend open operations")
+	s.statOps = m.getInternalCounter("unix_stat_ops", "Number of backend stat operations")
+	s.flockOps = m.getInternalCounter("unix_flock_ops", "Number of backend flock operations")
+	s.utimesOps = m.getInternalCounter("unix_utimes_ops", "Number of backend utimes operations")
+	s.createOps = m.getInternalCounter("unix_create_ops", "Number of backend create operations")
+	s.renameOps = m.getInternalCounter("unix_rename_ops", "Number of backend rename operations")
+	s.unlinkOps = m.getInternalCounter("unix_unlink_ops", "Number of backend unlink operations")
+	s.readdirOps = m.getInternalCounter("unix_readdir_ops", "Number of backend readdir operations")
 }
 
 func (s *unixStats) TickErr(err error) {
@@ -836,6 +864,7 @@ type osWithStats struct {
 }
 
 func (o *osWithStats) Open(name string) (*os.File, error) {
+	o.stats.openOps.Inc()
 	o.stats.Tick(&o.stats.OpenOps)
 	f, err := os.Open(name)
 	o.stats.TickErr(err)
@@ -843,6 +872,7 @@ func (o *osWithStats) Open(name string) (*os.File, error) {
 }
 
 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+	o.stats.openOps.Inc()
 	o.stats.Tick(&o.stats.OpenOps)
 	f, err := os.OpenFile(name, flag, perm)
 	o.stats.TickErr(err)
@@ -850,6 +880,7 @@ func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.Fil
 }
 
 func (o *osWithStats) Remove(path string) error {
+	o.stats.unlinkOps.Inc()
 	o.stats.Tick(&o.stats.UnlinkOps)
 	err := os.Remove(path)
 	o.stats.TickErr(err)
@@ -857,6 +888,7 @@ func (o *osWithStats) Remove(path string) error {
 }
 
 func (o *osWithStats) Rename(a, b string) error {
+	o.stats.renameOps.Inc()
 	o.stats.Tick(&o.stats.RenameOps)
 	err := os.Rename(a, b)
 	o.stats.TickErr(err)
@@ -864,6 +896,10 @@ func (o *osWithStats) Rename(a, b string) error {
 }
 
 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+	// Avoid segfaulting when called from vol.Status() on theConfig.Start()
+	if o.stats.statOps != nil {
+		o.stats.statOps.Inc()
+	}
 	o.stats.Tick(&o.stats.StatOps)
 	fi, err := os.Stat(path)
 	o.stats.TickErr(err)
@@ -871,6 +907,7 @@ func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
 }
 
 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+	o.stats.createOps.Inc()
 	o.stats.Tick(&o.stats.CreateOps)
 	f, err := ioutil.TempFile(dir, base)
 	o.stats.TickErr(err)

commit 0caf177b78c5cf41f6a3a6f699bcebdee6aafb7f
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Wed Feb 13 17:10:41 2019 -0300

    13937: Moves statsTicker metrics init to its own func.
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 6742dbaa9..7f52b744d 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -17,15 +17,22 @@ type statsTicker struct {
 	OutBytes uint64
 
 	// Prometheus metrics
-	PromErrors     prometheus.Counter
-	PromInBytes    prometheus.Counter
-	PromOutBytes   prometheus.Counter
-	PromErrorCodes *prometheus.CounterVec
+	errors      prometheus.Counter
+	inBytes     prometheus.Counter
+	outBytes    prometheus.Counter
+	errCounters *prometheus.CounterVec
 
 	ErrorCodes map[string]uint64 `json:",omitempty"`
 	lock       sync.Mutex
 }
 
+func (s *statsTicker) setup(m *volumeMetrics) {
+	s.errors = m.Errors
+	s.errCounters = m.ErrorCodes
+	s.inBytes = m.InBytes
+	s.outBytes = m.OutBytes
+}
+
 // Tick increments each of the given counters by 1 using
 // atomic.AddUint64.
 func (s *statsTicker) Tick(counters ...*uint64) {
@@ -41,7 +48,7 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	if err == nil {
 		return
 	}
-	s.PromErrors.Inc()
+	s.errors.Inc()
 	s.Tick(&s.Errors)
 
 	s.lock.Lock()
@@ -50,17 +57,17 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	}
 	s.ErrorCodes[errType]++
 	s.lock.Unlock()
-	s.PromErrorCodes.WithLabelValues(errType).Inc()
+	s.errCounters.WithLabelValues(errType).Inc()
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
-	s.PromInBytes.Add(float64(n))
+	s.inBytes.Add(float64(n))
 	atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
-	s.PromOutBytes.Add(float64(n))
+	s.outBytes.Add(float64(n))
 	atomic.AddUint64(&s.OutBytes, n)
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 10cd6398c..5313059ea 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -28,7 +28,7 @@ type unixVolumeAdder struct {
 }
 
 // String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
 	return "-"
 }
 
@@ -234,10 +234,7 @@ func (v *UnixVolume) Start(m *volumeMetrics) error {
 	if err == nil {
 		// Set up prometheus metrics
 		v.metrics = m
-		v.os.stats.PromErrors = v.metrics.Errors
-		v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
-		v.os.stats.PromInBytes = v.metrics.InBytes
-		v.os.stats.PromOutBytes = v.metrics.OutBytes
+		v.os.stats.statsTicker.setup(m)
 		// Periodically update free/used volume space
 		go func() {
 			for {
@@ -365,7 +362,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
 	return putWithPipe(ctx, loc, block, v)
 }
 
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
 	if v.ReadOnly {
 		return MethodDisabledError

commit 3a3d67ccee068a85aa3b79c5abd40170223071e3
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date:   Wed Feb 13 16:33:53 2019 -0300

    13937: Refactors volume metrics so they are passed to drivers as curried vecs (WIP)
    
    Pending: driver-specific metrics
    
    Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 5da2055b7..ab199d991 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -147,7 +147,7 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(m *volumeMetrics) error {
 	if v.ContainerName == "" {
 		return errors.New("no container name given")
 	}
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
index 2bd989de3..43b309916 100644
--- a/services/keepstore/config.go
+++ b/services/keepstore/config.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
 
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
 
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
 	if cfg.Debug {
 		log.Level = logrus.DebugLevel
 		cfg.debugLogf = log.Printf
@@ -143,8 +144,13 @@ func (cfg *Config) Start() error {
 			return fmt.Errorf("no volumes found")
 		}
 	}
+	vm := newVolumeMetricsVecs(reg)
 	for _, v := range cfg.Volumes {
-		if err := v.Start(); err != nil {
+		metrics := vm.curryWith(
+			v.String(),
+			v.Status().MountPoint,
+			fmt.Sprintf("%d", v.Status().DeviceNum))
+		if err := v.Start(metrics); err != nil {
 			return fmt.Errorf("volume %s: %s", v, err)
 		}
 		log.Printf("Using volume %v (writable=%v)", v, v.Writable())
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index c37a4d112..cbfc0bcda 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -28,6 +28,7 @@ import (
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 var testCluster = &arvados.Cluster{
@@ -827,7 +828,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeRESTRouter(testCluster)
+	loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
@@ -839,7 +840,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "Bearer "+rt.apiToken)
 	}
-	loggingRouter := MakeRESTRouter(testCluster)
+	loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
@@ -979,7 +980,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
 	ok := make(chan struct{})
 	go func() {
 		req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-		MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+		MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
 		ok <- struct{}{}
 	}()
 
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index e4f025d6b..7da9f69ad 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -32,17 +32,16 @@ type router struct {
 	limiter     httpserver.RequestCounter
 	cluster     *arvados.Cluster
 	remoteProxy remoteProxy
-	registry    *prometheus.Registry
-	metrics     nodeMetrics
+	metrics     *nodeMetrics
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
 	rtr := &router{
-		Router:   mux.NewRouter(),
-		cluster:  cluster,
-		registry: prometheus.NewRegistry(),
+		Router:  mux.NewRouter(),
+		cluster: cluster,
+		metrics: &nodeMetrics{reg: reg},
 	}
 
 	rtr.HandleFunc(
@@ -89,13 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
 	rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
 	rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
-	rtr.metrics = nodeMetrics{
-		reg: rtr.registry,
-		rc:  rtr.limiter,
-	}
-	rtr.metrics.setup()
+	rtr.metrics.setupBufferPoolMetrics(bufs)
+	rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+	rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+	rtr.metrics.setupRequestMetrics(rtr.limiter)
 
-	instrumented := httpserver.Instrument(rtr.registry, nil,
+	instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
 		httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
 	return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 6ae414bf9..fb1e1ea54 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -18,6 +18,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"github.com/coreos/go-systemd/daemon"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -120,7 +121,9 @@ func main() {
 
 	log.Printf("keepstore %s started", version)
 
-	err = theConfig.Start()
+	metricsRegistry := prometheus.NewRegistry()
+
+	err = theConfig.Start(metricsRegistry)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -173,7 +176,7 @@ func main() {
 	KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
 	// Middleware/handler stack
-	router := MakeRESTRouter(cluster)
+	router := MakeRESTRouter(cluster, metricsRegistry)
 
 	// Set up a TCP listener.
 	listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
index f0815ae4a..63e406c93 100644
--- a/services/keepstore/metrics.go
+++ b/services/keepstore/metrics.go
@@ -13,10 +13,9 @@ import (
 
 type nodeMetrics struct {
 	reg *prometheus.Registry
-	rc  httpserver.RequestCounter
 }
 
-func (m *nodeMetrics) setup() {
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
@@ -24,7 +23,7 @@ func (m *nodeMetrics) setup() {
 			Name:      "bufferpool_bytes_allocated",
 			Help:      "Number of bytes allocated to buffers",
 		},
-		func() float64 { return float64(bufs.Alloc()) },
+		func() float64 { return float64(b.Alloc()) },
 	))
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
@@ -33,7 +32,7 @@ func (m *nodeMetrics) setup() {
 			Name:      "bufferpool_buffers_max",
 			Help:      "Maximum number of buffers allowed",
 		},
-		func() float64 { return float64(bufs.Cap()) },
+		func() float64 { return float64(b.Cap()) },
 	))
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
@@ -42,173 +41,214 @@ func (m *nodeMetrics) setup() {
 			Name:      "bufferpool_buffers_in_use",
 			Help:      "Number of buffers in use",
 		},
-		func() float64 { return float64(bufs.Len()) },
+		func() float64 { return float64(b.Len()) },
 	))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "pull_queue_in_progress",
-			Help:      "Number of pull requests in progress",
+			Name:      fmt.Sprintf("%s_queue_in_progress", qName),
+			Help:      fmt.Sprintf("Number of %s requests in progress", qName),
 		},
-		func() float64 { return float64(getWorkQueueStatus(pullq).InProgress) },
+		func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
 	))
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "pull_queue_queued",
-			Help:      "Number of queued pull requests",
+			Name:      fmt.Sprintf("%s_queue_queued", qName),
+			Help:      fmt.Sprintf("Number of queued %s requests", qName),
 		},
-		func() float64 { return float64(getWorkQueueStatus(pullq).Queued) },
+		func() float64 { return float64(getWorkQueueStatus(q).Queued) },
 	))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "trash_queue_in_progress",
-			Help:      "Number of trash requests in progress",
+			Name:      "requests_current",
+			Help:      "Number of requests in progress",
 		},
-		func() float64 { return float64(getWorkQueueStatus(trashq).InProgress) },
+		func() float64 { return float64(rc.Current()) },
 	))
 	m.reg.MustRegister(prometheus.NewGaugeFunc(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "trash_queue_queued",
-			Help:      "Number of queued trash requests",
+			Name:      "requests_max",
+			Help:      "Maximum number of concurrent requests",
 		},
-		func() float64 { return float64(getWorkQueueStatus(trashq).Queued) },
+		func() float64 { return float64(rc.Max()) },
 	))
-	m.reg.MustRegister(prometheus.NewGaugeFunc(
+}
+
+type volumeMetricsVecs struct {
+	BytesFree  *prometheus.GaugeVec
+	BytesUsed  *prometheus.GaugeVec
+	Errors     *prometheus.CounterVec
+	Ops        *prometheus.CounterVec
+	CompareOps *prometheus.CounterVec
+	GetOps     *prometheus.CounterVec
+	PutOps     *prometheus.CounterVec
+	TouchOps   *prometheus.CounterVec
+	InBytes    *prometheus.CounterVec
+	OutBytes   *prometheus.CounterVec
+	ErrorCodes *prometheus.CounterVec
+}
+
+type volumeMetrics struct {
+	BytesFree  prometheus.Gauge
+	BytesUsed  prometheus.Gauge
+	Errors     prometheus.Counter
+	Ops        prometheus.Counter
+	CompareOps prometheus.Counter
+	GetOps     prometheus.Counter
+	PutOps     prometheus.Counter
+	TouchOps   prometheus.Counter
+	InBytes    prometheus.Counter
+	OutBytes   prometheus.Counter
+	ErrorCodes *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+	m := &volumeMetricsVecs{}
+	m.BytesFree = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "requests_current",
-			Help:      "Number of requests in progress",
+			Name:      "volume_bytes_free",
+			Help:      "Number of free bytes on the volume",
 		},
-		func() float64 { return float64(m.rc.Current()) },
-	))
-	m.reg.MustRegister(prometheus.NewGaugeFunc(
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.BytesFree)
+	m.BytesUsed = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "arvados",
 			Subsystem: "keepstore",
-			Name:      "requests_max",
-			Help:      "Maximum number of concurrent requests",
+			Name:      "volume_bytes_used",
+			Help:      "Number of used bytes on the volume",
 		},
-		func() float64 { return float64(m.rc.Max()) },
-	))
-	// Register individual volume's metrics
-	vols := KeepVM.AllReadable()
-	for _, vol := range vols {
-		labels := prometheus.Labels{
-			"label":         vol.String(),
-			"mount_point":   vol.Status().MountPoint,
-			"device_number": fmt.Sprintf("%d", vol.Status().DeviceNum),
-		}
-		if vol, ok := vol.(InternalMetricser); ok {
-			// Per-driver internal metrics
-			vol.SetupInternalMetrics(m.reg, labels)
-		}
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_bytes_free",
-				Help:        "Number of free bytes on the volume",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(vol.Status().BytesFree) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_bytes_used",
-				Help:        "Number of used bytes on the volume",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(vol.Status().BytesUsed) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_errors",
-				Help:        "Number of I/O errors",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).Errors) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_ops",
-				Help:        "Number of I/O operations",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).Ops) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_compare_ops",
-				Help:        "Number of I/O compare operations",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).CompareOps) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_get_ops",
-				Help:        "Number of I/O get operations",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).GetOps) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_put_ops",
-				Help:        "Number of I/O put operations",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).PutOps) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_touch_ops",
-				Help:        "Number of I/O touch operations",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).TouchOps) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_input_bytes",
-				Help:        "Number of input bytes",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).InBytes) },
-		))
-		m.reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        "volume_io_output_bytes",
-				Help:        "Number of output bytes",
-				ConstLabels: labels,
-			},
-			func() float64 { return float64(KeepVM.VolumeStats(vol).OutBytes) },
-		))
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.BytesUsed)
+	m.Errors = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_errors",
+			Help:      "Number of volume I/O errors",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.Errors)
+	m.Ops = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_ops",
+			Help:      "Number of volume I/O operations",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.Ops)
+	m.CompareOps = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_compare_ops",
+			Help:      "Number of volume I/O compare operations",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.CompareOps)
+	m.GetOps = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_get_ops",
+			Help:      "Number of volume I/O get operations",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.GetOps)
+	m.PutOps = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_put_ops",
+			Help:      "Number of volume I/O put operations",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.PutOps)
+	m.TouchOps = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_touch_ops",
+			Help:      "Number of volume I/O touch operations",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.TouchOps)
+	m.InBytes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_in_bytes",
+			Help:      "Number of input bytes",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.InBytes)
+	m.OutBytes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_out_bytes",
+			Help:      "Number of output bytes",
+		},
+		[]string{"label", "mount_point", "device_number"},
+	)
+	reg.MustRegister(m.OutBytes)
+	m.ErrorCodes = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "arvados",
+			Subsystem: "keepstore",
+			Name:      "volume_io_error_codes",
+			Help:      "Number of I/O errors by error code",
+		},
+		[]string{"label", "mount_point", "device_number", "error_code"},
+	)
+	reg.MustRegister(m.ErrorCodes)
+
+	return m
+}
+
+func (m *volumeMetricsVecs) curryWith(lbl string, mnt string, dev string) *volumeMetrics {
+	lbls := []string{lbl, mnt, dev}
+	curried := &volumeMetrics{
+		BytesFree:  m.BytesFree.WithLabelValues(lbls...),
+		BytesUsed:  m.BytesUsed.WithLabelValues(lbls...),
+		Errors:     m.Errors.WithLabelValues(lbls...),
+		Ops:        m.Ops.WithLabelValues(lbls...),
+		CompareOps: m.CompareOps.WithLabelValues(lbls...),
+		GetOps:     m.GetOps.WithLabelValues(lbls...),
+		PutOps:     m.PutOps.WithLabelValues(lbls...),
+		TouchOps:   m.TouchOps.WithLabelValues(lbls...),
+		InBytes:    m.InBytes.WithLabelValues(lbls...),
+		OutBytes:   m.OutBytes.WithLabelValues(lbls...),
+		ErrorCodes: m.ErrorCodes.MustCurryWith(prometheus.Labels{
+			"label":         lbl,
+			"mount_point":   mnt,
+			"device_number": dev,
+		}),
 	}
+	return curried
 }
diff --git a/services/keepstore/mounts_test.go b/services/keepstore/mounts_test.go
index 588bb4299..ac30c369a 100644
--- a/services/keepstore/mounts_test.go
+++ b/services/keepstore/mounts_test.go
@@ -12,6 +12,7 @@ import (
 	"net/http/httptest"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"github.com/prometheus/client_golang/prometheus"
 	check "gopkg.in/check.v1"
 )
 
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
 	theConfig = DefaultConfig()
 	theConfig.systemAuthToken = arvadostest.DataManagerToken
 	theConfig.ManagementToken = arvadostest.ManagementToken
-	theConfig.Start()
-	s.rtr = MakeRESTRouter(testCluster)
+	r := prometheus.NewRegistry() // one registry per test, shared by theConfig.Start and the router
+	theConfig.Start(r)
+	s.rtr = MakeRESTRouter(testCluster, r)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
 	s.vm.Close()
 	KeepVM = nil
 	theConfig = DefaultConfig()
-	theConfig.Start()
+	theConfig.Start(prometheus.NewRegistry()) // registry is discarded after this call
 }
 
 func (s *MountsSuite) TestMounts(c *check.C) {
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
index 6e720b849..6c22d1d32 100644
--- a/services/keepstore/proxy_remote_test.go
+++ b/services/keepstore/proxy_remote_test.go
@@ -20,6 +20,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/auth"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"github.com/prometheus/client_golang/prometheus"
 	check "gopkg.in/check.v1"
 )
 
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
 	theConfig = DefaultConfig()
 	theConfig.systemAuthToken = arvadostest.DataManagerToken
 	theConfig.blobSigningKey = []byte(knownKey)
-	theConfig.Start()
-	s.rtr = MakeRESTRouter(s.cluster)
+	r := prometheus.NewRegistry() // one registry per test, shared by theConfig.Start and the router
+	theConfig.Start(r)
+	s.rtr = MakeRESTRouter(s.cluster, r)
 }
 
 func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
 	s.vm.Close()
 	KeepVM = nil
 	theConfig = DefaultConfig()
-	theConfig.Start()
+	theConfig.Start(prometheus.NewRegistry()) // registry is discarded after this call
 	s.remoteAPI.Close()
 	s.remoteKeepproxy.Close()
 }
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 7b5077c1a..8e667e048 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -14,6 +14,7 @@ import (
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"github.com/prometheus/client_golang/prometheus"
 	. "gopkg.in/check.v1"
 )
 
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
 	pullq = nil
 	teardown()
 	theConfig = DefaultConfig()
-	theConfig.Start()
+	theConfig.Start(prometheus.NewRegistry()) // registry is discarded after this call
 }
 
 var firstPullList = []byte(`[
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index fb978fe2b..f281b363e 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -198,7 +198,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(m *volumeMetrics) error { // NOTE(review): m appears unused here; S3 metrics are still pending per this commit
 	region, ok := aws.Regions[v.Region]
 	if v.Endpoint == "" {
 		if !ok {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 10c71125d..e88efffe4 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/AdRoll/goamz/s3"
 	"github.com/AdRoll/goamz/s3/s3test"
 	"github.com/ghodss/yaml"
+	"github.com/prometheus/client_golang/prometheus"
 	check "gopkg.in/check.v1"
 )
 
@@ -170,7 +171,9 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
 	vol := *v.S3Volume
 	vol.Endpoint = srv.URL
 	v = &TestableS3Volume{S3Volume: &vol}
-	v.Start()
+	metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith( // fresh registry: newVolumeMetricsVecs uses MustRegister, which panics on duplicates
+		v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+	v.Start(metrics)
 
 	ctx, cancel := context.WithCancel(context.Background())
 
@@ -430,7 +433,9 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
 		server:      srv,
 		serverClock: clock,
 	}
-	v.Start()
+	metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith( // fresh registry: newVolumeMetricsVecs uses MustRegister, which panics on duplicates
+		v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+	v.Start(metrics)
 	err = v.bucket.PutBucket(s3.ACL("private"))
 	c.Assert(err, check.IsNil)
 	return v
@@ -448,7 +453,7 @@ Volumes:
 	c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
 }
 
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(m *volumeMetrics) error { // m is passed through to S3Volume.Start
 	tmp, err := ioutil.TempFile("", "keepstore")
 	v.c.Assert(err, check.IsNil)
 	defer os.Remove(tmp.Name())
@@ -459,7 +464,7 @@ func (v *TestableS3Volume) Start() error {
 	v.S3Volume.AccessKeyFile = tmp.Name()
 	v.S3Volume.SecretKeyFile = tmp.Name()
 
-	v.c.Assert(v.S3Volume.Start(), check.IsNil)
+	v.c.Assert(v.S3Volume.Start(m), check.IsNil)
 	return nil
 }
 
diff --git a/services/keepstore/stats_ticker.go b/services/keepstore/stats_ticker.go
index 36fbcf98a..6742dbaa9 100644
--- a/services/keepstore/stats_ticker.go
+++ b/services/keepstore/stats_ticker.go
@@ -5,7 +5,6 @@
 package main
 
 import (
-	"fmt"
 	"sync"
 	"sync/atomic"
 
@@ -17,32 +16,16 @@ type statsTicker struct {
 	InBytes  uint64
 	OutBytes uint64
 
+	// Prometheus metrics: optional (Tick* skips nil ones), not JSON-encoded
+	PromErrors     prometheus.Counter     `json:"-"`
+	PromInBytes    prometheus.Counter     `json:"-"`
+	PromOutBytes   prometheus.Counter     `json:"-"`
+	PromErrorCodes *prometheus.CounterVec `json:"-"`
+
 	ErrorCodes map[string]uint64 `json:",omitempty"`
 	lock       sync.Mutex
 }
 
-func (s *statsTicker) setupPrometheus(drv string, reg *prometheus.Registry, lbl prometheus.Labels) {
-	metrics := map[string][]interface{}{
-		"errors":    []interface{}{string("errors"), s.Errors},
-		"in_bytes":  []interface{}{string("input bytes"), s.InBytes},
-		"out_bytes": []interface{}{string("output bytes"), s.OutBytes},
-	}
-	for mName, data := range metrics {
-		mHelp := data[0].(string)
-		mVal := data[1].(uint64)
-		reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        fmt.Sprintf("%s_%s", drv, mName),
-				Help:        fmt.Sprintf("Number of %s backend %s", drv, mHelp),
-				ConstLabels: lbl,
-			},
-			func() float64 { return float64(mVal) },
-		))
-	}
-}
-
 // Tick increments each of the given counters by 1 using
 // atomic.AddUint64.
 func (s *statsTicker) Tick(counters ...*uint64) {
@@ -58,6 +41,9 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	if err == nil {
 		return
 	}
+	if s.PromErrors != nil {
+		s.PromErrors.Inc()
+	}
 	s.Tick(&s.Errors)
 
 	s.lock.Lock()
@@ -66,14 +52,23 @@ func (s *statsTicker) TickErr(err error, errType string) {
 	}
 	s.ErrorCodes[errType]++
 	s.lock.Unlock()
+	if s.PromErrorCodes != nil {
+		s.PromErrorCodes.WithLabelValues(errType).Inc()
+	}
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+	if s.PromInBytes != nil {
+		s.PromInBytes.Add(float64(n))
+	}
 	atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+	if s.PromOutBytes != nil {
+		s.PromOutBytes.Add(float64(n))
+	}
 	atomic.AddUint64(&s.OutBytes, n)
 }
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 963804639..5c6d1f51a 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -14,7 +14,6 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"github.com/prometheus/client_golang/prometheus"
 )
 
 type BlockWriter interface {
@@ -40,7 +39,7 @@ type Volume interface {
 	// Do whatever private setup tasks and configuration checks
 	// are needed. Return non-nil if the volume is unusable (e.g.,
 	// invalid config).
-	Start() error
+	Start(m *volumeMetrics) error // m: this volume's metrics, bound to its labels (e.g. via volumeMetricsVecs.curryWith)
 
 	// Get a block: copy the block data into buf, and return the
 	// number of bytes copied.
@@ -416,9 +415,3 @@ type ioStats struct {
 type InternalStatser interface {
 	InternalStats() interface{}
 }
-
-// InternalMetricser provides an interface for volume drivers to register their
-// own specific metrics.
-type InternalMetricser interface {
-	SetupInternalMetrics(*prometheus.Registry, prometheus.Labels)
-}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 43ddd090c..72666638d 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -211,7 +211,7 @@ func (v *MockVolume) Type() string {
 	return "Mock"
 }
 
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(m *volumeMetrics) error { // the mock ignores m
 	return nil
 }
 
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index a80bb7bf4..10cd6398c 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -21,8 +21,6 @@ import (
 	"sync/atomic"
 	"syscall"
 	"time"
-
-	"github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -120,6 +118,8 @@ type UnixVolume struct {
 	locker sync.Locker
 
 	os osWithStats
+
+	metrics *volumeMetrics // nil until Start succeeds with non-nil metrics
 }
 
 // DeviceID returns a globally unique ID for the volume's root
@@ -220,7 +220,7 @@ func (v *UnixVolume) Type() string {
 }
 
 // Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(m *volumeMetrics) error {
 	if v.Serialize {
 		v.locker = &sync.Mutex{}
 	}
@@ -231,11 +231,36 @@ func (v *UnixVolume) Start() error {
 		v.DirectoryReplication = 1
 	}
 	_, err := v.os.Stat(v.Root)
+	if err == nil && m != nil {
+		// Set up prometheus metrics. Until this runs, v.metrics is
+		// nil and the per-operation counters below are skipped.
+		v.metrics = m
+		v.os.stats.PromErrors = v.metrics.Errors
+		v.os.stats.PromErrorCodes = v.metrics.ErrorCodes
+		v.os.stats.PromInBytes = v.metrics.InBytes
+		v.os.stats.PromOutBytes = v.metrics.OutBytes
+		// Periodically update free/used volume space for the life of
+		// the process. Status() is sampled once per iteration so both
+		// gauges agree; a nil status skips the update.
+		go func() {
+			for {
+				if st := v.Status(); st != nil {
+					v.metrics.BytesFree.Set(float64(st.BytesFree))
+					v.metrics.BytesUsed.Set(float64(st.BytesUsed))
+				}
+				time.Sleep(10 * time.Second)
+			}
+		}()
+	}
 	return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
+	if v.metrics != nil {
+		v.metrics.Ops.Inc()
+		v.metrics.TouchOps.Inc()
+	}
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
@@ -301,6 +326,10 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
 func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	if v.metrics != nil {
+		v.metrics.Ops.Inc()
+		v.metrics.GetOps.Inc()
+	}
 	return getWithPipe(ctx, loc, buf, v)
 }
 
@@ -324,6 +353,10 @@ func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) err
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+	if v.metrics != nil {
+		v.metrics.Ops.Inc()
+		v.metrics.CompareOps.Inc()
+	}
 	path := v.blockPath(loc)
 	if _, err := v.stat(path); err != nil {
 		return v.translateError(err)
@@ -338,6 +371,10 @@ func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) err
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+	if v.metrics != nil {
+		v.metrics.Ops.Inc()
+		v.metrics.PutOps.Inc()
+	}
 	return putWithPipe(ctx, loc, block, v)
 }
 
@@ -791,42 +828,6 @@ func (v *UnixVolume) EmptyTrash() {
 	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
-// SetupInternalMetrics registers driver stats to Prometheus.
-// Implements InternalMetricser interface.
-func (v *UnixVolume) SetupInternalMetrics(reg *prometheus.Registry, lbl prometheus.Labels) {
-	v.os.stats.setupPrometheus(reg, lbl)
-}
-
-func (s *unixStats) setupPrometheus(reg *prometheus.Registry, lbl prometheus.Labels) {
-	// Common backend metrics
-	s.statsTicker.setupPrometheus("unix", reg, lbl)
-	// Driver-specific backend metrics
-	metrics := map[string][]interface{}{
-		"open_ops":    []interface{}{string("open operations"), s.OpenOps},
-		"stat_ops":    []interface{}{string("stat operations"), s.StatOps},
-		"flock_ops":   []interface{}{string("flock operations"), s.FlockOps},
-		"utimes_ops":  []interface{}{string("utimes operations"), s.UtimesOps},
-		"create_ops":  []interface{}{string("create operations"), s.CreateOps},
-		"rename_ops":  []interface{}{string("rename operations"), s.RenameOps},
-		"unlink_ops":  []interface{}{string("unlink operations"), s.UnlinkOps},
-		"readdir_ops": []interface{}{string("readdir operations"), s.ReaddirOps},
-	}
-	for mName, data := range metrics {
-		mHelp := data[0].(string)
-		mVal := data[1].(uint64)
-		reg.Register(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace:   "arvados",
-				Subsystem:   "keepstore",
-				Name:        fmt.Sprintf("unix_%s", mName),
-				Help:        fmt.Sprintf("Number of unix backend %s", mHelp),
-				ConstLabels: lbl,
-			},
-			func() float64 { return float64(mVal) },
-		))
-	}
-}
-
 type unixStats struct {
 	statsTicker
 	OpenOps    uint64
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 7f1cd2196..05c7a93ae 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/ghodss/yaml"
+	"github.com/prometheus/client_golang/prometheus"
 	check "gopkg.in/check.v1"
 )
 
@@ -115,7 +116,9 @@ func TestReplicationDefault1(t *testing.T) {
 		Root:     "/",
 		ReadOnly: true,
 	}
-	if err := v.Start(); err != nil {
+	metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith( // fresh registry: newVolumeMetricsVecs uses MustRegister, which panics on duplicates
+		v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum))
+	if err := v.Start(metrics); err != nil {
 		t.Error(err)
 	}
 	if got := v.Replication(); got != 1 {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list