[arvados] created: 2.7.0-5655-g9e6bcccc19
git repository hosting git at public.arvados.org
Wed Dec 27 22:51:27 UTC 2023
at 9e6bcccc19bd00d76bf536769037a9a978207180 (commit)
commit 9e6bcccc19bd00d76bf536769037a9a978207180
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 27 17:50:45 2023 -0500
21285: Use separate request limit/queue for gateway tunnel requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 05bc1309cd..3924090ca9 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -231,6 +231,10 @@ Clusters:
# also effectively limited by MaxConcurrentRailsRequests (see
# below) because most controller requests proxy through to the
# RailsAPI service.
+ #
+ # HTTP proxies and load balancers downstream of arvados services
+ # should be configured to allow at least {MaxConcurrentRequests +
+ # MaxQueuedRequests + MaxGatewayTunnels} concurrent requests.
MaxConcurrentRequests: 64
# Maximum number of concurrent requests to process concurrently
@@ -250,6 +254,12 @@ Clusters:
# the incoming request queue before returning 503.
MaxQueueTimeForLockRequests: 2s
+ # Maximum number of active gateway tunnel connections. A slot is
+ # consumed by each running container, and by each incoming
+ # "container shell" connection. These do not count toward
+ # MaxConcurrentRequests.
+ MaxGatewayTunnels: 1000
+
# Fraction of MaxConcurrentRequests that can be "log create"
# messages at any given time. This is to prevent logging
# updates from crowding out more important requests.
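
The sizing guidance added above reduces to a sum of three config values. A quick sketch in Go, where MaxConcurrentRequests and MaxGatewayTunnels are the defaults visible in this diff and MaxQueuedRequests is an assumed, illustrative figure rather than a value taken from this commit:

package main

import "fmt"

func main() {
	// Minimum concurrent connections a proxy or load balancer in front of
	// arvados services should allow, per the comment above.
	maxConcurrentRequests := 64 // API.MaxConcurrentRequests (default shown in this diff)
	maxQueuedRequests := 128    // API.MaxQueuedRequests (illustrative value)
	maxGatewayTunnels := 1000   // API.MaxGatewayTunnels (default shown in this diff)
	fmt.Println(maxConcurrentRequests + maxQueuedRequests + maxGatewayTunnels) // 1192
}
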
diff --git a/lib/config/export.go b/lib/config/export.go
index e51e6fc32c..4b6c142ff2 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -70,6 +70,7 @@ var whitelist = map[string]bool{
"API.LogCreateRequestFraction": false,
"API.MaxConcurrentRailsRequests": false,
"API.MaxConcurrentRequests": false,
+ "API.MaxGatewayTunnels": false,
"API.MaxIndexDatabaseRead": false,
"API.MaxItemsPerResponse": true,
"API.MaxKeepBlobBuffers": false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 725f86f3bd..e40b47acbb 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -148,32 +148,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
return 1
}
- maxReqs := cluster.API.MaxConcurrentRequests
- if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
- (maxRails < maxReqs || maxReqs == 0) &&
- strings.HasSuffix(prog, "controller") {
- // Ideally, we would accept up to
- // MaxConcurrentRequests, and apply the
- // MaxConcurrentRailsRequests limit only for requests
- // that require calling upstream to RailsAPI. But for
- // now we make the simplifying assumption that every
- // controller request causes an upstream RailsAPI
- // request.
- maxReqs = maxRails
- }
instrumented := httpserver.Instrument(reg, log,
httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
httpserver.AddRequestIDs(
httpserver.Inspect(reg, cluster.ManagementToken,
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
- &httpserver.RequestLimiter{
- Handler: handler,
- MaxConcurrent: maxReqs,
- MaxQueue: cluster.API.MaxQueuedRequests,
- MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
- Priority: c.requestPriority,
- Registry: reg}))))))
+ c.requestLimiter(handler, cluster, reg)))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
@@ -212,7 +193,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
<-handler.Done()
srv.Close()
}()
- go c.requestQueueDumpCheck(cluster, maxReqs, prog, reg, &srv.Server, logger)
+ go c.requestQueueDumpCheck(cluster, prog, reg, &srv.Server, logger)
err = srv.Wait()
if err != nil {
return 1
@@ -221,12 +202,13 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
}
// If SystemLogs.RequestQueueDumpDirectory is set, monitor the
-// server's incoming HTTP request queue size. When it exceeds 90% of
-// API.MaxConcurrentRequests, write the /_inspect/requests data to a
-// JSON file in the specified directory.
-func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
+// server's incoming HTTP request limiters. When the number of
+// concurrent requests in any queue ("api" or "tunnel") exceeds 90% of
+// its maximum slots, write the /_inspect/requests data to a JSON file
+// in the specified directory.
+func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
outdir := cluster.SystemLogs.RequestQueueDumpDirectory
- if outdir == "" || cluster.ManagementToken == "" || maxReqs < 1 {
+ if outdir == "" || cluster.ManagementToken == "" {
return
}
logger = logger.WithField("worker", "RequestQueueDump")
@@ -237,16 +219,29 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p
logger.WithError(err).Warn("error getting metrics")
continue
}
- dump := false
+ cur := map[string]int{} // queue label => current
+ max := map[string]int{} // queue label => max
for _, mf := range mfs {
- if mf.Name != nil && *mf.Name == "arvados_concurrent_requests" && len(mf.Metric) == 1 {
- n := int(mf.Metric[0].GetGauge().GetValue())
- if n > 0 && n >= maxReqs*9/10 {
- dump = true
- break
+ for _, m := range mf.GetMetric() {
+ for _, ml := range m.GetLabel() {
+ if ml.GetName() == "queue" {
+ n := int(m.GetGauge().GetValue())
+ if name := mf.GetName(); name == "arvados_concurrent_requests" {
+ cur[*ml.Value] = n
+ } else if name == "arvados_max_concurrent_requests" {
+ max[*ml.Value] = n
+ }
+ }
}
}
}
+ dump := false
+ for queue, n := range cur {
+ if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
+ dump = true
+ break
+ }
+ }
if dump {
req, err := http.NewRequest("GET", "/_inspect/requests", nil)
if err != nil {
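
The rewritten check gathers per-queue gauges instead of one global counter. A minimal standalone sketch of that scan, assuming only the prometheus client library; the helper name saturatedQueues is illustrative and not part of this commit:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// saturatedQueues reports every queue whose arvados_concurrent_requests gauge
// has reached 90% of the matching arvados_max_concurrent_requests gauge.
func saturatedQueues(reg prometheus.Gatherer) ([]string, error) {
	mfs, err := reg.Gather()
	if err != nil {
		return nil, err
	}
	cur := map[string]int{} // queue label => current concurrent requests
	max := map[string]int{} // queue label => configured maximum
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			for _, lbl := range m.GetLabel() {
				if lbl.GetName() != "queue" {
					continue
				}
				n := int(m.GetGauge().GetValue())
				switch mf.GetName() {
				case "arvados_concurrent_requests":
					cur[lbl.GetValue()] = n
				case "arvados_max_concurrent_requests":
					max[lbl.GetValue()] = n
				}
			}
		}
	}
	var out []string
	for queue, n := range cur {
		if n > 0 && max[queue] > 0 && n >= max[queue]*9/10 {
			out = append(out, queue)
		}
	}
	return out, nil
}

func main() {
	queues, err := saturatedQueues(prometheus.NewRegistry())
	fmt.Println(queues, err) // [] <nil> for an empty registry
}
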
@@ -269,6 +264,48 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, maxReqs int, p
}
}
+// Set up a httpserver.RequestLimiter with separate queues/streams for
+// API requests (obeying MaxConcurrentRequests etc) and gateway tunnel
+// requests (obeying MaxGatewayTunnels).
+func (c *command) requestLimiter(handler http.Handler, cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+ maxReqs := cluster.API.MaxConcurrentRequests
+ if maxRails := cluster.API.MaxConcurrentRailsRequests; maxRails > 0 &&
+ (maxRails < maxReqs || maxReqs == 0) &&
+ c.svcName == arvados.ServiceNameController {
+ // Ideally, we would accept up to
+ // MaxConcurrentRequests, and apply the
+ // MaxConcurrentRailsRequests limit only for requests
+ // that require calling upstream to RailsAPI. But for
+ // now we make the simplifying assumption that every
+ // controller request causes an upstream RailsAPI
+ // request.
+ maxReqs = maxRails
+ }
+ rqAPI := &httpserver.RequestQueue{
+ Label: "api",
+ MaxConcurrent: maxReqs,
+ MaxQueue: cluster.API.MaxQueuedRequests,
+ MaxQueueTimeForMinPriority: cluster.API.MaxQueueTimeForLockRequests.Duration(),
+ }
+ rqTunnel := &httpserver.RequestQueue{
+ Label: "tunnel",
+ MaxConcurrent: cluster.API.MaxGatewayTunnels,
+ MaxQueue: 0,
+ }
+ return &httpserver.RequestLimiter{
+ Handler: handler,
+ Priority: c.requestPriority,
+ Registry: reg,
+ Queue: func(req *http.Request) *httpserver.RequestQueue {
+ if strings.HasPrefix(req.URL.Path, "/arvados/v1/connect/") {
+ return rqTunnel
+ } else {
+ return rqAPI
+ }
+ },
+ }
+}
+
func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
switch {
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
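
The routing rule in the new requestLimiter helper is a plain path-prefix check. A small sketch of the same predicate, with illustrative request paths (the endpoint names below are examples, not taken from this commit):

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// queueFor mirrors the routing rule above: anything under /arvados/v1/connect/
// (container gateway tunnels, including "container shell" connections) goes to
// the "tunnel" queue, everything else to the "api" queue.
func queueFor(rawurl string) string {
	u, err := url.Parse(rawurl)
	if err != nil {
		return "api"
	}
	if strings.HasPrefix(u.Path, "/arvados/v1/connect/") {
		return "tunnel"
	}
	return "api"
}

func main() {
	fmt.Println(queueFor("/arvados/v1/containers/zzzzz-dz642-000000000000000/lock")) // api
	fmt.Println(queueFor("/arvados/v1/connect/zzzzz-dz642-000000000000000/ssh"))     // tunnel
}
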
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 08b3a239dc..0266752f38 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -17,6 +17,8 @@ import (
"net/url"
"os"
"strings"
+ "sync"
+ "sync/atomic"
"testing"
"time"
@@ -198,15 +200,15 @@ func (*Suite) TestCommand(c *check.C) {
c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
}
-func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
- s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
+ s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
}
-func (s *Suite) TestDumpRequestsController(c *check.C) {
- s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
+ s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
}
-func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
+func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
requestQueueDumpCheckInterval = time.Second / 10
@@ -218,6 +220,7 @@ func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxR
defer cf.Close()
max := 24
+ maxTunnels := 30
fmt.Fprintf(cf, `
Clusters:
zzzzz:
@@ -225,7 +228,8 @@ Clusters:
ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
API:
`+maxReqsConfigKey+`: %d
- MaxQueuedRequests: 0
+ MaxQueuedRequests: 1
+ MaxGatewayTunnels: %d
SystemLogs: {RequestQueueDumpDirectory: %q}
Services:
Controller:
@@ -234,14 +238,18 @@ Clusters:
WebDAV:
ExternalURL: "http://localhost:`+port+`"
InternalURLs: {"http://localhost:`+port+`": {}}
-`, max, tmpdir)
+`, max, maxTunnels, tmpdir)
cf.Close()
started := make(chan bool, max+1)
hold := make(chan bool)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- started <- true
- <-hold
+ if strings.HasPrefix(r.URL.Path, "/arvados/v1/connect") {
+ <-hold
+ } else {
+ started <- true
+ <-hold
+ }
})
healthCheck := make(chan bool, 1)
ctx, cancel := context.WithCancel(context.Background())
@@ -267,15 +275,50 @@ Clusters:
}
client := http.Client{}
deadline := time.Now().Add(time.Second * 2)
+ var activeReqs sync.WaitGroup
+
+ // Start some API reqs
+ var apiResp200, apiResp503 int64
for i := 0; i < max+1; i++ {
+ activeReqs.Add(1)
go func() {
+ defer activeReqs.Done()
resp, err := client.Get("http://localhost:" + port + "/testpath")
for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
time.Sleep(time.Second / 100)
resp, err = client.Get("http://localhost:" + port + "/testpath")
}
if c.Check(err, check.IsNil) {
- c.Logf("resp StatusCode %d", resp.StatusCode)
+ if resp.StatusCode == http.StatusOK {
+ atomic.AddInt64(&apiResp200, 1)
+ } else if resp.StatusCode == http.StatusServiceUnavailable {
+ atomic.AddInt64(&apiResp503, 1)
+ }
+ }
+ }()
+ }
+
+ // Start some gateway tunnel reqs that don't count toward our
+ // API req limit
+ extraTunnelReqs := 20
+ var tunnelResp200, tunnelResp503 int64
+ for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
+ activeReqs.Add(1)
+ go func() {
+ defer activeReqs.Done()
+ resp, err := client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+ for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+ time.Sleep(time.Second / 100)
+ resp, err = client.Get("http://localhost:" + port + "/arvados/v1/connect/...")
+ }
+ if c.Check(err, check.IsNil) {
+ if resp.StatusCode == http.StatusOK {
+ atomic.AddInt64(&tunnelResp200, 1)
+ } else if resp.StatusCode == http.StatusServiceUnavailable {
+ atomic.AddInt64(&tunnelResp503, 1)
+ } else {
+ c.Errorf("tunnel response code %d", resp.StatusCode)
+ }
}
}()
}
@@ -300,6 +343,20 @@ Clusters:
var loaded []struct{ URL string }
err = json.Unmarshal(j, &loaded)
c.Check(err, check.IsNil)
+
+ for i := 0; i < len(loaded); i++ {
+ if strings.HasPrefix(loaded[i].URL, "/arvados/v1/connect/") {
+ // Filter out a gateway tunnel req
+ // that doesn't count toward our API
+ // req limit
+ if i < len(loaded)-1 {
+ copy(loaded[i:], loaded[i+1:])
+ i--
+ }
+ loaded = loaded[:len(loaded)-1]
+ }
+ }
+
if len(loaded) < max {
// Dumped when #requests was >90% but <100% of
// limit. If we stop now, we won't be able to
@@ -309,7 +366,7 @@ Clusters:
c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
continue
}
- c.Check(loaded, check.HasLen, max)
+ c.Check(loaded, check.HasLen, max+1)
c.Check(loaded[0].URL, check.Equals, "/testpath")
break
}
@@ -328,7 +385,8 @@ Clusters:
c.Check(err, check.IsNil)
switch path {
case "/metrics":
- c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
case "/_inspect/requests":
c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
default:
@@ -336,6 +394,11 @@ Clusters:
}
}
close(hold)
+ activeReqs.Wait()
+ c.Check(int(apiResp200), check.Equals, max+1)
+ c.Check(int(apiResp503), check.Equals, 0)
+ c.Check(int(tunnelResp200), check.Equals, maxTunnels)
+ c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
cancel()
}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6301ed047a..16d789daf5 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -102,6 +102,7 @@ type Cluster struct {
MaxConcurrentRailsRequests int
MaxConcurrentRequests int
MaxQueuedRequests int
+ MaxGatewayTunnels int
MaxQueueTimeForLockRequests Duration
LogCreateRequestFraction float64
MaxKeepBlobBuffers int
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 9d501ab0eb..1e3316ed48 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -34,13 +34,8 @@ const metricsUpdateInterval = time.Second
type RequestLimiter struct {
Handler http.Handler
- // Maximum number of requests being handled at once. Beyond
- // this limit, requests will be queued.
- MaxConcurrent int
-
- // Maximum number of requests in the queue. Beyond this limit,
- // the lowest priority requests will return 503.
- MaxQueue int
+ // Queue determines which queue a request is assigned to.
+ Queue func(req *http.Request) *RequestQueue
// Priority determines queue ordering. Requests with higher
// priority are handled first. Requests with equal priority
@@ -48,11 +43,6 @@ type RequestLimiter struct {
// handled FIFO.
Priority func(req *http.Request, queued time.Time) int64
- // Return 503 for any request for which Priority() returns
- // MinPriority if it spends longer than this in the queue
- // before starting processing.
- MaxQueueTimeForMinPriority time.Duration
-
// "concurrent_requests", "max_concurrent_requests",
// "queued_requests", and "max_queued_requests" metrics are
// registered with Registry, if it is not nil.
@@ -63,11 +53,32 @@ type RequestLimiter struct {
mQueueTimeout *prometheus.SummaryVec
mQueueUsage *prometheus.GaugeVec
mtx sync.Mutex
- handling int
- queue queue
+ rqs map[*RequestQueue]bool // all RequestQueues in use
+}
+
+type RequestQueue struct {
+ // Label for metrics. No two queues should have the same label.
+ Label string
+
+ // Maximum number of requests being handled at once. Beyond
+ // this limit, requests will be queued.
+ MaxConcurrent int
+
+ // Maximum number of requests in the queue. Beyond this limit,
+ // the lowest priority requests will return 503.
+ MaxQueue int
+
+ // Return 503 for any request for which Priority() returns
+ // MinPriority if it spends longer than this in the queue
+ // before starting processing.
+ MaxQueueTimeForMinPriority time.Duration
+
+ queue queue
+ handling int
}
type qent struct {
+ rq *RequestQueue
queued time.Time
priority int64
heappos int
@@ -121,101 +132,96 @@ func (h *queue) remove(i int) {
func (rl *RequestLimiter) setup() {
if rl.Registry != nil {
- rl.Registry.MustRegister(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Name: "concurrent_requests",
- Help: "Number of requests in progress",
- },
- func() float64 {
- rl.mtx.Lock()
- defer rl.mtx.Unlock()
- return float64(rl.handling)
- },
- ))
- rl.Registry.MustRegister(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Name: "max_concurrent_requests",
- Help: "Maximum number of concurrent requests",
- },
- func() float64 { return float64(rl.MaxConcurrent) },
- ))
+ mCurrentReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "concurrent_requests",
+ Help: "Number of requests in progress",
+ }, []string{"queue"})
+ rl.Registry.MustRegister(mCurrentReqs)
+ mMaxReqs := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "max_concurrent_requests",
+ Help: "Maximum number of concurrent requests",
+ }, []string{"queue"})
+ rl.Registry.MustRegister(mMaxReqs)
+ mMaxQueue := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "max_queued_requests",
+ Help: "Maximum number of queued requests",
+ }, []string{"queue"})
+ rl.Registry.MustRegister(mMaxQueue)
rl.mQueueUsage = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Name: "queued_requests",
Help: "Number of requests in queue",
- }, []string{"priority"})
+ }, []string{"queue", "priority"})
rl.Registry.MustRegister(rl.mQueueUsage)
- rl.Registry.MustRegister(prometheus.NewGaugeFunc(
- prometheus.GaugeOpts{
- Namespace: "arvados",
- Name: "max_queued_requests",
- Help: "Maximum number of queued requests",
- },
- func() float64 { return float64(rl.MaxQueue) },
- ))
rl.mQueueDelay = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "arvados",
Name: "queue_delay_seconds",
Help: "Time spent in the incoming request queue before start of processing",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
- }, []string{"priority"})
+ }, []string{"queue", "priority"})
rl.Registry.MustRegister(rl.mQueueDelay)
rl.mQueueTimeout = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "arvados",
Name: "queue_timeout_seconds",
Help: "Time spent in the incoming request queue before client timed out or disconnected",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
- }, []string{"priority"})
+ }, []string{"queue", "priority"})
rl.Registry.MustRegister(rl.mQueueTimeout)
go func() {
for range time.NewTicker(metricsUpdateInterval).C {
- var low, normal, high int
rl.mtx.Lock()
- for _, ent := range rl.queue {
- switch {
- case ent.priority < 0:
- low++
- case ent.priority > 0:
- high++
- default:
- normal++
+ for rq := range rl.rqs {
+ var low, normal, high int
+ for _, ent := range rq.queue {
+ switch {
+ case ent.priority < 0:
+ low++
+ case ent.priority > 0:
+ high++
+ default:
+ normal++
+ }
}
+ mCurrentReqs.WithLabelValues(rq.Label).Set(float64(rq.handling))
+ mMaxReqs.WithLabelValues(rq.Label).Set(float64(rq.MaxConcurrent))
+ mMaxQueue.WithLabelValues(rq.Label).Set(float64(rq.MaxQueue))
+ rl.mQueueUsage.WithLabelValues(rq.Label, "low").Set(float64(low))
+ rl.mQueueUsage.WithLabelValues(rq.Label, "normal").Set(float64(normal))
+ rl.mQueueUsage.WithLabelValues(rq.Label, "high").Set(float64(high))
}
rl.mtx.Unlock()
- rl.mQueueUsage.WithLabelValues("low").Set(float64(low))
- rl.mQueueUsage.WithLabelValues("normal").Set(float64(normal))
- rl.mQueueUsage.WithLabelValues("high").Set(float64(high))
}
}()
}
}
// caller must have lock
-func (rl *RequestLimiter) runqueue() {
+func (rq *RequestQueue) runqueue() {
// Handle entries from the queue as capacity permits
- for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
- rl.handling++
- ent := rl.queue.removeMax()
+ for len(rq.queue) > 0 && (rq.MaxConcurrent == 0 || rq.handling < rq.MaxConcurrent) {
+ rq.handling++
+ ent := rq.queue.removeMax()
ent.ready <- true
}
}
// If the queue is too full, fail and remove the lowest-priority
// entry. Caller must have lock. Queue must not be empty.
-func (rl *RequestLimiter) trimqueue() {
- if len(rl.queue) <= rl.MaxQueue {
+func (rq *RequestQueue) trimqueue() {
+ if len(rq.queue) <= rq.MaxQueue {
return
}
min := 0
- for i := range rl.queue {
- if i == 0 || rl.queue.Less(min, i) {
+ for i := range rq.queue {
+ if i == 0 || rq.queue.Less(min, i) {
min = i
}
}
- rl.queue[min].ready <- false
- rl.queue.remove(min)
+ rq.queue[min].ready <- false
+ rq.queue.remove(min)
}
func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
@@ -227,19 +233,24 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
priority = rl.Priority(req, qtime)
}
ent := &qent{
+ rq: rl.Queue(req),
queued: qtime,
priority: priority,
ready: make(chan bool, 1),
heappos: -1,
}
- if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+ if rl.rqs == nil {
+ rl.rqs = map[*RequestQueue]bool{}
+ }
+ rl.rqs[ent.rq] = true
+ if ent.rq.MaxConcurrent == 0 || ent.rq.MaxConcurrent > ent.rq.handling {
// fast path, skip the queue
- rl.handling++
+ ent.rq.handling++
ent.ready <- true
return ent
}
- rl.queue.add(ent)
- rl.trimqueue()
+ ent.rq.queue.add(ent)
+ ent.rq.trimqueue()
return ent
}
@@ -247,7 +258,7 @@ func (rl *RequestLimiter) remove(ent *qent) {
rl.mtx.Lock()
defer rl.mtx.Unlock()
if ent.heappos >= 0 {
- rl.queue.remove(ent.heappos)
+ ent.rq.queue.remove(ent.heappos)
ent.ready <- false
}
}
@@ -255,14 +266,14 @@ func (rl *RequestLimiter) remove(ent *qent) {
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
- SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
+ SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority, "queue": ent.rq.Label})
if ent.priority == MinPriority {
// Note that MaxQueueTime==0 does not cancel a req
// that skips the queue, because in that case
// rl.enqueue() has already fired ready<-true and
// rl.remove() is a no-op.
go func() {
- time.Sleep(rl.MaxQueueTimeForMinPriority)
+ time.Sleep(ent.rq.MaxQueueTimeForMinPriority)
rl.remove(ent)
}()
}
@@ -273,7 +284,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
// we still need to wait for ent.ready, because
// sometimes runqueue() will have already decided to
// send true before our rl.remove() call, and in that
- // case we'll need to decrement rl.handling below.
+ // case we'll need to decrement ent.rq.handling below.
ok = <-ent.ready
case ok = <-ent.ready:
}
@@ -298,7 +309,7 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
default:
qlabel = "normal"
}
- series.WithLabelValues(qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
+ series.WithLabelValues(ent.rq.Label, qlabel).Observe(time.Now().Sub(ent.queued).Seconds())
}
if !ok {
@@ -308,9 +319,9 @@ func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request)
defer func() {
rl.mtx.Lock()
defer rl.mtx.Unlock()
- rl.handling--
+ ent.rq.handling--
// unblock the next waiting request
- rl.runqueue()
+ ent.rq.runqueue()
}()
rl.Handler.ServeHTTP(resp, req)
}
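
A minimal sketch of the new RequestQueue/Queue API in use, showing the isolation this change is after: with every tunnel slot held open, an ordinary API request is still served. It assumes the git.arvados.org/arvados.git module path; the paths, limits, and blocking handler are illustrative:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	hold := make(chan struct{})
	backend := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.URL.Path, "/tunnel/") {
			<-hold // tunnel requests hold their slot until released
		}
	})
	rqAPI := &httpserver.RequestQueue{Label: "api", MaxConcurrent: 4, MaxQueue: 2}
	rqTunnel := &httpserver.RequestQueue{Label: "tunnel", MaxConcurrent: 2}
	rl := &httpserver.RequestLimiter{
		Handler: backend,
		Queue: func(req *http.Request) *httpserver.RequestQueue {
			if strings.HasPrefix(req.URL.Path, "/tunnel/") {
				return rqTunnel
			}
			return rqAPI
		},
	}
	srv := httptest.NewServer(rl)
	defer srv.Close()

	// Occupy both tunnel slots...
	for i := 0; i < 2; i++ {
		go http.Get(srv.URL + "/tunnel/demo")
	}
	// ...and an ordinary API request still gets a slot of its own.
	resp, err := http.Get(srv.URL + "/api/demo")
	if err == nil {
		fmt.Println("api status:", resp.StatusCode) // expect 200 while tunnels are busy
		resp.Body.Close()
	}
	close(hold)
}
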
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 55f13b4625..7366e1426b 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -34,7 +34,11 @@ func newTestHandler() *testHandler {
func (s *Suite) TestRequestLimiter1(c *check.C) {
h := newTestHandler()
- l := RequestLimiter{MaxConcurrent: 1, Handler: h}
+ rq := &RequestQueue{
+ MaxConcurrent: 1}
+ l := RequestLimiter{
+ Queue: func(*http.Request) *RequestQueue { return rq },
+ Handler: h}
var wg sync.WaitGroup
resps := make([]*httptest.ResponseRecorder, 10)
for i := 0; i < 10; i++ {
@@ -94,7 +98,11 @@ func (s *Suite) TestRequestLimiter1(c *check.C) {
func (*Suite) TestRequestLimiter10(c *check.C) {
h := newTestHandler()
- l := RequestLimiter{MaxConcurrent: 10, Handler: h}
+ rq := &RequestQueue{
+ MaxConcurrent: 10}
+ l := RequestLimiter{
+ Queue: func(*http.Request) *RequestQueue { return rq },
+ Handler: h}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
@@ -114,29 +122,32 @@ func (*Suite) TestRequestLimiter10(c *check.C) {
func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
h := newTestHandler()
- rl := RequestLimiter{
+ rq := &RequestQueue{
MaxConcurrent: 1000,
MaxQueue: 200,
- Handler: h,
+ }
+ rl := RequestLimiter{
+ Handler: h,
+ Queue: func(*http.Request) *RequestQueue { return rq },
Priority: func(r *http.Request, _ time.Time) int64 {
p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
return p
}}
c.Logf("starting initial requests")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
go func() {
rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
}()
}
c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
<-h.inHandler
}
- c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rl.MaxQueue)
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 immediately)", rq.MaxQueue)
var wgX sync.WaitGroup
- for i := 0; i < rl.MaxQueue; i++ {
+ for i := 0; i < rq.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
@@ -147,13 +158,13 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
}
wgX.Wait()
- c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rl.MaxQueue)
+ c.Logf("starting %d priority=MinPriority requests (should respond 503 after 100 ms)", rq.MaxQueue)
// Usage docs say the caller isn't allowed to change fields
// after first use, but we secretly know it's OK to change
// this field on the fly as long as no requests are arriving
// concurrently.
- rl.MaxQueueTimeForMinPriority = time.Millisecond * 100
- for i := 0; i < rl.MaxQueue; i++ {
+ rq.MaxQueueTimeForMinPriority = time.Millisecond * 100
+ for i := 0; i < rq.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
@@ -162,17 +173,17 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", MinPriority)}}})
c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
elapsed := time.Since(t0)
- c.Check(elapsed > rl.MaxQueueTimeForMinPriority, check.Equals, true)
- c.Check(elapsed < rl.MaxQueueTimeForMinPriority*10, check.Equals, true)
+ c.Check(elapsed > rq.MaxQueueTimeForMinPriority, check.Equals, true)
+ c.Check(elapsed < rq.MaxQueueTimeForMinPriority*10, check.Equals, true)
}()
}
wgX.Wait()
- c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
+ c.Logf("starting %d priority=1 and %d priority=1 requests", rq.MaxQueue, rq.MaxQueue)
var wg1, wg2 sync.WaitGroup
- wg1.Add(rl.MaxQueue)
- wg2.Add(rl.MaxQueue)
- for i := 0; i < rl.MaxQueue*2; i++ {
+ wg1.Add(rq.MaxQueue)
+ wg2.Add(rq.MaxQueue)
+ for i := 0; i < rq.MaxQueue*2; i++ {
i := i
go func() {
pri := (i & 1) + 1
@@ -192,12 +203,12 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
wg1.Wait()
c.Logf("allowing initial requests to proceed")
- for i := 0; i < rl.MaxConcurrent; i++ {
+ for i := 0; i < rq.MaxConcurrent; i++ {
h.okToProceed <- struct{}{}
}
c.Logf("allowing queued priority=2 requests to proceed")
- for i := 0; i < rl.MaxQueue; i++ {
+ for i := 0; i < rq.MaxQueue; i++ {
<-h.inHandler
h.okToProceed <- struct{}{}
}
-----------------------------------------------------------------------
hooks/post-receive
--