[arvados] updated: 2.6.0-267-gb30e070d2
git repository hosting
git at public.arvados.org
Mon Jun 12 19:04:40 UTC 2023
Summary of changes:
lib/service/cmd.go | 3 +-
sdk/go/httpserver/request_limiter.go | 96 ++++++++++---------------------
sdk/go/httpserver/request_limiter_test.go | 9 ++-
3 files changed, 34 insertions(+), 74 deletions(-)
via b30e070d2cb99ccbe60eb9684bf0627dcbc3779b (commit)
via b9f32ccecc7835729d6ff20bd0a82b113bb00727 (commit)
from 110ce6a7e9c55fca0e2d43f8629be73fc0f8ba25 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit b30e070d2cb99ccbe60eb9684bf0627dcbc3779b
Author: Tom Clegg <tom at curii.com>
Date: Mon Jun 12 14:47:30 2023 -0400
20602: Add IneligibleForQueuePriority const.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 37eac86d2..c66c27943 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -10,7 +10,6 @@ import (
"flag"
"fmt"
"io"
- "math"
"net"
"net/http"
"net/http/httptest"
@@ -262,7 +261,7 @@ func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
// Return 503 immediately instead of queueing. We want
// to send feedback to dispatchcloud ASAP to stop
// bringing up new containers.
- return math.MinInt64
+ return httpserver.IneligibleForQueuePriority
case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
// "Create log entry" is the most harmless kind of
// request to drop.
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 402de3e10..cd928e6ea 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -15,6 +15,8 @@ import (
"github.com/sirupsen/logrus"
)
+const IneligibleForQueuePriority = math.MinInt64
+
// RequestLimiter wraps http.Handler, limiting the number of
// concurrent requests being handled by the wrapped Handler. Requests
// that arrive when the handler is already at the specified
@@ -194,7 +196,7 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
ent.ready <- true
return ent
}
- if priority == math.MinInt64 {
+ if priority == IneligibleForQueuePriority {
// Priority func is telling us to return 503
// immediately instead of queueing, regardless of
// queue size, if we can't handle the request
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index fd09c9305..b04ff57cc 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -6,7 +6,6 @@ package httpserver
import (
"fmt"
- "math"
"net/http"
"net/http/httptest"
"strconv"
@@ -135,14 +134,14 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
<-h.inHandler
}
- c.Logf("starting %d priority=MinInt64 requests (should respond 503 immediately)", rl.MaxQueue)
+ c.Logf("starting %d priority=IneligibleForQueuePriority requests (should respond 503 immediately)", rl.MaxQueue)
var wgX sync.WaitGroup
for i := 0; i < rl.MaxQueue; i++ {
wgX.Add(1)
go func() {
defer wgX.Done()
resp := httptest.NewRecorder()
- rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", math.MinInt64)}}})
+ rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", IneligibleForQueuePriority)}}})
c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
}()
}
commit b9f32ccecc7835729d6ff20bd0a82b113bb00727
Author: Tom Clegg <tom at curii.com>
Date: Mon Jun 12 14:43:39 2023 -0400
20602: Use stdlib heap implementation.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 2bb0a5673..402de3e10 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -5,6 +5,7 @@
package httpserver
import (
+ "container/heap"
"math"
"net/http"
"sync"
@@ -47,7 +48,7 @@ type RequestLimiter struct {
setupOnce sync.Once
mtx sync.Mutex
handling int
- queue heap
+ queue queue
}
type qent struct {
@@ -57,87 +58,49 @@ type qent struct {
ready chan bool // true = handle now; false = return 503 now
}
-type heap []*qent
+type queue []*qent
-func (h heap) Swap(i, j int) {
+func (h queue) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].heappos, h[j].heappos = i, j
}
-func (h heap) Less(i, j int) bool {
+func (h queue) Less(i, j int) bool {
pi, pj := h[i].priority, h[j].priority
return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
}
-func (h heap) Len() int {
+func (h queue) Len() int {
return len(h)
}
-// Move element i to a correct position in the heap. When the heap is
-// empty, fix(0) is a no-op (does not crash).
-func (h heap) fix(i int) {
- // If the initial position is a leaf (i.e., index is greater
- // than the last node's parent index), we only need to move it
- // up, not down.
- uponly := i > (len(h)-2)/2
- // Move the new entry toward the root until reaching a
- // position where the parent already has higher priority.
- for i > 0 {
- parent := (i - 1) / 2
- if h.Less(i, parent) {
- h.Swap(i, parent)
- i = parent
- } else {
- break
- }
- }
- // Move i away from the root until reaching a position where
- // both children already have lower priority.
- for !uponly {
- child := i*2 + 1
- if child+1 < len(h) && h.Less(child+1, child) {
- // Right child has higher priority than left
- // child. Choose right child.
- child = child + 1
- }
- if child < len(h) && h.Less(child, i) {
- // Chosen child has higher priority than i.
- // Swap and continue down.
- h.Swap(i, child)
- i = child
- } else {
- break
- }
- }
-}
-
-func (h *heap) add(ent *qent) {
- ent.heappos = len(*h)
+func (h *queue) Push(x interface{}) {
+ n := len(*h)
+ ent := x.(*qent)
+ ent.heappos = n
*h = append(*h, ent)
- h.fix(ent.heappos)
}
-func (h *heap) removeMax() *qent {
- ent := (*h)[0]
- if len(*h) == 1 {
- *h = (*h)[:0]
- } else {
- h.Swap(0, len(*h)-1)
- *h = (*h)[:len(*h)-1]
- h.fix(0)
- }
+func (h *queue) Pop() interface{} {
+ n := len(*h)
+ ent := (*h)[n-1]
ent.heappos = -1
+ (*h)[n-1] = nil
+ *h = (*h)[0 : n-1]
return ent
}
-func (h *heap) remove(i int) {
- // Move the last leaf into i's place, then move it to a
- // correct position.
- h.Swap(i, len(*h)-1)
- *h = (*h)[:len(*h)-1]
- if i < len(*h) {
- h.fix(i)
- }
+func (h *queue) add(ent *qent) {
+ ent.heappos = h.Len()
+ h.Push(ent)
+}
+
+func (h *queue) removeMax() *qent {
+ return heap.Pop(h).(*qent)
+}
+
+func (h *queue) remove(i int) {
+ heap.Remove(h, i)
}
func (rl *RequestLimiter) setup() {
@@ -191,7 +154,6 @@ func (rl *RequestLimiter) runqueue() {
for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
rl.handling++
ent := rl.queue.removeMax()
- ent.heappos = -1
ent.ready <- true
}
}
@@ -208,7 +170,6 @@ func (rl *RequestLimiter) trimqueue() {
min = i
}
}
- rl.queue[min].heappos = -1
rl.queue[min].ready <- false
rl.queue.remove(min)
}
@@ -251,7 +212,6 @@ func (rl *RequestLimiter) remove(ent *qent) {
defer rl.mtx.Unlock()
if ent.heappos >= 0 {
rl.queue.remove(ent.heappos)
- ent.heappos = -1
ent.ready <- false
}
}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index bdc0401b3..fd09c9305 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -116,8 +116,8 @@ func (*Suite) TestRequestLimiter10(c *check.C) {
func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
h := newTestHandler()
rl := RequestLimiter{
- MaxConcurrent: 100,
- MaxQueue: 20,
+ MaxConcurrent: 1000,
+ MaxQueue: 200,
Handler: h,
Priority: func(r *http.Request, _ time.Time) int64 {
p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list