[ARVADOS] updated: 4749496bc249d340da93047dd25e3be5b610279e
Git user
git at public.curoverse.com
Wed Apr 27 18:30:25 EDT 2016
Summary of changes:
build/run-tests.sh | 1 +
sdk/go/httpserver/connection_limiter_test.go | 64 +++++++++++++++++++---------
2 files changed, 46 insertions(+), 19 deletions(-)
discards 7084a7602d84ac20e72d281aa9e59724745428c4 (commit)
via 4749496bc249d340da93047dd25e3be5b610279e (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (7084a7602d84ac20e72d281aa9e59724745428c4)
            \
             N -- N -- N (4749496bc249d340da93047dd25e3be5b610279e)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 4749496bc249d340da93047dd25e3be5b610279e
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Apr 27 18:30:12 2016 -0400
9066: Add keepstore -max-clients argument.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..bbd8898 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -81,6 +81,7 @@ sdk/python
sdk/ruby
sdk/go/arvadosclient
sdk/go/keepclient
+sdk/go/httpserver
sdk/go/manifest
sdk/go/blockdigest
sdk/go/streamer
diff --git a/sdk/go/httpserver/connection_limiter.go b/sdk/go/httpserver/connection_limiter.go
new file mode 100644
index 0000000..f49e49c
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter.go
@@ -0,0 +1,29 @@
+package httpserver
+
+import (
+ "net/http"
+)
+
+type limiterHandler struct {
+ clients chan struct{}
+ handler http.Handler
+}
+
+func NewConnectionLimiter(maxClients int, handler http.Handler) http.Handler {
+ return &limiterHandler{
+ clients: make(chan struct{}, maxClients),
+ handler: handler,
+ }
+}
+
+func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ select {
+ case h.clients <- struct{}{}:
+ default:
+ // reached max clients
+ resp.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ h.handler.ServeHTTP(resp, req)
+ <- h.clients
+}
diff --git a/sdk/go/httpserver/connection_limiter_test.go b/sdk/go/httpserver/connection_limiter_test.go
new file mode 100644
index 0000000..bb610f1
--- /dev/null
+++ b/sdk/go/httpserver/connection_limiter_test.go
@@ -0,0 +1,85 @@
+package httpserver
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "sync"
+ "testing"
+)
+
+type testHandler struct {
+ inHandler chan struct{}
+ okToProceed chan struct{}
+}
+
+func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ h.inHandler <- struct{}{}
+ <-h.okToProceed
+}
+
+func newTestHandler(maxReqs int) *testHandler {
+ return &testHandler{
+ inHandler: make(chan struct{}),
+ okToProceed: make(chan struct{}),
+ }
+}
+
+func TestConnectionLimiter1(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewConnectionLimiter(1, h)
+ var wg sync.WaitGroup
+ resps := make([]*httptest.ResponseRecorder, 10)
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ resps[i] = httptest.NewRecorder()
+ go func(i int) {
+ l.ServeHTTP(resps[i], &http.Request{})
+ wg.Done()
+ }(i)
+ }
+ // Make sure one request has entered the handler
+ <- h.inHandler
+ // Make sure all unsuccessful requests finish (but don't wait
+ // for the one that's still waiting for okToProceed)
+ wg.Add(-1)
+ wg.Wait()
+ // Wait for the last goroutine
+ wg.Add(1)
+ h.okToProceed <- struct{}{}
+ wg.Wait()
+ n200 := 0
+ n503 := 0
+ for i := 0; i < 10; i++ {
+ switch resps[i].Code {
+ case 200:
+ n200++
+ case 503:
+ n503++
+ default:
+ t.Fatalf("Unexpected response code %d", resps[i].Code)
+ }
+ }
+ if n200 != 1 || n503 != 9 {
+ t.Fatalf("Got %d 200 responses, %d 503 responses (expected 1, 9)", n200, n503)
+ }
+}
+
+func TestConnectionLimiter10(t *testing.T) {
+ h := newTestHandler(10)
+ l := NewConnectionLimiter(10, h)
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ l.ServeHTTP(httptest.NewRecorder(), &http.Request{})
+ wg.Done()
+ }()
+ // Make sure the handler starts before we initiate the
+ // next request, but don't let it finish yet.
+ <- h.inHandler
+ }
+ for i := 0; i < 10; i++ {
+ h.okToProceed <- struct{}{}
+ }
+ wg.Wait()
+}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..33d585a 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -814,7 +814,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeLoggingRESTRouter()
+ loggingRouter := MakeRESTRouter()
loggingRouter.ServeHTTP(response, req)
return response
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..57ac557 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"io/ioutil"
"log"
"net"
@@ -145,6 +146,7 @@ func main() {
blobSigningKeyFile string
permissionTTLSec int
pidfile string
+ maxClients int
)
flag.StringVar(
&dataManagerTokenFile,
@@ -162,6 +164,11 @@ func main() {
"listen",
DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+ flag.IntVar(
+ &maxClients,
+ "max-clients",
+ 0,
+ "Maximum concurrent client connections. Respond 503 to new requests when this limit is reached.")
flag.BoolVar(
&neverDelete,
"never-delete",
@@ -302,13 +309,18 @@ func main() {
}
}
+ if maxClients <= 0 {
+ maxClients = maxBuffers * 2
+ log.Printf("-max-clients <1 or not specified; defaulting to maxBuffers * 2 == %d", maxClients)
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
+ // Middleware stack: logger, maxClients limiter, method handlers
+ http.Handle("/", &LoggingRESTRouter{
+ httpserver.NewConnectionLimiter(maxClients,
+ MakeRESTRouter()),
})
// Set up a TCP listener.
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..a93b72c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -4,7 +4,6 @@ package main
// LoggingResponseWriter
import (
- "github.com/gorilla/mux"
"log"
"net/http"
"strings"
@@ -44,13 +43,7 @@ func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
- router *mux.Router
-}
-
-// MakeLoggingRESTRouter initializes LoggingRESTRouter
-func MakeLoggingRESTRouter() *LoggingRESTRouter {
- router := MakeRESTRouter()
- return (&LoggingRESTRouter{router})
+ router http.Handler
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list