[arvados] created: 2.6.0-265-g110ce6a7e

git repository hosting git at public.arvados.org
Thu Jun 8 18:33:05 UTC 2023


        at  110ce6a7e9c55fca0e2d43f8629be73fc0f8ba25 (commit)


commit 110ce6a7e9c55fca0e2d43f8629be73fc0f8ba25
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 8 14:30:23 2023 -0400

    20602: Disable queue when testing DumpRequests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 026a01eca..37eac86d2 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -213,7 +213,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 // JSON file in the specified directory.
 func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, reg *prometheus.Registry, srv *http.Server, logger logrus.FieldLogger) {
 	outdir := cluster.SystemLogs.RequestQueueDumpDirectory
-	if outdir == "" || cluster.ManagementToken == "" {
+	if outdir == "" || cluster.ManagementToken == "" || cluster.API.MaxConcurrentRequests < 1 {
 		return
 	}
 	logger = logger.WithField("worker", "RequestQueueDump")
diff --git a/lib/service/cmd_test.go b/lib/service/cmd_test.go
index 5a7fbc052..97a6bd8a4 100644
--- a/lib/service/cmd_test.go
+++ b/lib/service/cmd_test.go
@@ -210,7 +210,9 @@ Clusters:
  zzzzz:
   SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
-  API: {MaxConcurrentRequests: %d}
+  API:
+   MaxConcurrentRequests: %d
+   MaxQueuedRequests: 0
   SystemLogs: {RequestQueueDumpDirectory: %q}
   Services:
    Controller:
@@ -268,15 +270,15 @@ Clusters:
 			panic("timed out")
 		}
 	}
-	for {
-		time.Sleep(time.Second / 100)
+	for delay := time.Second / 100; ; delay = delay * 2 {
+		time.Sleep(delay)
 		j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
 		if os.IsNotExist(err) && deadline.After(time.Now()) {
 			continue
 		}
-		c.Check(err, check.IsNil)
-		c.Logf("%s", stderr.String())
-		c.Logf("%s", string(j))
+		c.Assert(err, check.IsNil)
+		c.Logf("stderr:\n%s", stderr.String())
+		c.Logf("json:\n%s", string(j))
 
 		var loaded []struct{ URL string }
 		err = json.Unmarshal(j, &loaded)

commit 19e9dda26118f915c8f2631a488eceed7943a9a4
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 8 10:31:11 2023 -0400

    20602: Prioritize backend traffic from keep-web as interactive.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index c73191103..c44a2eb73 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -141,6 +141,11 @@ func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSessi
 			return nil, nil, nil, err
 		}
 		sess.client.AuthToken = token
+		// A non-empty Origin header tells controller to
+		// prioritize our traffic as interactive, which it
+		// is most of the time.
+		origin := c.cluster.Services.WebDAVDownload.ExternalURL
+		sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
 		sess.arvadosclient, err = arvadosclient.New(sess.client)
 		if err != nil {
 			return nil, nil, nil, err

commit 96d8b9e1afecccae803ec4b956ada745dbe71d9f
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 8 10:30:35 2023 -0400

    20602: Attach assigned priority to response log entry.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 988643c74..2bb0a5673 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 )
 
 // RequestLimiter wraps http.Handler, limiting the number of
@@ -258,6 +259,7 @@ func (rl *RequestLimiter) remove(ent *qent) {
 func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rl.setupOnce.Do(rl.setup)
 	ent := rl.enqueue(req)
+	SetResponseLogFields(req.Context(), logrus.Fields{"priority": ent.priority})
 	var ok bool
 	select {
 	case <-req.Context().Done():

commit eaab5e4438da9239245a7c54a4b32bbe9e283780
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 7 17:13:30 2023 -0400

    20602: Prioritize interactive (browser) requests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 1073f421d..026a01eca 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -10,6 +10,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"math"
 	"net"
 	"net/http"
 	"net/http/httptest"
@@ -158,6 +159,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 								Handler:       handler,
 								MaxConcurrent: cluster.API.MaxConcurrentRequests,
 								MaxQueue:      cluster.API.MaxQueuedRequests,
+								Priority:      c.requestPriority,
 								Registry:      reg}))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
@@ -254,6 +256,25 @@ func (c *command) requestQueueDumpCheck(cluster *arvados.Cluster, prog string, r
 	}
 }
 
+func (c *command) requestPriority(req *http.Request, queued time.Time) int64 {
+	switch {
+	case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/containers/") && strings.HasSuffix(req.URL.Path, "/lock"):
+		// Return 503 immediately instead of queueing. We want
+		// to send feedback to dispatchcloud ASAP to stop
+		// bringing up new containers.
+		return math.MinInt64
+	case req.Method == http.MethodPost && strings.HasPrefix(req.URL.Path, "/arvados/v1/logs"):
+		// "Create log entry" is the most harmless kind of
+		// request to drop.
+		return 0
+	case req.Header.Get("Origin") != "":
+		// Handle interactive requests first.
+		return 2
+	default:
+		return 1
+	}
+}
+
 // If an incoming request's target vhost has an embedded collection
 // UUID or PDH, handle it with hTrue, otherwise handle it with
 // hFalse.

commit 1c92ba23521c23d00f97fee0afa2649afd73b504
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 7 17:12:17 2023 -0400

    20602: Priority MinInt64 means never wait in queue.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 094f6a6ca..988643c74 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -5,6 +5,7 @@
 package httpserver
 
 import (
+	"math"
 	"net/http"
 	"sync"
 	"time"
@@ -231,6 +232,14 @@ func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
 		ent.ready <- true
 		return ent
 	}
+	if priority == math.MinInt64 {
+		// Priority func is telling us to return 503
+		// immediately instead of queueing, regardless of
+		// queue size, if we can't handle the request
+		// immediately.
+		ent.ready <- false
+		return ent
+	}
 	rl.queue.add(ent)
 	rl.trimqueue()
 	return ent
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 4023a76bc..bdc0401b3 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -6,6 +6,7 @@ package httpserver
 
 import (
 	"fmt"
+	"math"
 	"net/http"
 	"net/http/httptest"
 	"strconv"
@@ -134,6 +135,19 @@ func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
 		<-h.inHandler
 	}
 
+	c.Logf("starting %d priority=MinInt64 requests (should respond 503 immediately)", rl.MaxQueue)
+	var wgX sync.WaitGroup
+	for i := 0; i < rl.MaxQueue; i++ {
+		wgX.Add(1)
+		go func() {
+			defer wgX.Done()
+			resp := httptest.NewRecorder()
+			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", math.MinInt64)}}})
+			c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+		}()
+	}
+	wgX.Wait()
+
 	c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
 	var wg1, wg2 sync.WaitGroup
 	wg1.Add(rl.MaxQueue)

commit 2ca9ee944a0bbeeb7226ea6fb6cd535b9ca8cbdb
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 7 12:37:40 2023 -0400

    20602: Add MaxQueuedRequests config.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 837ce896e..33c1e497d 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -227,6 +227,15 @@ Clusters:
       # in a single service process, or 0 for no limit.
       MaxConcurrentRequests: 64
 
+      # Maximum number of incoming requests to hold in a priority
+      # queue waiting for one of the MaxConcurrentRequests slots to be
+      # free. When the queue is longer than this, respond 503 to the
+      # lowest priority request.
+      #
+      # If MaxQueuedRequests is 0, respond 503 immediately to
+      # additional requests while at the MaxConcurrentRequests limit.
+      MaxQueuedRequests: 64
+
       # Fraction of MaxConcurrentRequests that can be "log create"
       # messages at any given time.  This is to prevent logging
       # updates from crowding out more important requests.
diff --git a/lib/config/export.go b/lib/config/export.go
index 44fd55941..565be2fb7 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -65,13 +65,14 @@ var whitelist = map[string]bool{
 	"API.FreezeProjectRequiresDescription":     true,
 	"API.FreezeProjectRequiresProperties":      true,
 	"API.FreezeProjectRequiresProperties.*":    true,
-	"API.LockBeforeUpdate":                     false,
 	"API.KeepServiceRequestTimeout":            false,
-	"API.MaxConcurrentRequests":                false,
+	"API.LockBeforeUpdate":                     false,
 	"API.LogCreateRequestFraction":             false,
+	"API.MaxConcurrentRequests":                false,
 	"API.MaxIndexDatabaseRead":                 false,
 	"API.MaxItemsPerResponse":                  true,
 	"API.MaxKeepBlobBuffers":                   false,
+	"API.MaxQueuedRequests":                    false,
 	"API.MaxRequestAmplification":              false,
 	"API.MaxRequestSize":                       true,
 	"API.MaxTokenLifetime":                     false,
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 119d66191..1073f421d 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -157,6 +157,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 							&httpserver.RequestLimiter{
 								Handler:       handler,
 								MaxConcurrent: cluster.API.MaxConcurrentRequests,
+								MaxQueue:      cluster.API.MaxQueuedRequests,
 								Registry:      reg}))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 4da851763..2e9abf2ec 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -100,6 +100,7 @@ type Cluster struct {
 		MaxIndexDatabaseRead             int
 		MaxItemsPerResponse              int
 		MaxConcurrentRequests            int
+		MaxQueuedRequests                int
 		LogCreateRequestFraction         float64
 		MaxKeepBlobBuffers               int
 		MaxRequestAmplification          int

commit 53c44ca2f2e9e2df9d5eee96e7806fbb7576eadd
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 7 12:37:25 2023 -0400

    20602: Add priority queue to RequestLimiter.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a52d09f68..837ce896e 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -223,8 +223,8 @@ Clusters:
       # parameter higher than this value, this value is used instead.
       MaxItemsPerResponse: 1000
 
-      # Maximum number of concurrent requests to accept in a single
-      # service process, or 0 for no limit.
+      # Maximum number of requests to process concurrently
+      # in a single service process, or 0 for no limit.
       MaxConcurrentRequests: 64
 
       # Fraction of MaxConcurrentRequests that can be "log create"
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index cc6938cbc..119d66191 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -154,7 +154,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
 				httpserver.Inspect(reg, cluster.ManagementToken,
 					httpserver.LogRequests(
 						interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-							httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
+							&httpserver.RequestLimiter{
+								Handler:       handler,
+								MaxConcurrent: cluster.API.MaxConcurrentRequests,
+								Registry:      reg}))))))
 	srv := &httpserver.Server{
 		Server: http.Server{
 			Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
index 888945312..094f6a6ca 100644
--- a/sdk/go/httpserver/request_limiter.go
+++ b/sdk/go/httpserver/request_limiter.go
@@ -6,87 +6,270 @@ package httpserver
 
 import (
 	"net/http"
-	"sync/atomic"
+	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-// RequestCounter is an http.Handler that tracks the number of
-// requests in progress.
-type RequestCounter interface {
-	http.Handler
+// RequestLimiter wraps http.Handler, limiting the number of
+// concurrent requests being handled by the wrapped Handler. Requests
+// that arrive when the handler is already at the specified
+// concurrency limit are queued and handled in the order indicated by
+// the Priority function.
+//
+// Caller must not modify any RequestLimiter fields after calling its
+// methods.
+type RequestLimiter struct {
+	Handler http.Handler
+
+	// Maximum number of requests being handled at once. Beyond
+	// this limit, requests will be queued.
+	MaxConcurrent int
+
+	// Maximum number of requests in the queue. Beyond this limit,
+	// the lowest priority requests will return 503.
+	MaxQueue int
 
-	// Current() returns the number of requests in progress.
-	Current() int
+	// Priority determines queue ordering. Requests with higher
+	// priority are handled first. Requests with equal priority
+	// are handled FIFO. If Priority is nil, all requests are
+	// handled FIFO.
+	Priority func(req *http.Request, queued time.Time) int64
 
-	// Max() returns the maximum number of concurrent requests
-	// that will be accepted.
-	Max() int
+	// "concurrent_requests", "max_concurrent_requests",
+	// "queued_requests", and "max_queued_requests" metrics are
+	// registered with Registry, if it is not nil.
+	Registry *prometheus.Registry
+
+	setupOnce sync.Once
+	mtx       sync.Mutex
+	handling  int
+	queue     heap
 }
 
-type limiterHandler struct {
-	requests chan struct{}
-	handler  http.Handler
-	count    int64 // only used if cap(requests)==0
+type qent struct {
+	queued   time.Time
+	priority int64
+	heappos  int
+	ready    chan bool // true = handle now; false = return 503 now
 }
 
-// NewRequestLimiter returns a RequestCounter that delegates up to
-// maxRequests at a time to the given handler, and responds 503 to all
-// incoming requests beyond that limit.
-//
-// "concurrent_requests" and "max_concurrent_requests" metrics are
-// registered with the given reg, if reg is not nil.
-func NewRequestLimiter(maxRequests int, handler http.Handler, reg *prometheus.Registry) RequestCounter {
-	h := &limiterHandler{
-		requests: make(chan struct{}, maxRequests),
-		handler:  handler,
+type heap []*qent
+
+func (h heap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+	h[i].heappos, h[j].heappos = i, j
+}
+
+func (h heap) Less(i, j int) bool {
+	pi, pj := h[i].priority, h[j].priority
+	return pi > pj || (pi == pj && h[i].queued.Before(h[j].queued))
+}
+
+func (h heap) Len() int {
+	return len(h)
+}
+
+// Move element i to a correct position in the heap. When the heap is
+// empty, fix(0) is a no-op (does not crash).
+func (h heap) fix(i int) {
+	// If the initial position is a leaf (i.e., index is greater
+	// than the last node's parent index), we only need to move it
+	// up, not down.
+	uponly := i > (len(h)-2)/2
+	// Move the new entry toward the root until reaching a
+	// position where the parent already has higher priority.
+	for i > 0 {
+		parent := (i - 1) / 2
+		if h.Less(i, parent) {
+			h.Swap(i, parent)
+			i = parent
+		} else {
+			break
+		}
 	}
-	if reg != nil {
-		reg.MustRegister(prometheus.NewGaugeFunc(
+	// Move i away from the root until reaching a position where
+	// both children already have lower priority.
+	for !uponly {
+		child := i*2 + 1
+		if child+1 < len(h) && h.Less(child+1, child) {
+			// Right child has higher priority than left
+			// child. Choose right child.
+			child = child + 1
+		}
+		if child < len(h) && h.Less(child, i) {
+			// Chosen child has higher priority than i.
+			// Swap and continue down.
+			h.Swap(i, child)
+			i = child
+		} else {
+			break
+		}
+	}
+}
+
+func (h *heap) add(ent *qent) {
+	ent.heappos = len(*h)
+	*h = append(*h, ent)
+	h.fix(ent.heappos)
+}
+
+func (h *heap) removeMax() *qent {
+	ent := (*h)[0]
+	if len(*h) == 1 {
+		*h = (*h)[:0]
+	} else {
+		h.Swap(0, len(*h)-1)
+		*h = (*h)[:len(*h)-1]
+		h.fix(0)
+	}
+	ent.heappos = -1
+	return ent
+}
+
+func (h *heap) remove(i int) {
+	// Move the last leaf into i's place, then move it to a
+	// correct position.
+	h.Swap(i, len(*h)-1)
+	*h = (*h)[:len(*h)-1]
+	if i < len(*h) {
+		h.fix(i)
+	}
+}
+
+func (rl *RequestLimiter) setup() {
+	if rl.Registry != nil {
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
 			prometheus.GaugeOpts{
 				Namespace: "arvados",
 				Name:      "concurrent_requests",
 				Help:      "Number of requests in progress",
 			},
-			func() float64 { return float64(h.Current()) },
+			func() float64 {
+				rl.mtx.Lock()
+				defer rl.mtx.Unlock()
+				return float64(rl.handling)
+			},
 		))
-		reg.MustRegister(prometheus.NewGaugeFunc(
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
 			prometheus.GaugeOpts{
 				Namespace: "arvados",
 				Name:      "max_concurrent_requests",
 				Help:      "Maximum number of concurrent requests",
 			},
-			func() float64 { return float64(h.Max()) },
+			func() float64 { return float64(rl.MaxConcurrent) },
+		))
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "queued_requests",
+				Help:      "Number of requests in queue",
+			},
+			func() float64 {
+				rl.mtx.Lock()
+				defer rl.mtx.Unlock()
+				return float64(len(rl.queue))
+			},
 		))
+		rl.Registry.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Name:      "max_queued_requests",
+				Help:      "Maximum number of queued requests",
+			},
+			func() float64 { return float64(rl.MaxQueue) },
+		))
+	}
+}
+
+// caller must have lock
+func (rl *RequestLimiter) runqueue() {
+	// Handle entries from the queue as capacity permits
+	for len(rl.queue) > 0 && (rl.MaxConcurrent == 0 || rl.handling < rl.MaxConcurrent) {
+		rl.handling++
+		ent := rl.queue.removeMax()
+		ent.heappos = -1
+		ent.ready <- true
 	}
-	return h
 }
 
-func (h *limiterHandler) Current() int {
-	if cap(h.requests) == 0 {
-		return int(atomic.LoadInt64(&h.count))
+// If the queue is too full, fail and remove the lowest-priority
+// entry. Caller must have lock. Queue must not be empty.
+func (rl *RequestLimiter) trimqueue() {
+	if len(rl.queue) <= rl.MaxQueue {
+		return
 	}
-	return len(h.requests)
+	min := 0
+	for i := range rl.queue {
+		if i == 0 || rl.queue.Less(min, i) {
+			min = i
+		}
+	}
+	rl.queue[min].heappos = -1
+	rl.queue[min].ready <- false
+	rl.queue.remove(min)
 }
 
-func (h *limiterHandler) Max() int {
-	return cap(h.requests)
+func (rl *RequestLimiter) enqueue(req *http.Request) *qent {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	qtime := time.Now()
+	var priority int64
+	if rl.Priority != nil {
+		priority = rl.Priority(req, qtime)
+	}
+	ent := &qent{
+		queued:   qtime,
+		priority: priority,
+		ready:    make(chan bool, 1),
+		heappos:  -1,
+	}
+	if rl.MaxConcurrent == 0 || rl.MaxConcurrent > rl.handling {
+		// fast path, skip the queue
+		rl.handling++
+		ent.ready <- true
+		return ent
+	}
+	rl.queue.add(ent)
+	rl.trimqueue()
+	return ent
 }
 
-func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	if cap(h.requests) == 0 {
-		atomic.AddInt64(&h.count, 1)
-		defer atomic.AddInt64(&h.count, -1)
-		h.handler.ServeHTTP(resp, req)
-		return
+func (rl *RequestLimiter) remove(ent *qent) {
+	rl.mtx.Lock()
+	defer rl.mtx.Unlock()
+	if ent.heappos >= 0 {
+		rl.queue.remove(ent.heappos)
+		ent.heappos = -1
+		ent.ready <- false
 	}
+}
+
+func (rl *RequestLimiter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	rl.setupOnce.Do(rl.setup)
+	ent := rl.enqueue(req)
+	var ok bool
 	select {
-	case h.requests <- struct{}{}:
-	default:
-		// reached max requests
+	case <-req.Context().Done():
+		rl.remove(ent)
+		// we still need to wait for ent.ready, because
+		// sometimes runqueue() will have already decided to
+		// send true before our rl.remove() call, and in that
+		// case we'll need to decrement rl.handling below.
+		ok = <-ent.ready
+	case ok = <-ent.ready:
+	}
+	if !ok {
 		resp.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
-	h.handler.ServeHTTP(resp, req)
-	<-h.requests
+	defer func() {
+		rl.mtx.Lock()
+		defer rl.mtx.Unlock()
+		rl.handling--
+		// unblock the next waiting request
+		rl.runqueue()
+	}()
+	rl.Handler.ServeHTTP(resp, req)
 }
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 9258fbfa5..4023a76bc 100644
--- a/sdk/go/httpserver/request_limiter_test.go
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -5,11 +5,14 @@
 package httpserver
 
 import (
+	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"strconv"
 	"sync"
-	"testing"
 	"time"
+
+	check "gopkg.in/check.v1"
 )
 
 type testHandler struct {
@@ -29,9 +32,9 @@ func newTestHandler() *testHandler {
 	}
 }
 
-func TestRequestLimiter1(t *testing.T) {
+func (s *Suite) TestRequestLimiter1(c *check.C) {
 	h := newTestHandler()
-	l := NewRequestLimiter(1, h, nil)
+	l := RequestLimiter{MaxConcurrent: 1, Handler: h}
 	var wg sync.WaitGroup
 	resps := make([]*httptest.ResponseRecorder, 10)
 	for i := 0; i < 10; i++ {
@@ -59,7 +62,7 @@ func TestRequestLimiter1(t *testing.T) {
 	select {
 	case <-done:
 	case <-time.After(10 * time.Second):
-		t.Fatal("test timed out, probably deadlocked")
+		c.Fatal("test timed out, probably deadlocked")
 	}
 	n200 := 0
 	n503 := 0
@@ -70,11 +73,11 @@ func TestRequestLimiter1(t *testing.T) {
 		case 503:
 			n503++
 		default:
-			t.Fatalf("Unexpected response code %d", resps[i].Code)
+			c.Fatalf("Unexpected response code %d", resps[i].Code)
 		}
 	}
 	if n200 != 1 || n503 != 9 {
-		t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+		c.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
 	}
 	// Now that all 10 are finished, an 11th request should
 	// succeed.
@@ -85,13 +88,13 @@ func TestRequestLimiter1(t *testing.T) {
 	resp := httptest.NewRecorder()
 	l.ServeHTTP(resp, &http.Request{})
 	if resp.Code != 200 {
-		t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+		c.Errorf("Got status %d on 11th request, want 200", resp.Code)
 	}
 }
 
-func TestRequestLimiter10(t *testing.T) {
+func (*Suite) TestRequestLimiter10(c *check.C) {
 	h := newTestHandler()
-	l := NewRequestLimiter(10, h, nil)
+	l := RequestLimiter{MaxConcurrent: 10, Handler: h}
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {
 		wg.Add(1)
@@ -108,3 +111,62 @@ func TestRequestLimiter10(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func (*Suite) TestRequestLimiterQueuePriority(c *check.C) {
+	h := newTestHandler()
+	rl := RequestLimiter{
+		MaxConcurrent: 100,
+		MaxQueue:      20,
+		Handler:       h,
+		Priority: func(r *http.Request, _ time.Time) int64 {
+			p, _ := strconv.ParseInt(r.Header.Get("Priority"), 10, 64)
+			return p
+		}}
+
+	c.Logf("starting initial requests")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		go func() {
+			rl.ServeHTTP(httptest.NewRecorder(), &http.Request{Header: http.Header{"No-Priority": {"x"}}})
+		}()
+	}
+	c.Logf("waiting for initial requests to consume all MaxConcurrent slots")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		<-h.inHandler
+	}
+
+	c.Logf("starting %d priority=1 and %d priority=1 requests", rl.MaxQueue, rl.MaxQueue)
+	var wg1, wg2 sync.WaitGroup
+	wg1.Add(rl.MaxQueue)
+	wg2.Add(rl.MaxQueue)
+	for i := 0; i < rl.MaxQueue*2; i++ {
+		i := i
+		go func() {
+			pri := (i & 1) + 1
+			resp := httptest.NewRecorder()
+			rl.ServeHTTP(resp, &http.Request{Header: http.Header{"Priority": {fmt.Sprintf("%d", pri)}}})
+			if pri == 1 {
+				c.Check(resp.Code, check.Equals, http.StatusServiceUnavailable)
+				wg1.Done()
+			} else {
+				c.Check(resp.Code, check.Equals, http.StatusOK)
+				wg2.Done()
+			}
+		}()
+	}
+
+	c.Logf("waiting for queued priority=1 requests to fail")
+	wg1.Wait()
+
+	c.Logf("allowing initial requests to proceed")
+	for i := 0; i < rl.MaxConcurrent; i++ {
+		h.okToProceed <- struct{}{}
+	}
+
+	c.Logf("allowing queued priority=2 requests to proceed")
+	for i := 0; i < rl.MaxQueue; i++ {
+		<-h.inHandler
+		h.okToProceed <- struct{}{}
+	}
+	c.Logf("waiting for queued priority=2 requests to succeed")
+	wg2.Wait()
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list