[arvados] created: 2.1.0-2618-g13ea73854

git repository hosting git at public.arvados.org
Fri Jun 24 16:04:17 UTC 2022


        at  13ea738547ce7232c152873970770c21e97d2830 (commit)


commit 13ea738547ce7232c152873970770c21e97d2830
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jun 24 11:25:59 2022 -0400

    19205: Remove redundant RequestTimeout implementation.
    
    lib/controller doesn't need its own -- it uses lib/service, which uses
    httpserver.HandlerWithDeadline.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index f5840b34c..665fd5c63 100644
--- a/lib/controller/handler.go
+++ b/lib/controller/handler.go
@@ -13,7 +13,6 @@ import (
 	"net/url"
 	"strings"
 	"sync"
-	"time"
 
 	"git.arvados.org/arvados.git/lib/controller/api"
 	"git.arvados.org/arvados.git/lib/controller/federation"
@@ -61,12 +60,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 			req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
 		}
 	}
-	if h.Cluster.API.RequestTimeout > 0 {
-		ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.API.RequestTimeout)))
-		req = req.WithContext(ctx)
-		defer cancel()
-	}
-
 	h.handlerStack.ServeHTTP(w, req)
 }
 
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 5e467cb05..39c2b1c68 100644
--- a/lib/controller/handler_test.go
+++ b/lib/controller/handler_test.go
@@ -204,17 +204,21 @@ func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
 	c.Check(len(dd.Schemas), check.Not(check.Equals), 0)
 }
 
-func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
-	s.cluster.API.RequestTimeout = arvados.Duration(time.Nanosecond)
-	req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
+// Handler should give up and exit early if request context is
+// cancelled due to client hangup, httpserver.HandlerWithDeadline,
+// etc.
+func (s *HandlerSuite) TestRequestCancel(c *check.C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil).WithContext(ctx)
 	resp := httptest.NewRecorder()
+	cancel()
 	s.handler.ServeHTTP(resp, req)
 	c.Check(resp.Code, check.Equals, http.StatusBadGateway)
 	var jresp httpserver.ErrorResponse
 	err := json.Unmarshal(resp.Body.Bytes(), &jresp)
 	c.Check(err, check.IsNil)
 	c.Assert(len(jresp.Errors), check.Equals, 1)
-	c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
+	c.Check(jresp.Errors[0], check.Matches, `.*context canceled`)
 }
 
 func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
index 5a46635e9..b71adf711 100644
--- a/sdk/go/httpserver/logger.go
+++ b/sdk/go/httpserver/logger.go
@@ -47,7 +47,13 @@ func (hn hijackNotifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 // HandlerWithDeadline cancels the request context if the request
 // takes longer than the specified timeout without having its
 // connection hijacked.
+//
+// If timeout is 0, there is no deadline: HandlerWithDeadline is a
+// no-op.
 func HandlerWithDeadline(timeout time.Duration, next http.Handler) http.Handler {
+	if timeout == 0 {
+		return next
+	}
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx, cancel := context.WithCancel(r.Context())
 		defer cancel()

commit 4bec5aa50dc40924741221259bfcbb53056cb35c
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 23 23:57:34 2022 -0400

    19205: Report longest-running active and abandoned reqs in metrics.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 679cbede1..e08af9f61 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 	instrumented := httpserver.Instrument(reg, log,
 		httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
 			httpserver.AddRequestIDs(
-				httpserver.LogRequests(
-					interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-						httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+				httpserver.Inspect(reg, cluster.ManagementToken,
+					httpserver.LogRequests(
+						interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+							httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
 			Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644
index 000000000..cb08acf96
--- /dev/null
+++ b/sdk/go/httpserver/inspect.go
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+	"encoding/json"
+	"net/http"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// If registry is not nil, Inspect registers metrics about current
+// requests.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+	type ent struct {
+		startTime  time.Time
+		hangupTime atomic.Value
+	}
+	current := map[*http.Request]*ent{}
+	mtx := sync.Mutex{}
+	if registry != nil {
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_active_request_age_seconds",
+				Help:      "Age of oldest active request",
+			},
+			func() float64 {
+				mtx.Lock()
+				defer mtx.Unlock()
+				earliest := time.Time{}
+				any := false
+				for _, e := range current {
+					if _, ok := e.hangupTime.Load().(time.Time); ok {
+						// Don't count abandoned requests here
+						continue
+					}
+					if !any || e.startTime.Before(earliest) {
+						any = true
+						earliest = e.startTime
+					}
+				}
+				if !any {
+					return 0
+				}
+				return float64(time.Since(earliest).Seconds())
+			},
+		))
+		registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_abandoned_request_age_seconds",
+				Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+			},
+			func() float64 {
+				mtx.Lock()
+				defer mtx.Unlock()
+				earliest := time.Time{}
+				any := false
+				for _, e := range current {
+					if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
+						if !any || hangupTime.Before(earliest) {
+							any = true
+							earliest = hangupTime
+						}
+					}
+				}
+				if !any {
+					return 0
+				}
+				return float64(time.Since(earliest).Seconds())
+			},
+		))
+	}
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+			if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+				Error(w, "unauthorized", http.StatusUnauthorized)
+				return
+			}
+			mtx.Lock()
+			defer mtx.Unlock()
+			type outrec struct {
+				RequestID  string
+				Method     string
+				Host       string
+				URL        string
+				RemoteAddr string
+				Elapsed    float64
+			}
+			now := time.Now()
+			outrecs := []outrec{}
+			for req, e := range current {
+				outrecs = append(outrecs, outrec{
+					RequestID:  req.Header.Get(HeaderRequestID),
+					Method:     req.Method,
+					Host:       req.Host,
+					URL:        req.URL.String(),
+					RemoteAddr: req.RemoteAddr,
+					Elapsed:    now.Sub(e.startTime).Seconds(),
+				})
+			}
+			sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+			w.Header().Set("Content-Type", "application/json")
+			json.NewEncoder(w).Encode(outrecs)
+		} else {
+			e := ent{startTime: time.Now()}
+			mtx.Lock()
+			current[req] = &e
+			mtx.Unlock()
+			go func() {
+				<-req.Context().Done()
+				e.hangupTime.Store(time.Now())
+			}()
+			defer func() {
+				mtx.Lock()
+				defer mtx.Unlock()
+				delete(current, req)
+			}()
+			next.ServeHTTP(w, req)
+		}
+	})
+}
diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go
new file mode 100644
index 000000000..cab8a434d
--- /dev/null
+++ b/sdk/go/httpserver/inspect_test.go
@@ -0,0 +1,89 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	check "gopkg.in/check.v1"
+)
+
+func (s *Suite) TestInspect(c *check.C) {
+	reg := prometheus.NewRegistry()
+	h := newTestHandler()
+	mh := Inspect(reg, "abcd", h)
+	handlerReturned := make(chan struct{})
+	reqctx, reqcancel := context.WithCancel(context.Background())
+	longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
+	go func() {
+		mh.ServeHTTP(httptest.NewRecorder(), longreq)
+		close(handlerReturned)
+	}()
+	<-h.inHandler
+
+	resp := httptest.NewRecorder()
+	req := httptest.NewRequest("GET", "/_inspect/requests", nil)
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+	c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcde")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+	resp = httptest.NewRecorder()
+	req.Header.Set("Authorization", "Bearer abcd")
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	reqs := []map[string]interface{}{}
+	err := json.NewDecoder(resp.Body).Decode(&reqs)
+	c.Check(err, check.IsNil)
+	c.Check(reqs, check.HasLen, 1)
+	c.Check(reqs[0]["URL"], check.Equals, "/test")
+
+	// Request is active, so we should see active request age > 0
+	resp = httptest.NewRecorder()
+	mreq := httptest.NewRequest("GET", "/metrics", nil)
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+	reqcancel()
+
+	// Request context is canceled but handler hasn't returned, so
+	// we should see max abandoned request age > 0
+	resp = httptest.NewRecorder()
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+
+	h.okToProceed <- struct{}{}
+	<-handlerReturned
+
+	// Handler has returned, so we should see max abandoned
+	// request age == max active request age == 0
+	resp = httptest.NewRecorder()
+	promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+	// ...and no active requests at the /_monitor endpoint
+	resp = httptest.NewRecorder()
+	mh.ServeHTTP(resp, req)
+	c.Check(resp.Code, check.Equals, http.StatusOK)
+	reqs = nil
+	err = json.NewDecoder(resp.Body).Decode(&reqs)
+	c.Check(err, check.IsNil)
+	c.Assert(reqs, check.HasLen, 0)
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 64d1f3d4c..9258fbfa5 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	<-h.okToProceed
 }
 
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
 	return &testHandler{
 		inHandler:   make(chan struct{}),
 		okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
 }
 
 func TestRequestLimiter1(t *testing.T) {
-	h := newTestHandler(10)
+	h := newTestHandler()
 	l := NewRequestLimiter(1, h, nil)
 	var wg sync.WaitGroup
 	resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
 }
 
 func TestRequestLimiter10(t *testing.T) {
-	h := newTestHandler(10)
+	h := newTestHandler()
 	l := NewRequestLimiter(10, h, nil)
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list