[ARVADOS] updated: 1535c8d141b07f246a501d288a0d3a903ce41a56

Git user git at public.curoverse.com
Fri Apr 29 13:01:02 EDT 2016


Summary of changes:
 build/run-tests.sh                                |   2 +
 doc/install/install-keepstore.html.textile.liquid |   2 +
 sdk/go/httpserver/request_limiter.go              |  29 ++++++
 sdk/go/httpserver/request_limiter_test.go         | 106 ++++++++++++++++++++++
 services/keepstore/handler_test.go                |   2 +-
 services/keepstore/keepstore.go                   |  20 +++-
 services/keepstore/logging_router.go              |   9 +-
 7 files changed, 157 insertions(+), 13 deletions(-)
 create mode 100644 sdk/go/httpserver/request_limiter.go
 create mode 100644 sdk/go/httpserver/request_limiter_test.go

       via  1535c8d141b07f246a501d288a0d3a903ce41a56 (commit)
       via  0dc0c5650ddcd8376aea84d32d2b81b1cdba0946 (commit)
      from  e1de889290360f6dd5b5fdeab10cea997bcc6962 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 1535c8d141b07f246a501d288a0d3a903ce41a56
Merge: e1de889 0dc0c56
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Apr 29 13:00:11 2016 -0400

    Merge branch '9066-max-requests'
    
    refs #9066


commit 0dc0c5650ddcd8376aea84d32d2b81b1cdba0946
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Apr 28 09:11:33 2016 -0400

    9066: Add keepstore -max-requests argument.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..c94f831 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ sdk/python
 sdk/ruby
 sdk/go/arvadosclient
 sdk/go/keepclient
+sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
@@ -703,6 +704,7 @@ declare -a gostuff
 gostuff=(
     sdk/go/arvadosclient
     sdk/go/blockdigest
+    sdk/go/httpserver
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
diff --git a/doc/install/install-keepstore.html.textile.liquid b/doc/install/install-keepstore.html.textile.liquid
index b211ce6..6548422 100644
--- a/doc/install/install-keepstore.html.textile.liquid
+++ b/doc/install/install-keepstore.html.textile.liquid
@@ -47,6 +47,8 @@ Usage of ./keepstore:
   -enforce-permissions=false: Enforce permission signatures on requests.
   -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
   -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+  -max-requests int
+   Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
   -never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
   -permission-key-file="": Synonym for -blob-signing-key-file.
   -permission-ttl=0: Synonym for -blob-signature-ttl.
diff --git a/sdk/go/httpserver/request_limiter.go b/sdk/go/httpserver/request_limiter.go
new file mode 100644
index 0000000..178ffb9
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+	"net/http"
+)
+
+type limiterHandler struct {
+	requests chan struct{}
+	handler  http.Handler
+}
+
+func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+	return &limiterHandler{
+		requests: make(chan struct{}, maxRequests),
+		handler:  handler,
+	}
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	select {
+	case h.requests <- struct{}{}:
+	default:
+		// reached max requests
+		resp.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+	h.handler.ServeHTTP(resp, req)
+	<-h.requests
+}
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
new file mode 100644
index 0000000..a8cc806
--- /dev/null
+++ b/sdk/go/httpserver/request_limiter_test.go
@@ -0,0 +1,106 @@
+package httpserver
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+	"time"
+)
+
+type testHandler struct {
+	inHandler   chan struct{}
+	okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.inHandler <- struct{}{}
+	<-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+	return &testHandler{
+		inHandler:   make(chan struct{}),
+		okToProceed: make(chan struct{}),
+	}
+}
+
+func TestRequestLimiter1(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewRequestLimiter(1, h)
+	var wg sync.WaitGroup
+	resps := make([]*httptest.ResponseRecorder, 10)
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		resps[i] = httptest.NewRecorder()
+		go func(i int) {
+			l.ServeHTTP(resps[i], &http.Request{})
+			wg.Done()
+		}(i)
+	}
+	done := make(chan struct{})
+	go func() {
+		// Make sure one request has entered the handler
+		<-h.inHandler
+		// Make sure all unsuccessful requests finish (but don't wait
+		// for the one that's still waiting for okToProceed)
+		wg.Add(-1)
+		wg.Wait()
+		// Wait for the last goroutine
+		wg.Add(1)
+		h.okToProceed <- struct{}{}
+		wg.Wait()
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timed out, probably deadlocked")
+	}
+	n200 := 0
+	n503 := 0
+	for i := 0; i < 10; i++ {
+		switch resps[i].Code {
+		case 200:
+			n200++
+		case 503:
+			n503++
+		default:
+			t.Fatalf("Unexpected response code %d", resps[i].Code)
+		}
+	}
+	if n200 != 1 || n503 != 9 {
+		t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+	}
+	// Now that all 10 are finished, an 11th request should
+	// succeed.
+	go func() {
+		<-h.inHandler
+		h.okToProceed <- struct{}{}
+	}()
+	resp := httptest.NewRecorder()
+	l.ServeHTTP(resp, &http.Request{})
+	if resp.Code != 200 {
+		t.Errorf("Got status %d on 11th request, want 200", resp.Code)
+	}
+}
+
+func TestRequestLimiter10(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewRequestLimiter(10, h)
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+			wg.Done()
+		}()
+		// Make sure the handler starts before we initiate the
+		// next request, but don't let it finish yet.
+		<-h.inHandler
+	}
+	for i := 0; i < 10; i++ {
+		h.okToProceed <- struct{}{}
+	}
+	wg.Wait()
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeLoggingRESTRouter()
+	loggingRouter := MakeRESTRouter()
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..93ee43c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"io/ioutil"
 	"log"
@@ -145,6 +146,7 @@ func main() {
 		blobSigningKeyFile   string
 		permissionTTLSec     int
 		pidfile              string
+		maxRequests          int
 	)
 	flag.StringVar(
 		&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
 		"listen",
 		DefaultAddr,
 		"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+	flag.IntVar(
+		&maxRequests,
+		"max-requests",
+		0,
+		"Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
 	flag.BoolVar(
 		&neverDelete,
 		"never-delete",
@@ -302,13 +309,18 @@ func main() {
 		}
 	}
 
+	if maxRequests <= 0 {
+		maxRequests = maxBuffers * 2
+		log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+	}
+
 	// Start a round-robin VolumeManager with the volumes we have found.
 	KeepVM = MakeRRVolumeManager(volumes)
 
-	// Tell the built-in HTTP server to direct all requests to the REST router.
-	loggingRouter := MakeLoggingRESTRouter()
-	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-		loggingRouter.ServeHTTP(resp, req)
+	// Middleware stack: logger, maxRequests limiter, method handlers
+	http.Handle("/", &LoggingRESTRouter{
+		httpserver.NewRequestLimiter(maxRequests,
+			MakeRESTRouter()),
 	})
 
 	// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
 // LoggingResponseWriter
 
 import (
-	"github.com/gorilla/mux"
 	"log"
 	"net/http"
 	"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-	router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
-	router := MakeRESTRouter()
-	return (&LoggingRESTRouter{router})
+	router http.Handler
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list