[ARVADOS] created: 7084a7602d84ac20e72d281aa9e59724745428c4

Git user git at public.curoverse.com
Wed Apr 27 17:40:29 EDT 2016


        at  7084a7602d84ac20e72d281aa9e59724745428c4 (commit)


commit 7084a7602d84ac20e72d281aa9e59724745428c4
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Apr 27 17:40:25 2016 -0400

    9066: Add keepstore -max-clients argument.

diff --git a/sdk/go/httpserver/connection_limiter.go b/sdk/go/httpserver/connection_limiter.go
new file mode 100644
index 0000000..f49e49c
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+	"net/http"
+)
+
+// limiterHandler is an http.Handler that lets at most cap(clients)
+// requests run inside the wrapped handler at the same time.
+type limiterHandler struct {
+	// clients is used as a counting semaphore: it holds one token for
+	// each request currently being served by handler.
+	clients chan struct{}
+	// handler is the wrapped handler that admitted requests are passed to.
+	handler http.Handler
+}
+
+// NewConnectionLimiter returns an http.Handler that forwards each
+// request to handler as long as fewer than maxClients requests are
+// already in progress. When the limit has been reached, the request
+// is answered with 503 Service Unavailable and handler is not called.
+func NewConnectionLimiter(maxClients int, handler http.Handler) http.Handler {
+	return &limiterHandler{
+		clients: make(chan struct{}, maxClients),
+		handler: handler,
+	}
+}
+
+// ServeHTTP claims a slot without blocking, runs the wrapped handler,
+// and gives the slot back when the handler is done. If no slot is
+// free it responds 503 immediately instead of queueing the request.
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	select {
+	case h.clients <- struct{}{}:
+	default:
+		// reached max clients
+		resp.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+	// Release the slot in a defer so it is returned even when the
+	// wrapped handler panics. Otherwise every panicking request would
+	// leak one slot and the limiter would end up rejecting everything.
+	defer func() { <-h.clients }()
+	h.handler.ServeHTTP(resp, req)
+}
diff --git a/sdk/go/httpserver/connection_limiter_test.go b/sdk/go/httpserver/connection_limiter_test.go
new file mode 100644
index 0000000..49ad66b
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter_test.go
@@ -0,0 +1,59 @@
+package httpserver
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+// testHandler is an http.Handler whose progress through a request is
+// driven step by step by the test via three channels.
+type testHandler struct {
+	// arrived receives one token as each request enters ServeHTTP. It
+	// is buffered, so entering never blocks within the configured size.
+	arrived chan struct{}
+	// okToProceed is unbuffered; each request blocks until the test
+	// sends a token on it.
+	okToProceed chan struct{}
+	// didProceed is unbuffered; each request hands its token to the
+	// test here before ServeHTTP returns.
+	didProceed chan struct{}
+}
+
+// ServeHTTP records the arrival, waits for permission on okToProceed,
+// then reports on didProceed. It never writes to resp or reads req.
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.arrived <- struct{}{}
+	token := <-h.okToProceed
+	h.didProceed <- token
+}
+
+// newTestHandler returns a testHandler whose arrived channel can
+// buffer maxReqs+1 tokens; the other two channels are unbuffered.
+func newTestHandler(maxReqs int) *testHandler {
+	h := new(testHandler)
+	h.arrived = make(chan struct{}, maxReqs+1)
+	h.okToProceed = make(chan struct{})
+	h.didProceed = make(chan struct{})
+	return h
+}
+
+// TestConnectionLimiter1 starts 10 concurrent requests against a
+// limiter that allows 1 client. Exactly one request must reach the
+// handler; the other nine must be rejected with 503 while the
+// admitted one is still in progress.
+func TestConnectionLimiter1(t *testing.T) {
+	const nReqs = 10
+	h := newTestHandler(nReqs)
+	l := NewConnectionLimiter(1, h)
+	// Each goroutine reports its recorder here once ServeHTTP returns.
+	finished := make(chan *httptest.ResponseRecorder, nReqs)
+	for i := 0; i < nReqs; i++ {
+		go func() {
+			resp := httptest.NewRecorder()
+			l.ServeHTTP(resp, &http.Request{})
+			finished <- resp
+		}()
+	}
+	// The admitted request stays parked in the handler until it gets
+	// okToProceed, so the first nReqs-1 requests to finish must all be
+	// rejections.
+	for i := 0; i < nReqs-1; i++ {
+		if resp := <-finished; resp.Code != http.StatusServiceUnavailable {
+			t.Fatalf("rejected request %d: status==%d, expected %d", i, resp.Code, http.StatusServiceUnavailable)
+		}
+	}
+	h.okToProceed <- struct{}{}
+	// The handler sends on arrived before it receives okToProceed, so
+	// the count is settled once the send above has completed.
+	if arr := len(h.arrived); arr != 1 {
+		t.Fatalf("len(h.arrived)==%d, expected %d", arr, 1)
+	}
+	<-h.didProceed
+	if resp := <-finished; resp.Code != http.StatusOK {
+		t.Fatalf("admitted request: status==%d, expected %d", resp.Code, http.StatusOK)
+	}
+}
+
+// TestConnectionLimiter10 starts 10 concurrent requests against a
+// limiter that allows 10 clients and checks that all of them are
+// inside the handler at the same time.
+func TestConnectionLimiter10(t *testing.T) {
+	const nReqs = 10
+	h := newTestHandler(nReqs)
+	l := NewConnectionLimiter(nReqs, h)
+	for started := 0; started < nReqs; started++ {
+		go l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+	}
+	// Every okToProceed token is taken by a request that has already
+	// recorded its arrival, so after nReqs sends all have arrived.
+	for released := 0; released < nReqs; released++ {
+		h.okToProceed <- struct{}{}
+	}
+	arr := len(h.arrived)
+	if arr != nReqs {
+		t.Fatalf("len(h.arrived)==%d, expected %d", arr, nReqs)
+	}
+	// Let each request hand over its token and return.
+	for drained := 0; drained < nReqs; drained++ {
+		<-h.didProceed
+	}
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeLoggingRESTRouter()
+	loggingRouter := MakeRESTRouter()
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..57ac557 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 	"io/ioutil"
 	"log"
 	"net"
@@ -145,6 +146,7 @@ func main() {
 		blobSigningKeyFile   string
 		permissionTTLSec     int
 		pidfile              string
+		maxClients           int
 	)
 	flag.StringVar(
 		&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
 		"listen",
 		DefaultAddr,
 		"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+	flag.IntVar(
+		&maxClients,
+		"max-clients",
+		0,
+		"Maximum concurrent client connections. Respond 503 to new requests when this limit is reached.")
 	flag.BoolVar(
 		&neverDelete,
 		"never-delete",
@@ -302,13 +309,18 @@ func main() {
 		}
 	}
 
+	if maxClients <= 0 {
+		maxClients = maxBuffers * 2
+		log.Printf("-max-clients <1 or not specified; defaulting to maxBuffers * 2 == %d", maxClients)
+	}
+
 	// Start a round-robin VolumeManager with the volumes we have found.
 	KeepVM = MakeRRVolumeManager(volumes)
 
-	// Tell the built-in HTTP server to direct all requests to the REST router.
-	loggingRouter := MakeLoggingRESTRouter()
-	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-		loggingRouter.ServeHTTP(resp, req)
+	// Middleware stack: logger, maxClients limiter, method handlers
+	http.Handle("/", &LoggingRESTRouter{
+		httpserver.NewConnectionLimiter(maxClients,
+			MakeRESTRouter()),
 	})
 
 	// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
 // LoggingResponseWriter
 
 import (
-	"github.com/gorilla/mux"
 	"log"
 	"net/http"
 	"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-	router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
-	router := MakeRESTRouter()
-	return (&LoggingRESTRouter{router})
+	router http.Handler
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list