[arvados] created: 2.1.0-2616-g66dad8c81

git repository hosting git at public.arvados.org
Thu Jun 23 21:53:11 UTC 2022


        at  66dad8c813d165be187cc1d5671c50ec7f1f8147 (commit)


commit 66dad8c813d165be187cc1d5671c50ec7f1f8147
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 23 17:51:02 2022 -0400

    19205: Report longest-running active and abandoned reqs in metrics.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 679cbede1..978e73045 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	instrumented := httpserver.Instrument(reg, log,
 		httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
 			httpserver.AddRequestIDs(
-				httpserver.LogRequests(
-					interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-						httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+				httpserver.Monitor(reg, cluster.ManagementToken,
+					httpserver.LogRequests(
+						interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+							httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
 			Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/monitor.go b/sdk/go/httpserver/monitor.go
new file mode 100644
index 000000000..84eab68fa
--- /dev/null
+++ b/sdk/go/httpserver/monitor.go
@@ -0,0 +1,165 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+	"encoding/json"
+	"net/http"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Monitor serves a JSON report of current requests at "GET
+// /_monitor/", and passes other requests through to the next
+// handler.
+//
+// The report is served only when the request carries the header
+// "Authorization: Bearer " followed by authToken. If authToken is
+// empty or the header does not match, the response is 401
+// Unauthorized. Requests for the report itself are not tracked.
+//
+// If registry is not nil, Monitor registers two gauges in the
+// "arvados" namespace: max_active_request_age_seconds, the age of
+// the oldest tracked request whose context is not done yet, and
+// max_abandoned_request_age_seconds, the longest time since a
+// request's context was done while next was still handling it. Each
+// gauge reports 0 when no request qualifies.
+func Monitor(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+	// ent tracks one request being handled by next. hangupTime
+	// holds a time.Time once the request's context is done, and
+	// is empty until then.
+	type ent struct {
+		startTime  time.Time
+		hangupTime atomic.Value
+	}
+	var mtx sync.Mutex // guards current
+	current := map[*http.Request]*ent{}
+
+	// maxAge returns the seconds elapsed since the earliest time
+	// that pick selects among the current requests, or 0 if pick
+	// selects none.
+	maxAge := func(pick func(*ent) (time.Time, bool)) float64 {
+		mtx.Lock()
+		defer mtx.Unlock()
+		var earliest time.Time
+		found := false
+		for _, e := range current {
+			t, ok := pick(e)
+			if !ok {
+				continue
+			}
+			if !found || t.Before(earliest) {
+				found = true
+				earliest = t
+			}
+		}
+		if !found {
+			return 0
+		}
+		return time.Since(earliest).Seconds()
+	}
+
+	if registry != nil {
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_active_request_age_seconds",
+				Help:      "Age of oldest active request",
+			},
+			func() float64 {
+				return maxAge(func(e *ent) (time.Time, bool) {
+					// Don't count abandoned requests here
+					_, abandoned := e.hangupTime.Load().(time.Time)
+					return e.startTime, !abandoned
+				})
+			},
+		))
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_abandoned_request_age_seconds",
+				Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+			},
+			func() float64 {
+				return maxAge(func(e *ent) (time.Time, bool) {
+					hangupTime, abandoned := e.hangupTime.Load().(time.Time)
+					return hangupTime, abandoned
+				})
+			},
+		))
+	}
+
+	// serveReport writes the list of current requests as JSON,
+	// shortest-running first.
+	serveReport := func(w http.ResponseWriter, req *http.Request) {
+		if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+			Error(w, "unauthorized", http.StatusUnauthorized)
+			return
+		}
+		type outrec struct {
+			RequestID  string
+			Method     string
+			Host       string
+			URL        string
+			RemoteAddr string
+			Elapsed    float64
+		}
+		// Snapshot the table under the lock, then release it
+		// before sorting and writing, so that a slow reader of
+		// the report cannot stall request tracking or metrics.
+		mtx.Lock()
+		now := time.Now()
+		outrecs := make([]outrec, 0, len(current))
+		for r, e := range current {
+			outrecs = append(outrecs, outrec{
+				RequestID:  r.Header.Get(HeaderRequestID),
+				Method:     r.Method,
+				Host:       r.Host,
+				URL:        r.URL.String(),
+				RemoteAddr: r.RemoteAddr,
+				Elapsed:    now.Sub(e.startTime).Seconds(),
+			})
+		}
+		mtx.Unlock()
+		sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]interface{}{
+			"Requests": outrecs,
+		})
+	}
+
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		if req.Method == "GET" && req.URL.Path == "/_monitor/" {
+			serveReport(w, req)
+			return
+		}
+		e := &ent{startTime: time.Now()}
+		mtx.Lock()
+		current[req] = e
+		mtx.Unlock()
+		// done is closed when next returns, so the watcher
+		// goroutine below exits even if the request context is
+		// never cancelled.
+		done := make(chan struct{})
+		go func() {
+			select {
+			case <-req.Context().Done():
+				e.hangupTime.Store(time.Now())
+			case <-done:
+			}
+		}()
+		defer func() {
+			close(done)
+			mtx.Lock()
+			defer mtx.Unlock()
+			delete(current, req)
+		}()
+		next.ServeHTTP(w, req)
+	})
+}
diff --git a/sdk/go/httpserver/monitor_test.go b/sdk/go/httpserver/monitor_test.go
new file mode 100644
index 000000000..b95c0ad57
--- /dev/null
+++ b/sdk/go/httpserver/monitor_test.go
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+
+	"github.com/prometheus/client_golang/prometheus"
+	check "gopkg.in/check.v1"
+)
+
+// TestMonitor checks that /_monitor/ rejects missing and wrong
+// tokens, lists a request while its handler is blocked, and lists
+// nothing once that handler has returned.
+func (s *Suite) TestMonitor(c *check.C) {
+	reg := prometheus.NewRegistry()
+	h := newTestHandler()
+	mh := Monitor(reg, "abcd", h)
+	handlerReturned := make(chan struct{})
+	go func() {
+		mh.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/test", nil))
+		close(handlerReturned)
+	}()
+	<-h.inHandler
+
+	// No Authorization header.
+	resp := httptest.NewRecorder()
+	req := httptest.NewRequest("GET", "/_monitor/", nil)
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+	c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+	// Wrong token.
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcde")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+	// Correct token, while the /test request is still in progress.
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcd")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	m := map[string]interface{}{}
+	err := json.NewDecoder(resp.Body).Decode(&m)
+	c.Check(err, check.IsNil)
+	c.Assert(m["Requests"], check.NotNil)
+	c.Assert(m["Requests"].([]interface{}), check.HasLen, 1)
+	c.Check(m["Requests"].([]interface{})[0].(map[string]interface{})["URL"], check.Equals, "/test")
+	h.okToProceed <- struct{}{}
+	<-handlerReturned
+
+	// Use a fresh recorder: reusing the previous one would leave its
+	// already-recorded 200 status in place, making the Code check
+	// below pass regardless of this response.
+	resp = httptest.NewRecorder()
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	m = map[string]interface{}{}
+	err = json.NewDecoder(resp.Body).Decode(&m)
+	c.Check(err, check.IsNil)
+	c.Assert(m["Requests"], check.NotNil)
+	c.Check(m["Requests"].([]interface{}), check.HasLen, 0)
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 64d1f3d4c..9258fbfa5 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	<-h.okToProceed
 }
 
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
 	return &testHandler{
 		inHandler:   make(chan struct{}),
 		okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
 }
 
 func TestRequestLimiter1(t *testing.T) {
-	h := newTestHandler(10)
+	h := newTestHandler()
 	l := NewRequestLimiter(1, h, nil)
 	var wg sync.WaitGroup
 	resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
 }
 
 func TestRequestLimiter10(t *testing.T) {
-	h := newTestHandler(10)
+	h := newTestHandler()
 	l := NewRequestLimiter(10, h, nil)
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list