[arvados] created: 2.7.0-5677-g54bc3e80aa
git repository hosting
git at public.arvados.org
Tue Jan 16 23:13:31 UTC 2024
at 54bc3e80aa934729f9bbc73fee4116a0d7cc9f3a (commit)
commit 54bc3e80aa934729f9bbc73fee4116a0d7cc9f3a
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 16 17:13:01 2024 -0500
15317: Add transfer size bucketed latency/speed metrics.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 12c2839f8c..e0da14e774 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -34,6 +34,7 @@ import (
type handler struct {
Cache cache
Cluster *arvados.Cluster
+ metrics *metrics
lockMtx sync.Mutex
lock map[string]*sync.RWMutex
@@ -592,7 +593,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
if webdavPrefix == "" {
webdavPrefix = "/" + strings.Join(pathParts[:stripParts], "/")
}
- wh := webdav.Handler{
+ wh := &webdav.Handler{
Prefix: webdavPrefix,
FileSystem: &webdavfs.FS{
FileSystem: sessionFS,
@@ -607,7 +608,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
}
},
}
- wh.ServeHTTP(w, r)
+ h.metrics.track(wh, w, r)
if r.Method == http.MethodGet && w.WroteStatus() == http.StatusOK {
wrote := int64(w.WroteBodyBytes())
fnm := strings.Join(pathParts[stripParts:], "/")
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index eefab36e69..07c7016d3a 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -60,6 +60,7 @@ func (s *UnitSuite) SetUpTest(c *check.C) {
logger: logger,
registry: prometheus.NewRegistry(),
},
+ metrics: newMetrics(prometheus.NewRegistry()),
}
}
diff --git a/services/keep-web/main.go b/services/keep-web/main.go
index cd379dc6bd..690e75a251 100644
--- a/services/keep-web/main.go
+++ b/services/keep-web/main.go
@@ -41,5 +41,6 @@ func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg
logger: logger,
registry: reg,
},
+ metrics: newMetrics(reg),
}, nil
}
diff --git a/services/keep-web/metrics.go b/services/keep-web/metrics.go
new file mode 100644
index 0000000000..ba296c5533
--- /dev/null
+++ b/services/keep-web/metrics.go
@@ -0,0 +1,153 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+ "io"
+ "net/http"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// metrics holds the Prometheus histograms keep-web uses to report
+// transfer speeds and upload sync delays, each labeled by size_range.
+type metrics struct {
+	mDownloadSpeed        *prometheus.HistogramVec
+	mDownloadBackendSpeed *prometheus.HistogramVec
+	mUploadSpeed          *prometheus.HistogramVec
+	mUploadSyncDelay      *prometheus.HistogramVec
+}
+
+// newMetrics builds the transfer histograms and registers them with
+// reg. Registration uses MustRegister, so it panics if any collector
+// cannot be registered (e.g., a name clash in reg).
+func newMetrics(reg *prometheus.Registry) *metrics {
+	// Bucket bounds (bytes per second) shared by the speed histograms.
+	// The slice is only read by the client library, so sharing it is safe.
+	speedBuckets := []float64{10_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}
+
+	// histogram makes an arvados_keepweb_<name> HistogramVec with a
+	// single "size_range" label. A nil buckets argument leaves the
+	// client library's default buckets in effect.
+	histogram := func(name, help string, buckets []float64) *prometheus.HistogramVec {
+		opts := prometheus.HistogramOpts{
+			Namespace: "arvados",
+			Subsystem: "keepweb",
+			Name:      name,
+			Help:      help,
+			Buckets:   buckets,
+		}
+		return prometheus.NewHistogramVec(opts, []string{"size_range"})
+	}
+
+	m := &metrics{}
+	m.mDownloadSpeed = histogram("download_speed",
+		"Download speed (bytes per second) bucketed by transfer size range",
+		speedBuckets)
+	m.mDownloadBackendSpeed = histogram("download_backend_speed",
+		"Effective backend speed (bytes per second) when serving file downloads, bucketed by transfer size range",
+		speedBuckets)
+	m.mUploadSpeed = histogram("upload_speed",
+		"Upload speed (bytes per second) bucketed by transfer size range",
+		speedBuckets)
+	m.mUploadSyncDelay = histogram("upload_sync_delay_seconds",
+		"Upload sync delay (time from last byte received to HTTP response)",
+		nil)
+
+	reg.MustRegister(m.mDownloadSpeed, m.mDownloadBackendSpeed, m.mUploadSpeed, m.mUploadSyncDelay)
+	return m
+}
+
+// track serves r using handler and, for GET and PUT requests, records
+// transfer metrics afterwards. Requests with any other method are
+// passed straight through to handler without recording anything.
+//
+// GET: w is wrapped in a downloadTracker. If any response body bytes
+// were written, it observes two values, both labeled by sizeRange of
+// the byte count. One is the overall download speed (bytes / time
+// since t0). The other is the effective backend speed (bytes / time
+// the handler spent outside Write calls).
+//
+// PUT: r.Body is wrapped in an uploadTracker. If the handler read at
+// least one byte of the body, it observes two values, both labeled by
+// sizeRange. One is the upload speed (bytes / time from tracker setup
+// to ut.lastByte). The other is the sync delay (time from ut.lastByte
+// to handler return).
+func (m *metrics) track(handler http.Handler, w http.ResponseWriter, r *http.Request) {
+	switch r.Method {
+	case http.MethodGet:
+		dt := newDownloadTracker(w)
+		handler.ServeHTTP(dt, r)
+		size := dt.bytesOut
+		if size == 0 {
+			// No response body was sent, so there is no
+			// speed to report.
+			return
+		}
+		bucket := sizeRange(size)
+		m.mDownloadSpeed.WithLabelValues(bucket).Observe(float64(size) / time.Since(dt.t0).Seconds())
+		// Time after the final Write returned was also spent
+		// outside Write, so it counts as backend time.
+		backendTime := dt.backendWait + time.Since(dt.lastByte)
+		m.mDownloadBackendSpeed.WithLabelValues(bucket).Observe(float64(size) / backendTime.Seconds())
+	case http.MethodPut:
+		ut := newUploadTracker(r)
+		handler.ServeHTTP(w, r)
+		size := ut.bytesIn
+		d := ut.lastByte.Sub(ut.t0)
+		if size == 0 || d <= 0 {
+			// Read() was not called, or did not return any
+			// data (e.g., an empty request body). Recording
+			// a 0 B/s speed here would skew the histogram.
+			return
+		}
+		bucket := sizeRange(size)
+		m.mUploadSpeed.WithLabelValues(bucket).Observe(float64(size) / d.Seconds())
+		m.mUploadSyncDelay.WithLabelValues(bucket).Observe(time.Since(ut.lastByte).Seconds())
+	default:
+		handler.ServeHTTP(w, r)
+	}
+}
+
+// sizeRange returns the size_range label for a transfer of size bytes.
+// Ranges are upper-inclusive: "0" covers up to 1_000_000 bytes, "1M" up
+// to 10_000_000, "10M" up to 100_000_000, and "100M" anything larger.
+// size is the number of bytes actually transferred, which can be less
+// than the file size for Range requests or interrupted transfers.
+func sizeRange(size int64) string {
+	if size > 100_000_000 {
+		return "100M"
+	}
+	if size > 10_000_000 {
+		return "10M"
+	}
+	if size > 1_000_000 {
+		return "1M"
+	}
+	return "0"
+}
+
+// downloadTracker wraps an http.ResponseWriter and records how many
+// response body bytes were written, and how the handler's time was
+// split between Write calls and the gaps around them.
+type downloadTracker struct {
+	http.ResponseWriter
+
+	// t0 is when newDownloadTracker created the tracker.
+	t0 time.Time
+
+	firstByte   time.Time     // when Write was first called
+	lastByte    time.Time     // when the most recent Write call returned
+	bytesOut    int64         // bytes accepted by the wrapped writer so far
+	backendWait time.Duration // time from t0 to the first Write, plus gaps between Writes
+}
+
+// newDownloadTracker returns a tracker that forwards to w, with t0 set
+// to the current time.
+func newDownloadTracker(w http.ResponseWriter) *downloadTracker {
+	dt := &downloadTracker{ResponseWriter: w}
+	dt.t0 = time.Now()
+	return dt
+}
+
+// Write forwards p to the wrapped ResponseWriter. Before that, it adds
+// the time elapsed since the previous Write returned (or since t0, on
+// the first call) to backendWait. Afterwards it updates bytesOut and
+// lastByte.
+func (dt *downloadTracker) Write(p []byte) (int, error) {
+	prev := dt.lastByte
+	if prev.IsZero() {
+		prev = dt.t0
+	}
+	dt.backendWait += time.Since(prev)
+	if dt.firstByte.IsZero() {
+		dt.firstByte = time.Now()
+	}
+	n, err := dt.ResponseWriter.Write(p)
+	dt.bytesOut += int64(n)
+	dt.lastByte = time.Now()
+	return n, err
+}
+
+// uploadTracker wraps a request body and records how much data was
+// read from it and when the most recent data arrived.
+type uploadTracker struct {
+	io.ReadCloser
+	t0       time.Time // when newUploadTracker installed the tracker
+	lastByte time.Time // when Read last returned data; zero if it never has
+	bytesIn  int64     // total bytes returned by Read so far
+}
+
+// newUploadTracker replaces r.Body with an uploadTracker that wraps the
+// original body, and returns the tracker.
+func newUploadTracker(r *http.Request) *uploadTracker {
+	now := time.Now()
+	ut := &uploadTracker{ReadCloser: r.Body, t0: now}
+	r.Body = ut
+	return ut
+}
+
+// Read reads from the wrapped body. lastByte and bytesIn change only
+// when data is actually returned. The trailing (0, io.EOF) read, which
+// may happen well after the last data arrived, must not move lastByte
+// forward. Also, a body that yields no data must leave lastByte zero,
+// so the caller can tell that nothing was received.
+func (ut *uploadTracker) Read(p []byte) (int, error) {
+	n, err := ut.ReadCloser.Read(p)
+	if n > 0 {
+		ut.lastByte = time.Now()
+		ut.bytesIn += int64(n)
+	}
+	return n, err
+}
diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go
index dd29c40082..ec7a2391ea 100644
--- a/services/keep-web/server_test.go
+++ b/services/keep-web/server_test.go
@@ -412,6 +412,24 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
resp.Body.Close()
}
+ var coll arvados.Collection
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.IsNil)
+ arv.ApiToken = arvadostest.ActiveTokenV2
+ err = arv.Create("collections", map[string]interface{}{"ensure_unique_name": true}, &coll)
+ c.Assert(err, check.IsNil)
+ defer arv.Delete("collections", coll.UUID, nil, nil)
+ for i := 0; i < 2; i++ {
+ size := 1 << (i * 12)
+ req, _ = http.NewRequest("PUT", srvaddr+"/zero-"+fmt.Sprintf("%d", size), bytes.NewReader(make([]byte, size)))
+ req.Host = coll.UUID + ".example.com"
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp, err = http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusCreated)
+ resp.Body.Close()
+ }
+
time.Sleep(metricsUpdateInterval * 2)
req, _ = http.NewRequest("GET", srvaddr+"/metrics.json", nil)
@@ -476,7 +494,7 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
c.Check(summaries["request_duration_seconds/get/200"].SampleCount, check.Equals, "3")
c.Check(summaries["request_duration_seconds/get/404"].SampleCount, check.Equals, "1")
c.Check(summaries["time_to_status_seconds/get/404"].SampleCount, check.Equals, "1")
- c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(624))
+ c.Check(gauges["arvados_keepweb_sessions_cached_session_bytes//"].Value, check.Equals, float64(1208))
// If the Host header indicates a collection, /metrics.json
// refers to a file in the collection -- the metrics handler
@@ -490,6 +508,24 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
c.Assert(err, check.IsNil)
c.Check(resp.StatusCode, check.Equals, http.StatusNotFound)
}
+
+ // Dump entire metrics output in test logs
+ req, _ = http.NewRequest("GET", srvaddr+"/metrics", nil)
+ req.Host = cluster.Services.WebDAVDownload.ExternalURL.Host
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+ resp, err = http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, check.IsNil)
+
+ c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_download_backend_speed_bucket{size_range="0",le="1e\+06"} 4\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_download_speed_bucket{size_range="0",le="\+Inf"} 4\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_upload_speed_bucket{size_range="0",le="\+Inf"} 2\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*\narvados_keepweb_upload_sync_delay_seconds_bucket{size_range="0",le="10"} 2\n.*`)
+
+ // Dump entire metrics output in test logs
+ c.Logf("%s", buf)
}
func (s *IntegrationSuite) SetUpSuite(c *check.C) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list