[arvados] created: 2.6.0-285-g1fd838ddc
git repository hosting <git at public.arvados.org>, Mon Jun 19 20:41:49 UTC 2023
at 1fd838ddcc3274448908227c285d1b7f233207d9 (commit)
commit 1fd838ddcc3274448908227c285d1b7f233207d9
Author: Tom Clegg <tom at curii.com>
Date: Mon Jun 19 16:41:18 2023 -0400
20602: Allow lock requests to queue up to 2s (configurable).
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 33c1e497d..0fb4a2bab 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -236,6 +236,10 @@ Clusters:
# additional requests while at the MaxConcurrentRequests limit.
MaxQueuedRequests: 64
+ # Maximum time a "lock container" request is allowed to wait in
+ # the incoming request queue before returning 503.
+ MaxQueueTimeForLockRequests: 2s
+
# Fraction of MaxConcurrentRequests that can be "log create"
# messages at any given time. This is to prevent logging
# updates from crowding out more important requests.
diff --git a/lib/config/export.go b/lib/config/export.go
index 565be2fb7..f46f5b6f8 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -73,6 +73,7 @@ var whitelist = map[string]bool{
"API.MaxItemsPerResponse": true,
"API.MaxKeepBlobBuffers": false,
"API.MaxQueuedRequests": false,
+ "API.MaxQueueTimeForLockRequests": false,
"API.MaxRequestAmplification": false,
"API.MaxRequestSize": true,
"API.MaxTokenLifetime": false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index f4d4ec8ef..854b94861 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -155,11 +155,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
&httpserver.RequestLimiter{
- Handler: handler,
- MaxConcurrent: cluster.API.MaxConcurrentRequests,
- MaxQueue: cluster.API.MaxQueuedRequests,
- Priority: c.requestPriority,
- Registry: reg}))))))
+ Handler: handler,
+ MaxConcurrent: cluster.API.MaxConcurrentRequests,
+ MaxQueue: cluster.API.MaxQueuedRequests,
+ MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+ Priority: c.requestPriority,
+ Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -261,7 +262,7 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
// Return 503 immediately instead of queueing. We want
// to send feedback to dispatchcloud ASAP to stop
// bringing up new containers.
- return httpserver.IneligibleForQueuePriority
+ return httpserver.MinPriority
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
// "Create log entry" is the most harmless kind of
// request to drop. Negative priority is called "low"
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 2e9abf2ec..0fafa41f9 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -101,6 +101,7 @@ type Cluster struct {
MaxItemsPerResponse int
MaxConcurrentRequests int
MaxQueuedRequests int
+ MaxQueueTimeForLockRequests Duration
LogCreateRequestFraction float64
MaxKeepBlobBuffers int
MaxRequestAmplification int
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 33a2eb614..9d501ab0e 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -15,7 +15,7 @@ import (
"github.com/sirupsen/logrus"
)
-const IneligibleForQueuePriority = math.MinInt64
+const MinPriority = math.MinInt64
// Prometheus typically polls every 10 seconds, but it doesn't cost us
// much to also accommodate higher frequency collection by updating
@@ -48,6 +48,11 @@ type RequestLimiter struct {
// handled FIFO.
Priority func(req *http.Request, queued time.Time) int64
+ // Return 503 for any request for which Priority() returns
+ // MinPriority if it spends longer than this in the queue
+ // before starting processing.
+ MaxQueueTimeForMinPriority time.Duration
+
// "concurrent_requests", "max_concurrent_requests",
// "queued_requests", and "max_queued_requests" metrics are
// registered with Registry, if it is not nil.
@@ -233,14 +238,6 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
ent.ready <- true
return ent
}
- if priority == IneligibleForQueuePriority {
- // Priority func is telling us to return 503
- // immediately instead of queueing, regardless of
- // queue size, if we can't handle the request
- // immediately.
- ent.ready <- false
- return ent
- }
rl.queue.add(ent)
rl.trimqueue()
return ent
@@ -259,6 +256,16 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ if ent.priority == MinPriority {
+ // Note that MaxQueueTime==0 does not cancel a req
+ // that skips the queue, because in that case
+ // rl.enqueue() has already fired ready<-true and
+ // rl.remove() is a no-op.
+ go func() {
+ time.Sleep(rl.MaxQueueTimeForMinPriority)
+ rl.remove(ent)
+ }()
+ }
var ok bool
select {
case <-req.Context().Done():
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index b04ff57cc..55f13b462 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -134,19 +134,40 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
<-h.inHandler
}
- c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
var wgX sync.WaitGroup
for i := 0; i < rl.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
resp := httptest.NewRecorder()
- rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
}()
}
wgX.Wait()
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+ // Usage docs say the caller isn't allowed to change fields
+ // after first use, but we secretly know it's OK to change
+ // this field on the fly as long as no requests are arriving
+ // concurrently.
+ rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
+ for i := 0; i < rl.MaxQueue; i++ {
+ wgX.Add(1)
+ go func() {
+ defer wgX.Done()
+ resp := httptest.NewRecorder()
+ t0 := time.Now()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
+ c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ elapsed := time.Since(t0)
+ c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
+ c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+ }()
+ }
+ wgX.Wait()
+
c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
var wg1, wg2 sync.WaitGroup
wg1.Add(rl.MaxQueue)
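For orientation, the sketch below (not part of the commit) shows how the pieces in this patch fit together: a RequestLimiter whose Priority callback returns MinPriority for container "lock" requests, with MaxQueueTimeForMinPriority bounding how long such a request may wait before a 503, mirroring the new API.MaxQueueTimeForLockRequests default of 2s. The import path, the port, and the "/lock" path match are assumptions for illustration; the real priority rule lives in lib/service/cmd.go.

package main

import (
	"log"
	"net/http"
	"strings"
	"time"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK) // stand-in for the real API handler
	})
	rl := &httpserver.RequestLimiter{
		Handler:       api,
		MaxConcurrent: 4,
		MaxQueue:      64,
		// Mirrors API.MaxQueueTimeForLockRequests: a MinPriority request
		// still waiting in the queue after 2s gets a 503.
		MaxQueueTimeForMinPriority: 2 * time.Second,
		Priority: func(req *http.Request, queued time.Time) int64 {
			// Hypothetical path check for illustration only.
			if req.Method == http.MethodPost && strings.HasSuffix(req.URL.Path, "/lock") {
				return httpserver.MinPriority
			}
			return 0
		},
	}
	log.Fatal(http.ListenAndServe("localhost:8080", rl))
}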
commit 2f59cb8dd12e80a6a82f834a6599fd9d8bb0f586
Author: Tom Clegg <tom at curii.com>
Date: Mon Jun 19 14:37:50 2023 -0400
20602: Report queue usage bucketed by priority.
Also report queue time stats for requests that are abandoned before
getting a processing slot.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 724a26fa2..33a2eb614 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -17,6 +17,12 @@ import (
const IneligibleForQueuePriority = math.MinInt64
+// Prometheus typically polls every 10 seconds, but it doesn't cost us
+// much to also accommodate higher frequency collection by updating
+// internal stats more frequently. (This limits time resolution only
+// for the metrics that aren't generated on the fly.)
+const metricsUpdateInterval = time.Second
+
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
@@ -47,11 +53,13 @@ type RequestLimiter struct {
// registered with Registry, if it is not nil.
Registry *prometheus.Registry
- setupOnce sync.Once
- mQueueDelay *prometheus.SummaryVec
- mtx sync.Mutex
- handling int
- queue queue
+ setupOnce sync.Once
+ mQueueDelay *prometheus.SummaryVec
+ mQueueTimeout *prometheus.SummaryVec
+ mQueueUsage *prometheus.GaugeVec
+ mtx sync.Mutex
+ handling int
+ queue queue
}
type qent struct {
@@ -128,18 +136,12 @@ func (rl *RequestLimiter) setup() {
},
func() float64 { return float64(rl.MaxConcurrent) },
))
- rl.Registry.MustRegister(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Name: "queued_requests",
- Help: "Number of requests in queue",
- },
- func() float64 {
- rl.mtx.Lock()
- defer rl.mtx.Unlock()
- return float64(len(rl.queue))
- },
- ))
+ rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "queued_requests",
+ Help: "Number of requests in queue",
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueUsage)
rl.Registry.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
@@ -151,10 +153,37 @@ func (rl *RequestLimiter) setup() {
rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "arvados",
Name: "queue_delay_seconds",
- Help: "Time spent in the incoming request queue",
+ Help: "Time spent in the incoming request queue before start of processing",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
}, []string{"priority"})
rl.Registry.MustRegister(rl.mQueueDelay)
+ rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Name: "queue_timeout_seconds",
+ Help: "Time spent in the incoming request queue before client timed out or disconnected",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueTimeout)
+ go func() {
+ for range time.NewTicker(metricsUpdateInterval).C {
+ var low, normal, high int
+ rl.mtx.Lock()
+ for _, ent := range rl.queue {
+ switch {
+ case ent.priority < 0:
+ low++
+ case ent.priority > 0:
+ high++
+ default:
+ normal++
+ }
+ }
+ rl.mtx.Unlock()
+ rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
+ rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
+ rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
+ }
+ }()
}
}
@@ -231,10 +260,8 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
var ok bool
- var abandoned bool
select {
case <-req.Context().Done():
- abandoned = true
rl.remove(ent)
// we still need to wait for ent.ready, because
// sometimes runqueue() will have already decided to
@@ -244,21 +271,27 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
case ok = <-ent.ready:
}
- // report time spent in queue
- var qlabel string
- switch {
- case abandoned:
- case !ok && ent.priority == IneligibleForQueuePriority:
- // don't pollute stats
- case ent.priority < 0:
- qlabel = "low"
- case ent.priority > 0:
- qlabel = "high"
- default:
- qlabel = "normal"
+ // Report time spent in queue in the appropriate bucket:
+ // mQueueDelay if the request actually got processed,
+ // mQueueTimeout if it was abandoned or cancelled before
+ // getting a processing slot.
+ var series *prometheus.SummaryVec
+ if ok {
+ series = rl.mQueueDelay
+ } else {
+ series = rl.mQueueTimeout
}
- if qlabel != "" && rl.mQueueDelay != nil {
- rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+ if series != nil {
+ var qlabel string
+ switch {
+ case ent.priority < 0:
+ qlabel = "low"
+ case ent.priority > 0:
+ qlabel = "high"
+ default:
+ qlabel = "normal"
+ }
+ series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
}
if !ok {
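The practical effect of this commit, sketched below with assumed wiring (registry, mux, port, and stand-in handler are not from the patch): once the limiter is given a prometheus.Registry and handles its first request, the queue gauge is split by a priority label, and requests abandoned before getting a slot feed a separate queue_timeout_seconds summary instead of queue_delay_seconds.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	reg := prometheus.NewRegistry()
	rl := &httpserver.RequestLimiter{
		Handler:       http.NotFoundHandler(), // stand-in for the real API handler
		MaxConcurrent: 4,
		MaxQueue:      16,
		Registry:      reg, // limiter registers its metrics here on first request
	}
	mux := http.NewServeMux()
	mux.Handle("/", rl)
	// Scraping /metrics then shows, among others:
	//   arvados_queued_requests{priority="low"|"normal"|"high"}
	//   arvados_queue_delay_seconds{priority=...}    (queued, then processed)
	//   arvados_queue_timeout_seconds{priority=...}  (queued, then abandoned/cancelled)
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe("localhost:9090", mux))
}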
commit 557264f16adb0a02f733c296baf507e3e7d98478
Author: Tom Clegg <tom at curii.com>
Date: Mon Jun 19 11:53:38 2023 -0400
20602: Report queue time for pos/neg/zero-priority reqs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index c66c27943..f4d4ec8ef 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -264,13 +264,17 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
return httpserver.IneligibleForQueuePriority
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
// "Create log entry" is the most harmless kind of
- // request to drop.
- return 0
+ // request to drop. Negative priority is called "low"
+ // in aggregate metrics.
+ return -1
case req.Header.Get("Origin") != "":
- // Handle interactive requests first.
- return 2
- default:
+ // Handle interactive requests first. Positive
+ // priority is called "high" in aggregate metrics.
return 1
+ default:
+ // Zero priority is called "normal" in aggregate
+ // metrics.
+ return 0
}
}
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index f9f94ff98..724a26fa2 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -47,10 +47,11 @@ type RequestLimiter struct {
// registered with Registry, if it is not nil.
Registry *prometheus.Registry
- setupOnce sync.Once
- mtx sync.Mutex
- handling int
- queue queue
+ setupOnce sync.Once
+ mQueueDelay *prometheus.SummaryVec
+ mtx sync.Mutex
+ handling int
+ queue queue
}
type qent struct {
@@ -147,13 +148,13 @@ func (rl *RequestLimiter) setup() {
},
func() float64 { return float64(rl.MaxQueue) },
))
- rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
+ rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "arvados",
Name: "queue_delay_seconds",
- Help: "Number of seconds spent in the incoming request queue",
+ Help: "Time spent in the incoming request queue",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
- })
- reg.MustRegister(rl.mQueueDelay)
+ }, []string{"priority"})
+ rl.Registry.MustRegister(rl.mQueueDelay)
}
}
@@ -230,8 +231,10 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
ent := rl.enqueue(req)
SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
var ok bool
+ var abandoned bool
select {
case <-req.Context().Done():
+ abandoned = true
rl.remove(ent)
// we still need to wait for ent.ready, because
// sometimes runqueue() will have already decided to
@@ -240,6 +243,24 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
ok = <-ent.ready
case ok = <-ent.ready:
}
+
+ // report time spent in queue
+ var qlabel string
+ switch {
+ case abandoned:
+ case !ok && ent.priority == IneligibleForQueuePriority:
+ // don't pollute stats
+ case ent.priority < 0:
+ qlabel = "low"
+ case ent.priority > 0:
+ qlabel = "high"
+ default:
+ qlabel = "normal"
+ }
+ if qlabel != "" && rl.mQueueDelay != nil {
+ rl.mQueueDelay.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+ }
+
if !ok {
resp.WriteHeader(http.StatusServiceUnavailable)
return
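For reference, the label convention this commit introduces is simply the sign of the priority value; a standalone restatement of the switch added in the patch:

// Any negative priority is reported as "low", any positive as "high",
// and zero as "normal" (matches the switch in ServeHTTP above).
func qlabel(priority int64) string {
	switch {
	case priority < 0:
		return "low"
	case priority > 0:
		return "high"
	default:
		return "normal"
	}
}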
commit 032958b6a12030fff3151784f6970842281ca076
Author: Tom Clegg <tom at curii.com>
Date: Fri Jun 16 13:58:16 2023 -0400
20602: Add queue metrics.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index cd928e6ea..f9f94ff98 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -147,6 +147,13 @@ func (rl *RequestLimiter) setup() {
},
func() float64 { return float64(rl.MaxQueue) },
))
+ rl.mQueueDelay = prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Name: "queue_delay_seconds",
+ Help: "Number of seconds spent in the incoming request queue",
+ Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
+ })
+ reg.MustRegister(rl.mQueueDelay)
}
}
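A self-contained sketch of the summary being added here: same namespace, name, and quantile objectives (0.5/0.9/0.95/0.99 with the listed absolute-error targets), fed with the seconds a request spent waiting for a slot. Only the metric declaration is taken from the patch; the surrounding program is illustrative.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	queueDelay := prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Name:       "queue_delay_seconds",
		Help:       "Number of seconds spent in the incoming request queue",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	prometheus.MustRegister(queueDelay)

	queued := time.Now()
	// ... request waits for a processing slot ...
	queueDelay.Observe(time.Since(queued).Seconds())
}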
-----------------------------------------------------------------------