[ARVADOS] updated: 4749496bc249d340da93047dd25e3be5b610279e

Git user git at public.curoverse.com
Wed Apr 27 18:30:25 EDT 2016


Summary of changes:
 build/run-tests.sh                           |  1 +
 sdk/go/httpserver/connection_limiter_test.go | 64 +++++++++++++++++++---------
 2 files changed, 46 insertions(+), 19 deletions(-)

  discards  7084a7602d84ac20e72d281aa9e59724745428c4 (commit)
       via  4749496bc249d340da93047dd25e3be5b610279e (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (7084a7602d84ac20e72d281aa9e59724745428c4)
            \
             N -- N -- N (4749496bc249d340da93047dd25e3be5b610279e)

When this happens we assume that you've already had alert emails for all
of the O revisions, so here we report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 4749496bc249d340da93047dd25e3be5b610279e
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Apr 27 18:30:12 2016 -0400

    9066: Add keepstore -max-clients argument.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..bbd8898 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ sdk/python
 sdk/ruby
 sdk/go/arvadosclient
 sdk/go/keepclient
+sdk/go/httpserver
 sdk/go/manifest
 sdk/go/blockdigest
 sdk/go/streamer
diff --git a/sdk/go/httpserver/connection_limiter.go b/sdk/go/httpserver/connection_limiter.go
new file mode 100644
index 0000000..f49e49c
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+	"net/http"
+)
+
+type limiterHandler struct {
+         clients chan struct{}
+         handler http.Handler
+}
+
+func NewConnectionLimiter(maxClients int, handler http.Handler) http.Handler {
+        return &limiterHandler{
+                clients: make(chan struct{}, maxClients),
+                handler: handler,
+        }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+        select {
+        case h.clients <- struct{}{}:
+        default:
+               // reached max clients
+               resp.WriteHeader(http.StatusServiceUnavailable)
+               return
+        }
+        h.handler.ServeHTTP(resp, req)
+        <- h.clients
+}
diff --git a/sdk/go/httpserver/connection_limiter_test.go b/sdk/go/httpserver/connection_limiter_test.go
new file mode 100644
index 0000000..bb610f1
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter_test.go
@@ -0,0 +1,85 @@
+package httpserver
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+)
+
+type testHandler struct {
+	inHandler  chan struct{}
+	okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.inHandler <- struct{}{}
+	<-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+	return &testHandler{
+		inHandler:  make(chan struct{}),
+		okToProceed: make(chan struct{}),
+	}
+}
+
+func TestConnectionLimiter1(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewConnectionLimiter(1, h)
+	var wg sync.WaitGroup
+	resps := make([]*httptest.ResponseRecorder, 10)
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		resps[i] = httptest.NewRecorder()
+		go func(i int) {
+			l.ServeHTTP(resps[i], &http.Request{})
+			wg.Done()
+		}(i)
+	}
+	// Make sure one request has entered the handler
+	<- h.inHandler
+	// Make sure all unsuccessful requests finish (but don't wait
+	// for the one that's still waiting for okToProceed)
+	wg.Add(-1)
+	wg.Wait()
+	// Wait for the last goroutine
+	wg.Add(1)
+	h.okToProceed <- struct{}{}
+	wg.Wait()
+	n200 := 0
+	n503 := 0
+	for i := 0; i < 10; i++ {
+		switch resps[i].Code {
+		case 200:
+			n200++
+		case 503:
+			n503++
+		default:
+			t.Fatalf("Unexpected response code %d", resps[i].Code)
+		}
+	}
+	if n200 != 1 || n503 != 9 {
+		t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+	}
+}
+
+func TestConnectionLimiter10(t *testing.T) {
+	h := newTestHandler(10)
+	l := NewConnectionLimiter(10, h)
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+			wg.Done()
+		}()
+		// Make sure the handler starts before we initiate the
+		// next request, but don't let it finish yet.
+		<- h.inHandler
+	}
+	for i := 0; i < 10; i++ {
+		h.okToProceed <- struct{}{}
+	}
+	wg.Wait()
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
-	loggingRouter := MakeLoggingRESTRouter()
+	loggingRouter := MakeRESTRouter()
 	loggingRouter.ServeHTTP(response, req)
 	return response
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..57ac557 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
 	"io/ioutil"
 	"log"
 	"net"
@@ -145,6 +146,7 @@ func main() {
 		blobSigningKeyFile   string
 		permissionTTLSec     int
 		pidfile              string
+		maxClients           int
 	)
 	flag.StringVar(
 		&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
 		"listen",
 		DefaultAddr,
 		"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+	flag.IntVar(
+		&maxClients,
+		"max-clients",
+		0,
+		"Maximum concurrent client connections. Respond 503 to new requests when this limit is reached.")
 	flag.BoolVar(
 		&neverDelete,
 		"never-delete",
@@ -302,13 +309,18 @@ func main() {
 		}
 	}
 
+	if maxClients <= 0 {
+		maxClients = maxBuffers * 2
+		log.Printf("-max-clients <1 or not specified; defaulting to maxBuffers * 2 == %d", maxClients)
+	}
+
 	// Start a round-robin VolumeManager with the volumes we have found.
 	KeepVM = MakeRRVolumeManager(volumes)
 
-	// Tell the built-in HTTP server to direct all requests to the REST router.
-	loggingRouter := MakeLoggingRESTRouter()
-	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-		loggingRouter.ServeHTTP(resp, req)
+	// Middleware stack: logger, maxClients limiter, method handlers
+	http.Handle("/", &LoggingRESTRouter{
+		httpserver.NewConnectionLimiter(maxClients,
+			MakeRESTRouter()),
 	})
 
 	// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
 // LoggingResponseWriter
 
 import (
-	"github.com/gorilla/mux"
 	"log"
 	"net/http"
 	"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-	router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
-	router := MakeRESTRouter()
-	return (&LoggingRESTRouter{router})
+	router http.Handler
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list