[ARVADOS] created: 7084a7602d84ac20e72d281aa9e59724745428c4
Git user
git at public.curoverse.com
Wed Apr 27 17:40:29 EDT 2016
at 7084a7602d84ac20e72d281aa9e59724745428c4 (commit)
commit 7084a7602d84ac20e72d281aa9e59724745428c4
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Apr 27 17:40:25 2016 -0400
9066: Add keepstore -max-clients argument.
diff --git a/sdk/go/httpserver/connection_limiter.go b/sdk/go/httpserver/connection_limiter.go
new file mode 100644
index 0000000..f49e49c
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+ "net/http"
+)
+
+type limiterHandler struct {
+ clients chan struct{}
+ handler http.Handler
+}
+
+func NewConnectionLimiter(maxClients int, handler http.Handler) http.Handler {
+ return &limiterHandler{
+ clients: make(chan struct{}, maxClients),
+ handler: handler,
+ }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ select {
+ case h.clients <- struct{}{}:
+ default:
+ // reached max clients
+ resp.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ h.handler.ServeHTTP(resp, req)
+ <- h.clients
+}
diff --git a/sdk/go/httpserver/connection_limiter_test.go b/sdk/go/httpserver/connection_limiter_test.go
new file mode 100644
index 0000000..49ad66b
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter_test.go
@@ -0,0 +1,59 @@
+package httpserver
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+type testHandler struct {
+ arrived chan struct{}
+ okToProceed chan struct{}
+ didProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ h.arrived <- struct{}{}
+ h.didProceed <- <-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+ return &testHandler{
+ arrived: make(chan struct{}, maxReqs+1),
+ okToProceed: make(chan struct{}),
+ didProceed: make(chan struct{}),
+ }
+}
+
+func TestConnectionLimiter1(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewConnectionLimiter(1, h)
+ for i := 0; i < 10; i++ {
+ go l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+ }
+ for i := 0; i < 10; i++ {
+ h.okToProceed <- struct{}{}
+ if arr := len(h.arrived); arr != i+1 {
+ t.Fatalf("len(h.arrived)==%d, expected %d", arr, i+1)
+ }
+ <-h.didProceed
+ }
+
+}
+
+func TestConnectionLimiter10(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewConnectionLimiter(10, h)
+ for i := 0; i < 10; i++ {
+ go l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+ }
+ for i := 0; i < 10; i++ {
+ h.okToProceed <- struct{}{}
+ }
+ if arr := len(h.arrived); arr != 10 {
+ t.Fatalf("len(h.arrived)==%d, expected %d", arr, 10)
+ }
+ for i := 0; i < 10; i++ {
+ <-h.didProceed
+ }
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeLoggingRESTRouter()
+ loggingRouter := MakeRESTRouter()
loggingRouter.ServeHTTP(response, req)
return response
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..57ac557 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"io/ioutil"
"log"
"net"
@@ -145,6 +146,7 @@ func main() {
blobSigningKeyFile string
permissionTTLSec int
pidfile string
+ maxClients int
)
flag.StringVar(
&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
"listen",
DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+ flag.IntVar(
+ &maxClients,
+ "max-clients",
+ 0,
+ "Maximum concurrent client connections. Respond 503 to new requests when this limit is reached.")
flag.BoolVar(
&neverDelete,
"never-delete",
@@ -302,13 +309,18 @@ func main() {
}
}
+ if maxClients <= 0 {
+ maxClients = maxBuffers * 2
+ log.Printf("-max-clients <1 or not specified; defaulting to maxBuffers * 2 == %d", maxClients)
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
+ // Middleware stack: logger, maxClients limiter, method handlers
+ http.Handle("/", &LoggingRESTRouter{
+ httpserver.NewConnectionLimiter(maxClients,
+ MakeRESTRouter()),
})
// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
// LoggingResponseWriter
import (
- "github.com/gorilla/mux"
"log"
"net/http"
"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
- router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
- router := MakeRESTRouter()
- return (&LoggingRESTRouter{router})
+ router http.Handler
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list