[arvados] created: 2.6.0-265-g110ce6a7e
git repository hosting git at public.arvados.org
Thu Jun 8 18:33:05 UTC 2023
at 110ce6a7e9c55fca0e2d43f8629be73fc0f8ba25 (commit)
commit 110ce6a7e9c55fca0e2d43f8629be73fc0f8ba25
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 8 14:30:23 2023 -0400
20602: Disable queue when testing DumpRequests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 026a01eca..37eac86d2 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -213,7 +213,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
// JSON file in the specified directory.
func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
outdir := cluster.SystemLogs.RequestQueueDumpDirectory
- if outdir == "" || cluster.ManagementToken == "" {
+ if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
return
}
logger = logger.WithField("worker", "RequestQueueDump")
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 5a7fbc052..97a6bd8a4 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -210,7 +210,9 @@ Clusters:
zzzzz:
SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
- API: {MaxConcurrentRequests: %d}
+ API:
+ MaxConcurrentRequests: %d
+ MaxQueuedRequests: 0
SystemLogs: {RequestQueueDumpDirectory: %q}
Services:
Controller:
@@ -268,15 +270,15 @@ Clusters:
panic("timed out")
}
}
- for {
- time.Sleep(time.Second / 100)
+ for delay := time.Second / 100; ; delay = delay * 2 {
+ time.Sleep(delay)
j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
if os.IsNotExist(err) && deadline.After(time.Now()) {
continue
}
- c.Check(err, check.IsNil)
- c.Logf("%s", stderr.String())
- c.Logf("%s", string(j))
+ c.Assert(err, check.IsNil)
+ c.Logf("stderr:\n%s", stderr.String())
+ c.Logf("json:\n%s", string(j))
var loaded []struct{ URL string }
err = json.Unmarshal(j, &loaded)
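The test change above swaps a fixed 10 ms polling interval for a doubling delay while waiting for the dump file to appear. A minimal standalone sketch of that polling pattern (hypothetical file path and deadline, not the arvados test itself):

package main

import (
    "fmt"
    "os"
    "time"
)

// waitForFile polls for path with exponentially increasing delays
// until the file exists, some other read error occurs, or the
// deadline passes.
func waitForFile(path string, deadline time.Time) ([]byte, error) {
    for delay := time.Second / 100; ; delay *= 2 {
        time.Sleep(delay)
        buf, err := os.ReadFile(path)
        if os.IsNotExist(err) && time.Now().Before(deadline) {
            continue
        }
        return buf, err
    }
}

func main() {
    buf, err := waitForFile("/tmp/example-requests.json", time.Now().Add(10*time.Second))
    fmt.Println(len(buf), err)
}

Doubling the delay keeps the first checks fast while avoiding a tight polling loop if the file takes several seconds to show up.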
commit 19e9dda26118f915c8f2631a488eceed7943a9a4
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 8 10:31:11 2023 -0400
20602: Prioritize backend traffic from keep-web as interactive.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index c73191103..c44a2eb73 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -141,6 +141,11 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
return nil, nil, nil, err
}
sess.client.AuthToken = token
+ // A non-empty origin header tells controller to
+ // prioritize our traffic as interactive, which is
+ // true most of the time.
+ origin := c.cluster.Services.WebDAVDownload.ExternalURL
+ sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
sess.arvadosclient, err = arvadosclient.New(sess.client)
if err != nil {
return nil, nil, nil, err
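For context, the controller-side priority function (added in a commit below) only checks whether the Origin header is non-empty. Outside the arvados SDK the same effect can be sketched with a plain http.RoundTripper; the hostnames here are placeholders and this is an illustration, not the keep-web code:

package main

import (
    "log"
    "net/http"
)

// originTransport attaches an Origin header to every outgoing
// request, so a backend that treats non-empty Origin as a marker of
// interactive traffic will prioritize it.
type originTransport struct {
    origin string
    base   http.RoundTripper
}

func (t originTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    req = req.Clone(req.Context()) // don't modify the caller's request
    req.Header.Set("Origin", t.origin)
    return t.base.RoundTrip(req)
}

func main() {
    client := &http.Client{Transport: originTransport{
        origin: "https://download.example.com",
        base:   http.DefaultTransport,
    }}
    resp, err := client.Get("https://controller.example.com/arvados/v1/collections")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    log.Println(resp.Status)
}

Setting arvados.Client.SendHeader, as the diff above does, achieves the same thing without a wrapper transport.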
commit 96d8b9e1afecccae803ec4b956ada745dbe71d9f
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 8 10:30:35 2023 -0400
20602: Attach assigned priority to response log entry.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 988643c74..2bb0a5673 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -11,6 +11,7 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
// RequestLimiter wraps http.Handler, limiting the number of
@@ -258,6 +259,7 @@ func (rl *RequestLimiter) remove(ent *qent) {
func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rl.setupOnce.Do(rl.setup)
ent := rl.enqueue(req)
+ SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
var ok bool
select {
case <-req.Context().Done():
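SetResponseLogFields stashes extra fields in the request context so the request-logging middleware can attach them to the final response log entry. A rough sketch of how such a context-scoped field bag can work (illustrative names only, not the arvados implementation):

package main

import (
    "context"
    "net/http"
    "sync"

    "github.com/sirupsen/logrus"
)

type fieldsKey struct{}

type fieldsBag struct {
    mtx    sync.Mutex
    fields logrus.Fields
}

// withLogFields logs one entry per request, including any fields the
// inner handlers attached via addLogFields.
func withLogFields(logger logrus.FieldLogger, h http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        bag := &fieldsBag{fields: logrus.Fields{}}
        r = r.WithContext(context.WithValue(r.Context(), fieldsKey{}, bag))
        h.ServeHTTP(w, r)
        bag.mtx.Lock()
        defer bag.mtx.Unlock()
        logger.WithFields(bag.fields).Info("request done")
    })
}

// addLogFields merges fields into the bag stored in ctx, if any.
func addLogFields(ctx context.Context, fields logrus.Fields) {
    bag, ok := ctx.Value(fieldsKey{}).(*fieldsBag)
    if !ok {
        return
    }
    bag.mtx.Lock()
    defer bag.mtx.Unlock()
    for k, v := range fields {
        bag.fields[k] = v
    }
}

func main() {
    h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        addLogFields(r.Context(), logrus.Fields{"priority": 2})
        w.WriteHeader(http.StatusOK)
    })
    logrus.Fatal(http.ListenAndServe(":8080", withLogFields(logrus.StandardLogger(), h)))
}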
commit eaab5e4438da9239245a7c54a4b32bbe9e283780
Author: Tom Clegg <tom at curii.com>
Date: Wed Jun 7 17:13:30 2023 -0400
20602: Prioritize interactive (browser) requests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 1073f421d..026a01eca 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -10,6 +10,7 @@ import (
"flag"
"fmt"
"io"
+ "math"
"net"
"net/http"
"net/http/httptest"
@@ -158,6 +159,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
Handler: handler,
MaxConcurrent: cluster.API.MaxConcurrentRequests,
MaxQueue: cluster.API.MaxQueuedRequests,
+ Priority: c.requestPriority,
Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
@@ -254,6 +256,25 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
}
}
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+ switch {
+ case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+ // Return 503 immediately instead of queueing. We want
+ // to send feedback to dispatchcloud ASAP to stop
+ // bringing up new containers.
+ return math.MinInt64
+ case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+ // "Create log entry" is the most harmless kind of
+ // request to drop.
+ return 0
+ case req.Header.Get("Origin") != "":
+ // Handle interactive requests first.
+ return 2
+ default:
+ return 1
+ }
+}
+
// If an incoming request's target vhost has an embedded collection
// UUID or PDH, handle it with hTrue, otherwise handle it with
// hFalse.
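For reference, a small standalone program that mirrors the classification above and prints the tier a few sample requests would land in (the container UUID and Origin value are placeholders):

package main

import (
    "fmt"
    "math"
    "net/http"
    "strings"
)

// classify mirrors the requestPriority logic added above:
// container-lock calls never wait in the queue, log creation is the
// first thing to shed, and anything with an Origin header (i.e.
// browser traffic) jumps ahead of other backend requests.
func classify(req *http.Request) int64 {
    switch {
    case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
        return math.MinInt64
    case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
        return 0
    case req.Header.Get("Origin") != "":
        return 2
    default:
        return 1
    }
}

func main() {
    lock, _ := http.NewRequest("POST", "/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxxx/lock", nil)
    logs, _ := http.NewRequest("POST", "/arvados/v1/logs", nil)
    browser, _ := http.NewRequest("GET", "/arvados/v1/collections", nil)
    browser.Header.Set("Origin", "https://workbench.example.com")
    other, _ := http.NewRequest("GET", "/arvados/v1/collections", nil)
    for _, req := range []*http.Request{lock, logs, browser, other} {
        fmt.Println(req.Method, req.URL.Path, "->", classify(req))
    }
}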
commit 1c92ba23521c23d00f97fee0afa2649afd73b504
Author: Tom Clegg <tom at curii.com>
Date: Wed Jun 7 17:12:17 2023 -0400
20602: Priority MinInt64 means never wait in queue.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 094f6a6ca..988643c74 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -5,6 +5,7 @@
package httpserver
import (
+ "math"
"net/http"
"sync"
"time"
@@ -231,6 +232,14 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
ent.ready <- true
return ent
}
+ if priority == math.MinInt64 {
+ // Priority func is telling us to return 503
+ // immediately instead of queueing, regardless of
+ // queue size, if we can't handle the request
+ // immediately.
+ ent.ready <- false
+ return ent
+ }
rl.queue.add(ent)
rl.trimqueue()
return ent
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 4023a76bc..bdc0401b3 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -6,6 +6,7 @@ package httpserver
import (
"fmt"
+ "math"
"net/http"
"net/http/httptest"
"strconv"
@@ -134,6 +135,19 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
<-h.inHandler
}
+ c.Logf("starting %d priority=MinInt64 requests (should respond 503 immediately)", rl.MaxQueue)
+ var wgX sync.WaitGroup
+ for i := 0; i < rl.MaxQueue; i++ {
+ wgX.Add(1)
+ go func() {
+ defer wgX.Done()
+ resp := httptest.NewRecorder()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", math.MinInt64)}}})
+ c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ }()
+ }
+ wgX.Wait()
+
c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
var wg1, wg2 sync.WaitGroup
wg1.Add(rl.MaxQueue)
commit 2ca9ee944a0bbeeb7226ea6fb6cd535b9ca8cbdb
Author: Tom Clegg <tom at curii.com>
Date: Wed Jun 7 12:37:40 2023 -0400
20602: Add MaxQueuedRequests config.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 837ce896e..33c1e497d 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -227,6 +227,15 @@ Clusters:
# in a single service process, or 0 for no limit.
MaxConcurrentRequests: 64
+ # Maximum number of incoming requests to hold in a priority
+ # queue waiting for one of the MaxConcurrentRequests slots to be
+ # free. When the queue is longer than this, respond 503 to the
+ # lowest priority request.
+ #
+ # If MaxQueuedRequests is 0, respond 503 immediately to
+ # additional requests while at the MaxConcurrentRequests limit.
+ MaxQueuedRequests: 64
+
# Fraction of MaxConcurrentRequests that can be "log create"
# messages at any given time. This is to prevent logging
# updates from crowding out more important requests.
diff --git a/lib/config/export.go b/lib/config/export.go
index 44fd55941..565be2fb7 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -65,13 +65,14 @@ var whitelist = map[string]bool{
"API.FreezeProjectRequiresDescription": true,
"API.FreezeProjectRequiresProperties": true,
"API.FreezeProjectRequiresProperties.*": true,
- "API.LockBeforeUpdate": false,
"API.KeepServiceRequestTimeout": false,
- "API.MaxConcurrentRequests": false,
+ "API.LockBeforeUpdate": false,
"API.LogCreateRequestFraction": false,
+ "API.MaxConcurrentRequests": false,
"API.MaxIndexDatabaseRead": false,
"API.MaxItemsPerResponse": true,
"API.MaxKeepBlobBuffers": false,
+ "API.MaxQueuedRequests": false,
"API.MaxRequestAmplification": false,
"API.MaxRequestSize": true,
"API.MaxTokenLifetime": false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 119d66191..1073f421d 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -157,6 +157,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
&httpserver.RequestLimiter{
Handler: handler,
MaxConcurrent: cluster.API.MaxConcurrentRequests,
+ MaxQueue: cluster.API.MaxQueuedRequests,
Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 4da851763..2e9abf2ec 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -100,6 +100,7 @@ type Cluster struct {
MaxIndexDatabaseRead int
MaxItemsPerResponse int
MaxConcurrentRequests int
+ MaxQueuedRequests int
LogCreateRequestFraction float64
MaxKeepBlobBuffers int
MaxRequestAmplification int
commit 53c44ca2f2e9e2df9d5eee96e7806fbb7576eadd
Author: Tom Clegg <tom at curii.com>
Date: Wed Jun 7 12:37:25 2023 -0400
20602: Add priority queue to RequestLimiter.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a52d09f68..837ce896e 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -223,8 +223,8 @@ Clusters:
# parameter higher than this value, this value is used instead.
MaxItemsPerResponse: 1000
- # Maximum number of concurrent requests to accept in a single
- # service process, or 0 for no limit.
+ # Maximum number of requests to process concurrently
+ # in a single service process, or 0 for no limit.
MaxConcurrentRequests: 64
# Fraction of MaxConcurrentRequests that can be "log create"
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index cc6938cbc..119d66191 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -154,7 +154,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
httpserver.Inspect(reg, cluster.ManagementToken,
httpserver.LogRequests(
interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
- httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
+ &httpserver.RequestLimiter{
+ Handler: handler,
+ MaxConcurrent: cluster.API.MaxConcurrentRequests,
+ Registry: reg}))))))
srv := &httpserver.Server{
Server: http.Server{
Handler: ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 888945312..094f6a6ca 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -6,87 +6,270 @@ package httpserver
import (
"net/http"
- "sync/atomic"
+ "sync"
+ "time"
"github.com/prometheus/client_golang/prometheus"
)
-// RequestCounter is an http.Handler that tracks the number of
-// requests in progress.
-type RequestCounter interface {
- http.Handler
+// RequestLimiter wraps http.Handler, limiting the number of
+// concurrent requests being handled by the wrapped Handler. Requests
+// that arrive when the handler is already at the specified
+// concurrency limit are queued and handled in the order indicated by
+// the Priority function.
+//
+// Caller must not modify any RequestLimiter fields after calling its
+// methods.
+type RequestLimiter struct {
+ Handler http.Handler
+
+ // Maximum number of requests being handled at once. Beyond
+ // this limit, requests will be queued.
+ MaxConcurrent int
+
+ // Maximum number of requests in the queue. Beyond this limit,
+ // the lowest priority requests will return 503.
+ MaxQueue int
- // Current() returns the number of requests in progress.
- Current() int
+ // Priority determines queue ordering. Requests with higher
+ // priority are handled first. Requests with equal priority
+ // are handled FIFO. If Priority is nil, all requests are
+ // handled FIFO.
+ Priority func(req *http.Request, queued time.Time) int64
- // Max() returns the maximum number of concurrent requests
- // that will be accepted.
- Max() int
+ // "concurrent_requests", "max_concurrent_requests",
+ // "queued_requests", and "max_queued_requests" metrics are
+ // registered with Registry, if it is not nil.
+ Registry *prometheus.Registry
+
+ setupOnce sync.Once
+ mtx sync.Mutex
+ handling int
+ queue heap
}
-type limiterHandler struct {
- requests chan struct{}
- handler http.Handler
- count int64 // only used if cap(requests)==0
+type qent struct {
+ queued time.Time
+ priority int64
+ heappos int
+ ready chan bool // true = handle now; false = return 503 now
}
-// NewRequestLimiter returns a RequestCounter that delegates up to
-// maxRequests at a time to the given handler, and responds 503 to all
-// incoming requests beyond that limit.
-//
-// "concurrent_requests" and "max_concurrent_requests" metrics are
-// registered with the given reg, if reg is not nil.
-func NewRequestLimiter(maxRequests int, handler http.Handler, reg *prometheus.Registry) RequestCounter {
- h := &limiterHandler{
- requests: make(chan struct{}, maxRequests),
- handler: handler,
+type heap []*qent
+
+func (h heap) Swap(i, j int) {
+ h[i], h[j] = h[j], h[i]
+ h[i].heappos, h[j].heappos = i, j
+}
+
+func (h heap) Less(i, j int) bool {
+ pi, pj := h[i].priority, h[j].priority
+ return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
+}
+
+func (h heap) Len() int {
+ return len(h)
+}
+
+// Move element i to a correct position in the heap. When the heap is
+// empty, fix(0) is a no-op (does not crash).
+func (h heap) fix(i int) {
+ // If the initial position is a leaf (i.e., index is greater
+ // than the last node's parent index), we only need to move it
+ // up, not down.
+ uponly := i > (len(h)-2)/2
+ // Move the new entry toward the root until reaching a
+ // position where the parent already has higher priority.
+ for i > 0 {
+ parent := (i - 1) / 2
+ if h.Less(i, parent) {
+ h.Swap(i, parent)
+ i = parent
+ } else {
+ break
+ }
}
- if reg != nil {
- reg.MustRegister(prometheus.NewGaugeFunc(
+ // Move i away from the root until reaching a position where
+ // both children already have lower priority.
+ for !uponly {
+ child := i*2 + 1
+ if child+1 < len(h) && h.Less(child+1, child) {
+ // Right child has higher priority than left
+ // child. Choose right child.
+ child = child + 1
+ }
+ if child < len(h) && h.Less(child, i) {
+ // Chosen child has higher priority than i.
+ // Swap and continue down.
+ h.Swap(i, child)
+ i = child
+ } else {
+ break
+ }
+ }
+}
+
+func (h *heap) add(ent *qent) {
+ ent.heappos = len(*h)
+ *h = append(*h, ent)
+ h.fix(ent.heappos)
+}
+
+func (h *heap) removeMax() *qent {
+ ent := (*h)[0]
+ if len(*h) == 1 {
+ *h = (*h)[:0]
+ } else {
+ h.Swap(0, len(*h)-1)
+ *h = (*h)[:len(*h)-1]
+ h.fix(0)
+ }
+ ent.heappos = -1
+ return ent
+}
+
+func (h *heap) remove(i int) {
+ // Move the last leaf into i's place, then move it to a
+ // correct position.
+ h.Swap(i, len(*h)-1)
+ *h = (*h)[:len(*h)-1]
+ if i < len(*h) {
+ h.fix(i)
+ }
+}
+
+func (rl *RequestLimiter) setup() {
+ if rl.Registry != nil {
+ rl.Registry.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Name: "concurrent_requests",
Help: "Number of requests in progress",
},
- func() float64 { return float64(h.Current()) },
+ func() float64 {
+ rl.mtx.Lock()
+ defer rl.mtx.Unlock()
+ return float64(rl.handling)
+ },
))
- reg.MustRegister(prometheus.NewGaugeFunc(
+ rl.Registry.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "arvados",
Name: "max_concurrent_requests",
Help: "Maximum number of concurrent requests",
},
- func() float64 { return float64(h.Max()) },
+ func() float64 { return float64(rl.MaxConcurrent) },
+ ))
+ rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "queued_requests",
+ Help: "Number of requests in queue",
+ },
+ func() float64 {
+ rl.mtx.Lock()
+ defer rl.mtx.Unlock()
+ return float64(len(rl.queue))
+ },
))
+ rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: "max_queued_requests",
+ Help: "Maximum number of queued requests",
+ },
+ func() float64 { return float64(rl.MaxQueue) },
+ ))
+ }
+}
+
+// caller must have lock
+func (rl *RequestLimiter) runqueue() {
+ // Handle entries from the queue as capacity permits
+ for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
+ rl.handling++
+ ent := rl.queue.removeMax()
+ ent.heappos = -1
+ ent.ready <- true
}
- return h
}
-func (h *limiterHandler) Current() int {
- if cap(h.requests) == 0 {
- return int(atomic.LoadInt64(&h.count))
+// If the queue is too full, fail and remove the lowest-priority
+// entry. Caller must have lock. Queue must not be empty.
+func (rl *RequestLimiter) trimqueue() {
+ if len(rl.queue) <= rl.MaxQueue {
+ return
}
- return len(h.requests)
+ min := 0
+ for i := range rl.queue {
+ if i == 0 || rl.queue.Less(min, i) {
+ min = i
+ }
+ }
+ rl.queue[min].heappos = -1
+ rl.queue[min].ready <- false
+ rl.queue.remove(min)
}
-func (h *limiterHandler) Max() int {
- return cap(h.requests)
+func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
+ rl.mtx.Lock()
+ defer rl.mtx.Unlock()
+ qtime := time.Now()
+ var priority int64
+ if rl.Priority != nil {
+ priority = rl.Priority(req, qtime)
+ }
+ ent := &qent{
+ queued: qtime,
+ priority: priority,
+ ready: make(chan bool, 1),
+ heappos: -1,
+ }
+ if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+ // fast path, skip the queue
+ rl.handling++
+ ent.ready <- true
+ return ent
+ }
+ rl.queue.add(ent)
+ rl.trimqueue()
+ return ent
}
-func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if cap(h.requests) == 0 {
- atomic.AddInt64(&h.count, 1)
- defer atomic.AddInt64(&h.count, -1)
- h.handler.ServeHTTP(resp, req)
- return
+func (rl *RequestLimiter) remove(ent *qent) {
+ rl.mtx.Lock()
+ defer rl.mtx.Unlock()
+ if ent.heappos >= 0 {
+ rl.queue.remove(ent.heappos)
+ ent.heappos = -1
+ ent.ready <- false
}
+}
+
+func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ rl.setupOnce.Do(rl.setup)
+ ent := rl.enqueue(req)
+ var ok bool
select {
- case h.requests <- struct{}{}:
- default:
- // reached max requests
+ case <-req.Context().Done():
+ rl.remove(ent)
+ // we still need to wait for ent.ready, because
+ // sometimes runqueue() will have already decided to
+ // send true before our rl.remove() call, and in that
+ // case we'll need to decrement rl.handling below.
+ ok = <-ent.ready
+ case ok = <-ent.ready:
+ }
+ if !ok {
resp.WriteHeader(http.StatusServiceUnavailable)
return
}
- h.handler.ServeHTTP(resp, req)
- <-h.requests
+ defer func() {
+ rl.mtx.Lock()
+ defer rl.mtx.Unlock()
+ rl.handling--
+ // unblock the next waiting request
+ rl.runqueue()
+ }()
+ rl.Handler.ServeHTTP(resp, req)
}
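The rewritten limiter is used as an ordinary http.Handler wrapper. A hedged usage sketch, assuming the usual git.arvados.org/arvados.git module path, with the priority taken from a request header much like the test below does:

package main

import (
    "log"
    "net/http"
    "strconv"
    "time"

    "git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
    backend := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("ok\n"))
    })
    rl := &httpserver.RequestLimiter{
        Handler:       backend,
        MaxConcurrent: 64,
        MaxQueue:      64,
        // Requests with a higher "Priority" header value are
        // dequeued first; excess low-priority requests get 503.
        Priority: func(req *http.Request, queued time.Time) int64 {
            p, _ := strconv.ParseInt(req.Header.Get("Priority"), 10, 64)
            return p
        },
    }
    log.Fatal(http.ListenAndServe(":8080", rl))
}

Leaving Registry nil simply skips the metrics registration shown in setup above.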
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 9258fbfa5..4023a76bc 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -5,11 +5,14 @@
package httpserver
import (
+ "fmt"
"net/http"
"net/http/httptest"
+ "strconv"
"sync"
- "testing"
"time"
+
+ check "gopkg.in/check.v1"
)
type testHandler struct {
@@ -29,9 +32,9 @@ func newTestHandler() *testHandler {
}
}
-func TestRequestLimiter1(t *testing.T) {
+func (s *Suite) TestRequestLimiter1(c *check.C) {
h := newTestHandler()
- l := NewRequestLimiter(1, h, nil)
+ l := RequestLimiter{MaxConcurrent: 1, Handler: h}
var wg sync.WaitGroup
resps := make([]*httptest.ResponseRecorder, 10)
for i := 0; i < 10; i++ {
@@ -59,7 +62,7 @@ func TestRequestLimiter1(t *testing.T) {
select {
case <-done:
case <-time.After(10 * time.Second):
- t.Fatal("test timed out, probably deadlocked")
+ c.Fatal("test timed out, probably deadlocked")
}
n200 := 0
n503 := 0
@@ -70,11 +73,11 @@ func TestRequestLimiter1(t *testing.T) {
case 503:
n503++
default:
- t.Fatalf("Unexpected response code %d", resps[i].Code)
+ c.Fatalf("Unexpected response code %d", resps[i].Code)
}
}
if n200 != 1 || n503 != 9 {
- t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+ c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
}
// Now that all 10 are finished, an 11th request should
// succeed.
@@ -85,13 +88,13 @@ func TestRequestLimiter1(t *testing.T) {
resp := httptest.NewRecorder()
l.ServeHTTP(resp, &http.Request{})
if resp.Code != 200 {
- t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+ c.Errorf("Got status %d on 11th request, want 200", resp.Code)
}
}
-func TestRequestLimiter10(t *testing.T) {
+func (*Suite) TestRequestLimiter10(c *check.C) {
h := newTestHandler()
- l := NewRequestLimiter(10, h, nil)
+ l := RequestLimiter{MaxConcurrent: 10, Handler: h}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
@@ -108,3 +111,62 @@ func TestRequestLimiter10(t *testing.T) {
}
wg.Wait()
}
+
+func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
+ h := newTestHandler()
+ rl := RequestLimiter{
+ MaxConcurrent: 100,
+ MaxQueue: 20,
+ Handler: h,
+ Priority: func(r *http.Request, _ time.Time) int64 {
+ p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
+ return p
+ }}
+
+ c.Logf("starting initial requests")
+ for i := 0; i < rl.MaxConcurrent; i++ {
+ go func() {
+ rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
+ }()
+ }
+ c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
+ for i := 0; i < rl.MaxConcurrent; i++ {
+ <-h.inHandler
+ }
+
+ c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
+ var wg1, wg2 sync.WaitGroup
+ wg1.Add(rl.MaxQueue)
+ wg2.Add(rl.MaxQueue)
+ for i := 0; i < rl.MaxQueue*2; i++ {
+ i := i
+ go func() {
+ pri := (i & 1) + 1
+ resp := httptest.NewRecorder()
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
+ if pri == 1 {
+ c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+ wg1.Done()
+ } else {
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ wg2.Done()
+ }
+ }()
+ }
+
+ c.Logf("waiting for queued priority=1 requests to fail")
+ wg1.Wait()
+
+ c.Logf("allowing initial requests to proceed")
+ for i := 0; i < rl.MaxConcurrent; i++ {
+ h.okToProceed <- struct{}{}
+ }
+
+ c.Logf("allowing queued priority=2 requests to proceed")
+ for i := 0; i < rl.MaxQueue; i++ {
+ <-h.inHandler
+ h.okToProceed <- struct{}{}
+ }
+ c.Logf("waiting for queued priority=2 requests to succeed")
+ wg2.Wait()
+}
-----------------------------------------------------------------------
hooks/post-receive