[arvados] created: 2.6.0-285-g1fd838ddc

git repository hosting git at public.arvados.org
Mon Jun 19 20:41:49 UTC 2023


        at  1fd838ddcc3274448908227c285d1b7f233207d9 (commit)


commit 1fd838ddcc3274448908227c285d1b7f233207d9
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jun 19 16:41:18 2023 -0400

    20602: Allow lock requests to queue up to 2s (configurable).
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 33c1e497d..0fb4a2bab 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -236,6 +236,10 @@ Clusters:
       # additional requests while at the MaxConcurrentRequests limit.
       MaxQueuedRequests: 64
 
+      # Maximum time a "lock container" request is allowed to wait in
+      # the incoming request queue before returning 503.
+      MaxQueueTimeForLockRequests: 2s
+
       # Fraction of MaxConcurrentRequests that can be "log create"
       # messages at any given time.  This is to prevent logging
       # updates from crowding out more important requests.
diff --git a/lib/config/export.go b/lib/config/export.go
index 565be2fb7..f46f5b6f8 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -73,6 +73,7 @@ var whitelist = map[string]bool{
 	"API.MaxItemsPerResponse":                  true,
 	"API.MaxKeepBlobBuffers":                   false,
 	"API.MaxQueuedRequests":                    false,
+	"API.MaxQueueTimeForLockRequests":          false,
 	"API.MaxRequestAmplification":              false,
 	"API.MaxRequestSize":                       true,
 	"API.MaxTokenLifetime":                     false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index f4d4ec8ef..854b94861 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -155,11 +155,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 					httpserver.LogRequests(
 						interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
 							&httpserver.RequestLimiter{
-								Handler:       handler,
-								MaxConcurrent: cluster.API.MaxConcurrentRequests,
-								MaxQueue:      cluster.API.MaxQueuedRequests,
-								Priority:      c.requestPriority,
-								Registry:      reg}))))))
+								Handler:                    handler,
+								MaxConcurrent:              cluster.API.MaxConcurrentRequests,
+								MaxQueue:                   cluster.API.MaxQueuedRequests,
+								MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+								Priority:                   c.requestPriority,
+								Registry:                   reg}))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
 			Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -261,7 +262,7 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
 		// Return 503 immediately instead of queueing. We want
 		// to send feedback to dispatchcloud ASAP to stop
 		// bringing up new containers.
-		return httpserver.IneligibleForQueuePriority
+		return httpserver.MinPriority
 	case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
 		// "Create log entry" is the most harmless kind of
 		// request to drop. Negative priority is called "low"
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 2e9abf2ec..0fafa41f9 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -101,6 +101,7 @@ type Cluster struct {
 		MaxItemsPerResponse              int
 		MaxConcurrentRequests            int
 		MaxQueuedRequests                int
+		MaxQueueTimeForLockRequests      Duration
 		LogCreateRequestFraction         float64
 		MaxKeepBlobBuffers               int
 		MaxRequestAmplification          int
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 33a2eb614..9d501ab0e 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -15,7 +15,7 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
 
 // Prometheus typically polls every 10 seconds, but it doesn't cost us
 // much to also accommodate higher frequency collection by updating
@@ -48,6 +48,11 @@ type RequestLimiter struct {
 	// handled FIFO.
 	Priority func(req *http.Request, queued time.Time) int64
 
+	// Return 503 for any request for which Priority() returns
+	// MinPriority if it spends longer than this in the queue
+	// before starting processing.
+	MaxQueueTimeForMinPriority time.Duration
+
 	// "concurrent_requests", "max_concurrent_requests",
 	// "queued_requests", and "max_queued_requests" metrics are
 	// registered with Registry, if it is not nil.
@@ -233,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
 		ent.ready <- true
 		return ent
 	}
-	if priority == IneligibleForQueuePriority {
-		// Priority func is telling us to return 503
-		// immediately instead of queueing, regardless of
-		// queue size, if we can't handle the request
-		// immediately.
-		ent.ready <- false
-		return ent
-	}
 	rl.queue.add(ent)
 	rl.trimqueue()
 	return ent
@@ -259,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	rl.setupOnce.Do(rl.setup)
 	ent := rl.enqueue(req)
 	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+	if ent.priority == MinPriority {
+		// Note that MaxQueueTimeForMinPriority==0 does not cancel a req
+		// that skips the queue, because in that case
+		// rl.enqueue() has already fired ready<-true and
+		// rl.remove() is a no-op.
+		go func() {
+			time.Sleep(rl.MaxQueueTimeForMinPriority)
+			rl.remove(ent)
+		}()
+	}
 	var ok bool
 	select {
 	case <-req.Context().Done():
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index b04ff57cc..55f13b462 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -134,19 +134,40 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
 		<-h.inHandler
 	}
 
-	c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
+	c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
 	var wgX sync.WaitGroup
 	for i := 0; i < rl.MaxQueue; i++ {
 		wgX.Add(1)
 		go func() {
 			defer wgX.Done()
 			resp := httptest.NewRecorder()
-			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
+			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
 			c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
 		}()
 	}
 	wgX.Wait()
 
+	c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+	// Usage docs say the caller isn't allowed to change fields
+	// after first use, but we secretly know it's OK to change
+	// this field on the fly as long as no requests are arriving
+	// concurrently.
+	rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
+	for i := 0; i < rl.MaxQueue; i++ {
+		wgX.Add(1)
+		go func() {
+			defer wgX.Done()
+			resp := httptest.NewRecorder()
+			t0 := time.Now()
+			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
+			c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+			elapsed := time.Since(t0)
+			c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
+			c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+		}()
+	}
+	wgX.Wait()
+
 	c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
 	var wg1, wg2 sync.WaitGroup
 	wg1.Add(rl.MaxQueue)

commit 2f59cb8dd12e80a6a82f834a6599fd9d8bb0f586
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jun 19 14:37:50 2023 -0400

    20602: Report queue usage bucketed by priority.
    
    Also report queue time stats for requests that are abandoned before
    getting a processing slot.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 724a26fa2..33a2eb614 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -17,6 +17,12 @@ import (
 
 const IneligibleForQueuePriority = math.MinInt64
 
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
+
 // RequestLimiter wraps http.Handler, limiting the number of
 // concurrent requests being handled by the wrapped Handler. Requests
 // that arrive when the handler is already at the specified
@@ -47,11 +53,13 @@ type RequestLimiter struct {
 	// registered with Registry, if it is not nil.
 	Registry *prometheus.Registry
 
-	setupOnce   sync.Once
-	mQueueDelay *prometheus.SummaryVec
-	mtx         sync.Mutex
-	handling    int
-	queue       queue
+	setupOnce     sync.Once
+	mQueueDelay   *prometheus.SummaryVec
+	mQueueTimeout *prometheus.SummaryVec
+	mQueueUsage   *prometheus.GaugeVec
+	mtx           sync.Mutex
+	handling      int
+	queue         queue
 }
 
 type qent struct {
@@ -128,18 +136,12 @@ func (rl *RequestLimiter) setup() {
 			},
 			func() float64 { return float64(rl.MaxConcurrent) },
 		))
-		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
-			prometheus.GaugeOpts{
-				Namespace: "arvados",
-				Name:      "queued_requests",
-				Help:      "Number of requests in queue",
-			},
-			func() float64 {
-				rl.mtx.Lock()
-				defer rl.mtx.Unlock()
-				return float64(len(rl.queue))
-			},
-		))
+		rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "arvados",
+			Name:      "queued_requests",
+			Help:      "Number of requests in queue",
+		}, []string{"priority"})
+		rl.Registry.MustRegister(rl.mQueueUsage)
 		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
 			prometheus.GaugeOpts{
 				Namespace: "arvados",
@@ -151,10 +153,37 @@ func (rl *RequestLimiter) setup() {
 		rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 			Namespace:  "arvados",
 			Name:       "queue_delay_seconds",
-			Help:       "Time spent in the incoming request queue",
+			Help:       "Time spent in the incoming request queue before start of processing",
 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
 		}, []string{"priority"})
 		rl.Registry.MustRegister(rl.mQueueDelay)
+		rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+			Namespace:  "arvados",
+			Name:       "queue_timeout_seconds",
+			Help:       "Time spent in the incoming request queue before client timed out or disconnected",
+			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+		}, []string{"priority"})
+		rl.Registry.MustRegister(rl.mQueueTimeout)
+		go func() {
+			for range time.NewTicker(metricsUpdateInterval).C {
+				var low, normal, high int
+				rl.mtx.Lock()
+				for _, ent := range rl.queue {
+					switch {
+					case ent.priority < 0:
+						low++
+					case ent.priority > 0:
+						high++
+					default:
+						normal++
+					}
+				}
+				rl.mtx.Unlock()
+				rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
+				rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
+				rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
+			}
+		}()
 	}
 }
 
@@ -231,10 +260,8 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	ent := rl.enqueue(req)
 	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
 	var ok bool
-	var abandoned bool
 	select {
 	case <-req.Context().Done():
-		abandoned = true
 		rl.remove(ent)
 		// we still need to wait for ent.ready, because
 		// sometimes runqueue() will have already decided to
@@ -244,21 +271,27 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	case ok = <-ent.ready:
 	}
 
-	// report time spent in queue
-	var qlabel string
-	switch {
-	case abandoned:
-	case !ok && ent.priority == IneligibleForQueuePriority:
-		// don't pollute stats
-	case ent.priority < 0:
-		qlabel = "low"
-	case ent.priority > 0:
-		qlabel = "high"
-	default:
-		qlabel = "normal"
+	// Report time spent in queue in the appropriate bucket:
+	// mQueueDelay if the request actually got processed,
+	// mQueueTimeout if it was abandoned or cancelled before
+	// getting a processing slot.
+	var series *prometheus.SummaryVec
+	if ok {
+		series = rl.mQueueDelay
+	} else {
+		series = rl.mQueueTimeout
 	}
-	if qlabel != "" && rl.mQueueDelay != nil {
-		rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+	if series != nil {
+		var qlabel string
+		switch {
+		case ent.priority < 0:
+			qlabel = "low"
+		case ent.priority > 0:
+			qlabel = "high"
+		default:
+			qlabel = "normal"
+		}
+		series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
 	}
 
 	if !ok {

commit 557264f16adb0a02f733c296baf507e3e7d98478
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jun 19 11:53:38 2023 -0400

    20602: Report queue time for pos/neg/zero-priority reqs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index c66c27943..f4d4ec8ef 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -264,13 +264,17 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
 		return httpserver.IneligibleForQueuePriority
 	case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
 		// "Create log entry" is the most harmless kind of
-		// request to drop.
-		return 0
+		// request to drop. Negative priority is called "low"
+		// in aggregate metrics.
+		return -1
 	case req.Header.Get("Origin") != "":
-		// Handle interactive requests first.
-		return 2
-	default:
+		// Handle interactive requests first. Positive
+		// priority is called "high" in aggregate metrics.
 		return 1
+	default:
+		// Zero priority is called "normal" in aggregate
+		// metrics.
+		return 0
 	}
 }
 
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index f9f94ff98..724a26fa2 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -47,10 +47,11 @@ type RequestLimiter struct {
 	// registered with Registry, if it is not nil.
 	Registry *prometheus.Registry
 
-	setupOnce sync.Once
-	mtx       sync.Mutex
-	handling  int
-	queue     queue
+	setupOnce   sync.Once
+	mQueueDelay *prometheus.SummaryVec
+	mtx         sync.Mutex
+	handling    int
+	queue       queue
 }
 
 type qent struct {
@@ -147,13 +148,13 @@ func (rl *RequestLimiter) setup() {
 			},
 			func() float64 { return float64(rl.MaxQueue) },
 		))
-		rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
+		rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 			Namespace:  "arvados",
 			Name:       "queue_delay_seconds",
-			Help:       "Number of seconds spent in the incoming request queue",
+			Help:       "Time spent in the incoming request queue",
 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
-		})
-		reg.MustRegister(rl.mQueueDelay)
+		}, []string{"priority"})
+		rl.Registry.MustRegister(rl.mQueueDelay)
 	}
 }
 
@@ -230,8 +231,10 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	ent := rl.enqueue(req)
 	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
 	var ok bool
+	var abandoned bool
 	select {
 	case <-req.Context().Done():
+		abandoned = true
 		rl.remove(ent)
 		// we still need to wait for ent.ready, because
 		// sometimes runqueue() will have already decided to
@@ -240,6 +243,24 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 		ok = <-ent.ready
 	case ok = <-ent.ready:
 	}
+
+	// report time spent in queue
+	var qlabel string
+	switch {
+	case abandoned:
+	case !ok && ent.priority == IneligibleForQueuePriority:
+		// don't pollute stats
+	case ent.priority < 0:
+		qlabel = "low"
+	case ent.priority > 0:
+		qlabel = "high"
+	default:
+		qlabel = "normal"
+	}
+	if qlabel != "" && rl.mQueueDelay != nil {
+		rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+	}
+
 	if !ok {
 		resp.WriteHeader(http.StatusServiceUnavailable)
 		return

commit 032958b6a12030fff3151784f6970842281ca076
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jun 16 13:58:16 2023 -0400

    20602: Add queue metrics.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index cd928e6ea..f9f94ff98 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -147,6 +147,13 @@ func (rl *RequestLimiter) setup() {
 			},
 			func() float64 { return float64(rl.MaxQueue) },
 		))
+		rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
+			Namespace:  "arvados",
+			Name:       "queue_delay_seconds",
+			Help:       "Number of seconds spent in the incoming request queue",
+			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+		})
+		reg.MustRegister(rl.mQueueDelay)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list