[ARVADOS] created: b185fc94a543b5b1361497c8502e876d6fdc2838

git at public.curoverse.com git at public.curoverse.com
Tue Aug 26 15:17:45 EDT 2014

        at  b185fc94a543b5b1361497c8502e876d6fdc2838 (commit)

commit b185fc94a543b5b1361497c8502e876d6fdc2838
Author: Tim Pierce <twp at curoverse.com>
Date:   Fri Aug 22 11:12:53 2014 -0400

    3414: adding PullHandler and Replicator
    Added PullHandler and a "replicator" package to handle "PUT /pull" requests.
    PUT /pull requests are routed to PullHandler, which authenticates the
    request and validates the JSON in the request body. Valid requests are
    sent to the replicator.
    The Keepstore replicator runs a goroutine which repeatedly listens on
    its input channel for a new pull list.
    TestPullHandler tests each combination of: request from superuser;
    request from ordinary user; properly formatted pull request; improperly
    formatted pull request.  It checks the state of the replicator when done
    to make sure that it has the expected number of pull requests.
    3414: add replicator.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 200e1b1..2b1f7cd 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -566,6 +566,111 @@ func TestDeleteHandler(t *testing.T) {
+// TestPullHandler
+// Test handling of the PUT /pull request.
+// Cases tested: syntactically valid and invalid pull lists, from the
+// data manager and from unprivileged users:
+//   1. Valid pull list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//   2. Invalid pull request from an ordinary user
+//      (expected result: 401 Unauthorized)
+//   3. Valid pull request from the data manager
+//      (expected result: 200 OK with response body "Received 3 pull
+//      requests")
+//   4. Invalid pull request from the data manager
+//      (expected result: 400 Bad Request)
+// Test that in the end, the replicator received a good pull list with
+// the expected number of requests.
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// pull list simultaneously.  Make sure that none of them return 400
+// Bad Request and that replica.Dump() returns a valid list.
+func TestPullHandler(t *testing.T) {
+	defer teardown()
+	// Set up a REST router for testing the handlers.
+	rest := MakeRESTRouter()
+	var user_token = "USER TOKEN"
+	data_manager_token = "DATA MANAGER TOKEN"
+	good_json := []byte(`[
+		{
+			"locator":"locator_with_two_servers",
+			"servers":[
+				"server1",
+				"server2"
+		 	]
+		},
+		{
+			"locator":"locator_with_no_servers",
+			"servers":[]
+		},
+		{
+			"locator":"",
+			"servers":["empty_locator"]
+		}
+	]`)
+	bad_json := []byte(`{ "key":"I'm a little teapot" }`)
+	type pullTest struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+	}
+	var testcases = []pullTest{
+		{
+			"user token, good request",
+			RequestTester{"/pull", user_token, "PUT", good_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"user token, bad request",
+			RequestTester{"/pull", user_token, "PUT", bad_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"data manager token, good request",
+			RequestTester{"/pull", data_manager_token, "PUT", good_json},
+			http.StatusOK,
+			"Received 3 pull requests\n",
+		},
+		{
+			"data manager token, bad request",
+			RequestTester{"/pull", data_manager_token, "PUT", bad_json},
+			http.StatusBadRequest,
+			"Bad Request\n",
+		},
+	}
+	for _, tst := range testcases {
+		response := IssueRequest(rest, &tst.req)
+		ExpectStatusCode(t, tst.name, tst.response_code, response)
+		ExpectBody(t, tst.name, tst.response_body, response)
+	}
+	// The Keep replicator should have received one good list with 3
+	// requests on it.
+	var saved_pull_list = replica.Dump()
+	if len(saved_pull_list) != 3 {
+		t.Errorf(
+			"saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
+			len(saved_pull_list), saved_pull_list)
+	}
 // ====================
 // Helper functions
 // ====================
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 039b2ac..9d4c617 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -13,6 +13,7 @@ import (
+	"git.curoverse.com/arvados.git/services/keepstore/replicator"
@@ -58,6 +59,11 @@ func MakeRESTRouter() *mux.Router {
 		`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
 	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+	// The PullHandler processes "PUT /pull" commands from Data Manager.
+	// It parses the JSON list of pull requests in the request body, and
+	// delivers them to the replicator goroutine for replication.
+	rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
 	// Any request which does not match any of these routes gets
 	// 400 Bad Request.
 	rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -397,6 +403,68 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+/* PullHandler processes "PUT /pull" requests for the data manager.
+   The request body is a JSON message containing a list of pull
+   requests in the following format:
+   [
+      {
+         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
+         "servers":[
+			"keep0.qr1hi.arvadosapi.com:25107",
+			"keep1.qr1hi.arvadosapi.com:25108"
+		 ]
+	  },
+	  {
+		 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
+		 "servers":[
+			"",
+			"",
+			""
+		 ]
+	  },
+	  ...
+   ]
+   Each pull request in the list consists of a block locator string
+   and an ordered list of servers.  Keepstore should try to fetch the
+   block from each server in turn.
+   If the request has not been sent by the Data Manager, return 401
+   Unauthorized.
+   If the JSON unmarshalling fails, return 400 Bad Request.
+func PullHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	api_token := GetApiToken(req)
+	if !IsDataManagerToken(api_token) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	// Parse the request body.
+	var pull_list []replicator.PullRequest
+	r := json.NewDecoder(req.Body)
+	if err := r.Decode(&pull_list); err != nil {
+		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		return
+	}
+	// We have a properly formatted pull list sent from the data
+	// manager.  Report success and send the list to the keep
+	// replicator for further handling.
+	resp.WriteHeader(http.StatusOK)
+	resp.Write([]byte(
+		fmt.Sprintf("Received %d pull requests\n", len(pull_list))))
+	if replica == nil {
+		replica = replicator.New()
+	}
+	replica.Pull(pull_list)
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
@@ -606,10 +674,7 @@ func CanDelete(api_token string) bool {
 	// Blocks may be deleted only when Keep has been configured with a
 	// data manager.
-	if data_manager_token == "" {
-		return false
-	}
-	if api_token == data_manager_token {
+	if IsDataManagerToken(api_token) {
 		return true
 	// TODO(twp): look up api_token with the API server
@@ -617,3 +682,9 @@ func CanDelete(api_token string) bool {
 	// has unlimited scope
 	return false
+// IsDataManagerToken returns true if api_token represents the data
+// manager's token.
+func IsDataManagerToken(api_token string) bool {
+	return data_manager_token != "" && api_token == data_manager_token
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 9556185..f1e7aa9 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
+	"git.curoverse.com/arvados.git/services/keepstore/replicator"
@@ -34,10 +35,6 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
 var PROC_MOUNTS = "/proc/mounts"
-// The Keep VolumeManager maintains a list of available volumes.
-// Initialized by the --volumes flag (or by FindKeepVolumes).
-var KeepVM VolumeManager
 // enforce_permissions controls whether permission signatures
 // should be enforced (affecting GET and DELETE requests).
 // Initialized by the --enforce-permissions flag.
@@ -67,6 +64,7 @@ type KeepError struct {
 var (
 	BadRequestError     = &KeepError{400, "Bad Request"}
+	UnauthorizedError   = &KeepError{401, "Unauthorized"}
 	CollisionError      = &KeepError{500, "Collision"}
 	RequestHashError    = &KeepError{422, "Hash mismatch in request"}
 	PermissionError     = &KeepError{403, "Forbidden"}
@@ -83,6 +81,23 @@ func (e *KeepError) Error() string {
 	return e.ErrMsg
+// ========================
+// Internal data structures
+// These global variables are used by multiple parts of the
+// program. They are good candidates for moving into their own
+// packages.
+// The Keep VolumeManager maintains a list of available volumes.
+// Initialized by the --volumes flag (or by FindKeepVolumes).
+var KeepVM VolumeManager
+// replica is the Keep replicator, responsible for pulling blocks from
+// other Keep servers to ensure replication. When Keep receives a new
+// "pull list" from Data Manager, the replicator is responsible for
+// fetching blocks on the list.
+var replica *replicator.Replicator
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
 // of command line flags (identifying Keep volumes and initializing
diff --git a/services/keepstore/replicator/replica.go b/services/keepstore/replicator/replica.go
new file mode 100644
index 0000000..b9995c1
--- /dev/null
+++ b/services/keepstore/replicator/replica.go
@@ -0,0 +1,79 @@
+package replicator
+/* The Keep replicator package fulfills replication pull requests sent
+   by Data Manager.
+   The interface is:
+   replicator.New() launches a replication goroutine and returns the
+   new Replicator object.
+   replicator.Pull() assigns a new pull list to the goroutine.
+   replicator.Dump() reports the goroutine's current pull list.
+   replicator.Close() shuts down the replicator.
+type PullRequest struct {
+	Locator string
+	Servers []string
+type Replicator struct {
+	queue chan []PullRequest
+	dump  chan []PullRequest
+// New returns a new Replicator object.  It launches a goroutine that
+// waits for pull requests.
+func New() *Replicator {
+	r := Replicator{
+		make(chan []PullRequest),
+		make(chan []PullRequest),
+	}
+	go r.listen()
+	return &r
+// Pull sends a new list of pull requests to the replicator goroutine.
+// The replicator will discard any outstanding pull requests and begin
+// working on the new list.
+func (r *Replicator) Pull(pr []PullRequest) {
+	r.queue <- pr
+// Dump reports the contents of the current pull list.
+func (r *Replicator) Dump() []PullRequest {
+	return <-r.dump
+// Close shuts down the replicator and terminates the goroutine, which
+// completes any pull request in progress and abandons any pending
+// requests.
+func (r *Replicator) Close() {
+	close(r.queue)
+// listen is run in a goroutine. It reads new pull lists from its
+// input queue until the queue is closed.
+func (r *Replicator) listen() {
+	var current []PullRequest
+	for {
+		select {
+		case p, ok := <-r.queue:
+			if ok {
+				current = p
+			} else {
+				// The input channel is closed; time to shut down
+				close(r.dump)
+				return
+			}
+		case r.dump <- current:
+			// no-op
+		}
+	}



More information about the arvados-commits mailing list