[arvados] created: 2.1.0-2616-g66ac14621
git repository hosting
git at public.arvados.org
Fri Jun 24 03:57:56 UTC 2022
at 66ac14621e8a624c704618f604e7998656d63042 (commit)
commit 66ac14621e8a624c704618f604e7998656d63042
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 23 23:57:34 2022 -0400
19205: Report longest-running active and abandoned reqs in metrics.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 679cbede1..978e73045 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
instrumented := httpserver.Instrument(reg, log,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
- httpserver.LogRequests(
- interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
- httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+ httpserver.Monitor(reg, cluster.ManagementToken,
+ httpserver.LogRequests(
+ interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+ httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644
index 000000000..cb08acf96
--- /dev/null
+++ b/sdk/go/httpserver/inspect.go
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+ "encoding/json"
+ "net/http"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// The report is a JSON array with one entry per request currently
+// being handled by next, sorted by ascending elapsed time. It is
+// served only if the request has an "Authorization: Bearer
+// {authToken}" header. Otherwise -- including whenever authToken is
+// empty -- the response status is http.StatusUnauthorized.
+//
+// If registry is not nil, Inspect registers two gauges:
+// arvados_max_active_request_age_seconds (age of the oldest request
+// whose context is not done yet) and
+// arvados_max_abandoned_request_age_seconds (longest time since a
+// request's context was done while its handler is still running).
+// Each gauge reports 0 when no tracked request qualifies.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+	type ent struct {
+		startTime time.Time
+		// hangupTime holds a time.Time once the request
+		// context is done while the handler is still
+		// running. Until then it is empty.
+		hangupTime atomic.Value
+	}
+	current := map[*http.Request]*ent{}
+	mtx := sync.Mutex{}
+	// maxAge returns the number of seconds since the earliest
+	// reference time among the tracked requests: the hangup time
+	// of abandoned requests if abandoned is true, otherwise the
+	// start time of requests that have not been abandoned. It
+	// returns 0 if there are no such requests.
+	maxAge := func(abandoned bool) float64 {
+		mtx.Lock()
+		defer mtx.Unlock()
+		var earliest time.Time
+		found := false
+		for _, e := range current {
+			t, hungUp := e.hangupTime.Load().(time.Time)
+			if hungUp != abandoned {
+				continue
+			}
+			if !abandoned {
+				t = e.startTime
+			}
+			if !found || t.Before(earliest) {
+				found, earliest = true, t
+			}
+		}
+		if !found {
+			return 0
+		}
+		return time.Since(earliest).Seconds()
+	}
+	if registry != nil {
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_active_request_age_seconds",
+				Help:      "Age of oldest active request",
+			},
+			func() float64 { return maxAge(false) },
+		))
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_abandoned_request_age_seconds",
+				Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+			},
+			func() float64 { return maxAge(true) },
+		))
+	}
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+			if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+				Error(w, "unauthorized", http.StatusUnauthorized)
+				return
+			}
+			type outrec struct {
+				RequestID  string
+				Method     string
+				Host       string
+				URL        string
+				RemoteAddr string
+				Elapsed    float64
+			}
+			// Take the snapshot under the lock, but release
+			// the lock before writing the response: a slow
+			// reader of this report must not block other
+			// requests from starting or finishing.
+			mtx.Lock()
+			now := time.Now()
+			outrecs := make([]outrec, 0, len(current))
+			for r, e := range current {
+				outrecs = append(outrecs, outrec{
+					RequestID:  r.Header.Get(HeaderRequestID),
+					Method:     r.Method,
+					Host:       r.Host,
+					URL:        r.URL.String(),
+					RemoteAddr: r.RemoteAddr,
+					Elapsed:    now.Sub(e.startTime).Seconds(),
+				})
+			}
+			mtx.Unlock()
+			sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+			w.Header().Set("Content-Type", "application/json")
+			// The status and headers are committed once the
+			// body starts, so a failure here (most likely
+			// the client going away) cannot be reported to
+			// the client.
+			_ = json.NewEncoder(w).Encode(outrecs)
+			return
+		}
+		e := &ent{startTime: time.Now()}
+		mtx.Lock()
+		current[req] = e
+		mtx.Unlock()
+		// done is closed when the handler returns, so the
+		// watcher goroutine below never outlives the request
+		// and never records a hangup for a finished request.
+		done := make(chan struct{})
+		go func() {
+			select {
+			case <-req.Context().Done():
+				e.hangupTime.Store(time.Now())
+			case <-done:
+			}
+		}()
+		defer func() {
+			close(done)
+			mtx.Lock()
+			defer mtx.Unlock()
+			delete(current, req)
+		}()
+		next.ServeHTTP(w, req)
+	})
+}
diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go
new file mode 100644
index 000000000..cab8a434d
--- /dev/null
+++ b/sdk/go/httpserver/inspect_test.go
@@ -0,0 +1,89 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	check "gopkg.in/check.v1"
+)
+
+// TestInspect checks the /_inspect/requests report and the
+// request-age metrics at three points in the life of one request:
+// while it is active, after its context is canceled, and after its
+// handler has returned.
+func (s *Suite) TestInspect(c *check.C) {
+	reg := prometheus.NewRegistry()
+	h := newTestHandler()
+	mh := Inspect(reg, "abcd", h)
+	handlerReturned := make(chan struct{})
+	reqctx, reqcancel := context.WithCancel(context.Background())
+	longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
+	go func() {
+		mh.ServeHTTP(httptest.NewRecorder(), longreq)
+		close(handlerReturned)
+	}()
+	// Wait until the long-running request is inside the handler.
+	<-h.inHandler
+
+	// No Authorization header: report is refused.
+	resp := httptest.NewRecorder()
+	req := httptest.NewRequest("GET", "/_inspect/requests", nil)
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+	c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+	// Wrong token: report is refused.
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcde")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+	// Correct token: report lists the one request in progress.
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcd")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	reqs := []map[string]interface{}{}
+	err := json.NewDecoder(resp.Body).Decode(&reqs)
+	c.Check(err, check.IsNil)
+	c.Check(reqs, check.HasLen, 1)
+	c.Check(reqs[0]["URL"], check.Equals, "/test")
+
+	// Request is active, so we should see active request age > 0
+	resp = httptest.NewRecorder()
+	mreq := httptest.NewRequest("GET", "/metrics", nil)
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+	reqcancel()
+
+	// Request context is canceled but handler hasn't returned, so
+	// we should see max abandoned request age > 0 and max active
+	// request age == 0. The cancellation is recorded by a
+	// separate goroutine, so first poll until the request is no
+	// longer reported as active, then check a fresh scrape. If
+	// the deadline passes, the checks below report the failure.
+	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+		resp = httptest.NewRecorder()
+		promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+		c.Assert(resp.Code, check.Equals, http.StatusOK)
+		if strings.Contains(resp.Body.String(), "\narvados_max_active_request_age_seconds 0\n") {
+			break
+		}
+	}
+	resp = httptest.NewRecorder()
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+
+	h.okToProceed <- struct{}{}
+	<-handlerReturned
+
+	// Handler has returned, so we should see max abandoned
+	// request age == max active request age == 0
+	resp = httptest.NewRecorder()
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+	// ...and no active requests at the /_inspect/requests endpoint
+	resp = httptest.NewRecorder()
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	reqs = nil
+	err = json.NewDecoder(resp.Body).Decode(&reqs)
+	c.Check(err, check.IsNil)
+	c.Assert(reqs, check.HasLen, 0)
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 64d1f3d4c..9258fbfa5 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
<-h.okToProceed
}
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
return &testHandler{
inHandler: make(chan struct{}),
okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
}
func TestRequestLimiter1(t *testing.T) {
- h := newTestHandler(10)
+ h := newTestHandler()
l := NewRequestLimiter(1, h, nil)
var wg sync.WaitGroup
resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
}
func TestRequestLimiter10(t *testing.T) {
- h := newTestHandler(10)
+ h := newTestHandler()
l := NewRequestLimiter(10, h, nil)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list