[ARVADOS] created: c956e6d7e42b0ea4d6194f02866a48090526d55b

Git user git at public.curoverse.com
Thu Apr 28 09:15:00 EDT 2016


        at  c956e6d7e42b0ea4d6194f02866a48090526d55b (commit)


commit c956e6d7e42b0ea4d6194f02866a48090526d55b
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Apr 28 09:11:33 2016 -0400

    9066: Add keepstore -max-requests argument.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..c94f831 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ sdk/python
 sdk/ruby
 sdk/go/arvadosclient
 sdk/go/keepclient
+sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
@@ -703,6 +704,7 @@ declare -a gostuff
 gostuff=(
     sdk/go/arvadosclient
     sdk/go/blockdigest
+    sdk/go/httpserver
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
diff --git a/doc/install/install-keepstore.html.textile.liquid b/doc/install/install-keepstore.html.textile.liquid
index b211ce6..6548422 100644
--- a/doc/install/install-keepstore.html.textile.liquid
+++ b/doc/install/install-keepstore.html.textile.liquid
@@ -47,6 +47,8 @@ Usage of ./keepstore:
   -enforce-permissions=false: Enforce permission signatures on requests.
   -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
   -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+  -max-requests int
+   Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
   -never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
   -permission-key-file="": Synonym for -blob-signing-key-file.
   -permission-ttl=0: Synonym for -blob-signature-ttl.
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
new file mode 100644
index 0000000..178ffb9
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+	"net/http"
+)
+
+type limiterHandler struct {
+	requests chan struct{}
+	handler  http.Handler
+}
+
+func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+	return &limiterHandler{
+		requests: make(chan struct{}, maxRequests),
+		handler:  handler,
+	}
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	select {
+	case h.requests <- struct{}{}:
+	default:
+		// reached max requests
+		resp.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+	h.handler.ServeHTTP(resp, req)
+	<-h.requests
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
new file mode 100644
index 0000000..839c56e
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -0,0 +1,95 @@
+package httpserver
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+	"time"
+)
+
+type testHandler struct {
+	inHandler   chan struct{}
+	okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.inHandler <- struct{}{}
+	<-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+	return &testHandler{
+		inHandler:   make(chan struct{}),
+		okToProceed: make(chan struct{}),
+	}
+}
+
+func TestRequestLimiter1(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewRequestLimiter(1, h)
+	var wg sync.WaitGroup
+	resps := make([]*httptest.ResponseRecorder, 10)
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		resps[i] = httptest.NewRecorder()
+		go func(i int) {
+			l.ServeHTTP(resps[i], &http.Request{})
+			wg.Done()
+		}(i)
+	}
+	done := make(chan struct{})
+	go func() {
+		// Make sure one request has entered the handler
+		<-h.inHandler
+		// Make sure all unsuccessful requests finish (but don't wait
+		// for the one that's still waiting for okToProceed)
+		wg.Add(-1)
+		wg.Wait()
+		// Wait for the last goroutine
+		wg.Add(1)
+		h.okToProceed <- struct{}{}
+		wg.Wait()
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timed out, probably deadlocked")
+	}
+	n200 := 0
+	n503 := 0
+	for i := 0; i < 10; i++ {
+		switch resps[i].Code {
+		case 200:
+			n200++
+		case 503:
+			n503++
+		default:
+			t.Fatalf("Unexpected response code %d", resps[i].Code)
+		}
+	}
+	if n200 != 1 || n503 != 9 {
+		t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+	}
+}
+
+func TestRequestLimiter10(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewRequestLimiter(10, h)
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+			wg.Done()
+		}()
+		// Make sure the handler starts before we initiate the
+		// next request, but don't let it finish yet.
+		<-h.inHandler
+	}
+	for i := 0; i < 10; i++ {
+		h.okToProceed <- struct{}{}
+	}
+	wg.Wait()
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeLoggingRESTRouter()
+	loggingRouter := MakeRESTRouter()
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..93ee43c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"io/ioutil"
 	"log"
@@ -145,6 +146,7 @@ func main() {
 		blobSigningKeyFile   string
 		permissionTTLSec     int
 		pidfile              string
+		maxRequests          int
 	)
 	flag.StringVar(
 		&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
 		"listen",
 		DefaultAddr,
 		"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+	flag.IntVar(
+		&maxRequests,
+		"max-requests",
+		0,
+		"Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
 	flag.BoolVar(
 		&neverDelete,
 		"never-delete",
@@ -302,13 +309,18 @@ func main() {
 		}
 	}
 
+	if maxRequests <= 0 {
+		maxRequests = maxBuffers * 2
+		log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+	}
+
 	// Start a round-robin VolumeManager with the volumes we have found.
 	KeepVM = MakeRRVolumeManager(volumes)
 
-	// Tell the built-in HTTP server to direct all requests to the REST router.
-	loggingRouter := MakeLoggingRESTRouter()
-	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-		loggingRouter.ServeHTTP(resp, req)
+	// Middleware stack: logger, maxRequests limiter, method handlers
+	http.Handle("/", &LoggingRESTRouter{
+		httpserver.NewRequestLimiter(maxRequests,
+			MakeRESTRouter()),
 	})
 
 	// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
 // LoggingResponseWriter
 
 import (
-	"github.com/gorilla/mux"
 	"log"
 	"net/http"
 	"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-	router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
-	router := MakeRESTRouter()
-	return (&LoggingRESTRouter{router})
+	router http.Handler
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list