[ARVADOS] updated: 1535c8d141b07f246a501d288a0d3a903ce41a56
Git user
git at public.curoverse.com
Fri Apr 29 13:01:02 EDT 2016
Summary of changes:
build/run-tests.sh | 2 +
doc/install/install-keepstore.html.textile.liquid | 2 +
sdk/go/httpserver/request_limiter.go | 29 ++++++
sdk/go/httpserver/request_limiter_test.go | 106 ++++++++++++++++++++++
services/keepstore/handler_test.go | 2 +-
services/keepstore/keepstore.go | 20 +++-
services/keepstore/logging_router.go | 9 +-
7 files changed, 157 insertions(+), 13 deletions(-)
create mode 100644 sdk/go/httpserver/request_limiter.go
create mode 100644 sdk/go/httpserver/request_limiter_test.go
via 1535c8d141b07f246a501d288a0d3a903ce41a56 (commit)
via 0dc0c5650ddcd8376aea84d32d2b81b1cdba0946 (commit)
from e1de889290360f6dd5b5fdeab10cea997bcc6962 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1535c8d141b07f246a501d288a0d3a903ce41a56
Merge: e1de889 0dc0c56
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Apr 29 13:00:11 2016 -0400
Merge branch '9066-max-requests'
refs #9066
commit 0dc0c5650ddcd8376aea84d32d2b81b1cdba0946
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Apr 28 09:11:33 2016 -0400
9066: Add keepstore -max-requests argument.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..c94f831 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ sdk/python
sdk/ruby
sdk/go/arvadosclient
sdk/go/keepclient
+sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
@@ -703,6 +704,7 @@ declare -a gostuff
gostuff=(
sdk/go/arvadosclient
sdk/go/blockdigest
+ sdk/go/httpserver
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
diff --git a/doc/install/install-keepstore.html.textile.liquid b/doc/install/install-keepstore.html.textile.liquid
index b211ce6..6548422 100644
--- a/doc/install/install-keepstore.html.textile.liquid
+++ b/doc/install/install-keepstore.html.textile.liquid
@@ -47,6 +47,8 @@ Usage of ./keepstore:
-enforce-permissions=false: Enforce permission signatures on requests.
-listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
-max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+ -max-requests int
+ Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
-never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
-permission-key-file="": Synonym for -blob-signing-key-file.
-permission-ttl=0: Synonym for -blob-signature-ttl.
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
new file mode 100644
index 0000000..178ffb9
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+ "net/http"
+)
+
+type limiterHandler struct {
+ requests chan struct{}
+ handler http.Handler
+}
+
+func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+ return &limiterHandler{
+ requests: make(chan struct{}, maxRequests),
+ handler: handler,
+ }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ select {
+ case h.requests <- struct{}{}:
+ default:
+ // reached max requests
+ resp.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ h.handler.ServeHTTP(resp, req)
+ <-h.requests
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
new file mode 100644
index 0000000..a8cc806
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -0,0 +1,106 @@
+package httpserver
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "sync"
+ "testing"
+ "time"
+)
+
+type testHandler struct {
+ inHandler chan struct{}
+ okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ h.inHandler <- struct{}{}
+ <-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+ return &testHandler{
+ inHandler: make(chan struct{}),
+ okToProceed: make(chan struct{}),
+ }
+}
+
+func TestRequestLimiter1(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewRequestLimiter(1, h)
+ var wg sync.WaitGroup
+ resps := make([]*httptest.ResponseRecorder, 10)
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ resps[i] = httptest.NewRecorder()
+ go func(i int) {
+ l.ServeHTTP(resps[i], &http.Request{})
+ wg.Done()
+ }(i)
+ }
+ done := make(chan struct{})
+ go func() {
+ // Make sure one request has entered the handler
+ <-h.inHandler
+ // Make sure all unsuccessful requests finish (but don't wait
+ // for the one that's still waiting for okToProceed)
+ wg.Add(-1)
+ wg.Wait()
+ // Wait for the last goroutine
+ wg.Add(1)
+ h.okToProceed <- struct{}{}
+ wg.Wait()
+ done <- struct{}{}
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timed out, probably deadlocked")
+ }
+ n200 := 0
+ n503 := 0
+ for i := 0; i < 10; i++ {
+ switch resps[i].Code {
+ case 200:
+ n200++
+ case 503:
+ n503++
+ default:
+ t.Fatalf("Unexpected response code %d", resps[i].Code)
+ }
+ }
+ if n200 != 1 || n503 != 9 {
+ t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+ }
+ // Now that all 10 are finished, an 11th request should
+ // succeed.
+ go func() {
+ <-h.inHandler
+ h.okToProceed <- struct{}{}
+ }()
+ resp := httptest.NewRecorder()
+ l.ServeHTTP(resp, &http.Request{})
+ if resp.Code != 200 {
+ t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+ }
+}
+
+func TestRequestLimiter10(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewRequestLimiter(10, h)
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+ wg.Done()
+ }()
+ // Make sure the handler starts before we initiate the
+ // next request, but don't let it finish yet.
+ <-h.inHandler
+ }
+ for i := 0; i < 10; i++ {
+ h.okToProceed <- struct{}{}
+ }
+ wg.Wait()
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeLoggingRESTRouter()
+ loggingRouter := MakeRESTRouter()
loggingRouter.ServeHTTP(response, req)
return response
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..93ee43c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
@@ -145,6 +146,7 @@ func main() {
blobSigningKeyFile string
permissionTTLSec int
pidfile string
+ maxRequests int
)
flag.StringVar(
&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
"listen",
DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+ flag.IntVar(
+ &maxRequests,
+ "max-requests",
+ 0,
+ "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
flag.BoolVar(
&neverDelete,
"never-delete",
@@ -302,13 +309,18 @@ func main() {
}
}
+ if maxRequests <= 0 {
+ maxRequests = maxBuffers * 2
+ log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
+ // Middleware stack: logger, maxRequests limiter, method handlers
+ http.Handle("/", &LoggingRESTRouter{
+ httpserver.NewRequestLimiter(maxRequests,
+ MakeRESTRouter()),
})
// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
// LoggingResponseWriter
import (
- "github.com/gorilla/mux"
"log"
"net/http"
"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
- router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
- router := MakeRESTRouter()
- return (&LoggingRESTRouter{router})
+ router http.Handler
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list