[ARVADOS] created: b185fc94a543b5b1361497c8502e876d6fdc2838
git at public.curoverse.com
git at public.curoverse.com
Tue Aug 26 15:17:45 EDT 2014
at b185fc94a543b5b1361497c8502e876d6fdc2838 (commit)
commit b185fc94a543b5b1361497c8502e876d6fdc2838
Author: Tim Pierce <twp at curoverse.com>
Date: Fri Aug 22 11:12:53 2014 -0400
3414: adding PullHandler and Replicator
Added PullHandler and a "replicator" package to handle "PUT /pull" requests.
PUT /pull requests are routed to PullHandler, which authenticates the
request and validates the JSON in the request body. Valid requests are
sent to the replicator.
The Keepstore replicator runs a goroutine which repeatedly listens on
its input channel for a new pull list.
TestPullHandler tests each combination of: request from the data
manager; request from an ordinary user; properly formatted pull
request; improperly formatted pull request. It checks the state of the
replicator when done to make sure that it has the expected number of
pull requests.
3414: add replicator.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 200e1b1..2b1f7cd 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -566,6 +566,111 @@ func TestDeleteHandler(t *testing.T) {
}
}
+// TestPullHandler
+//
+// Test handling of the PUT /pull request.
+//
+// Cases tested: syntactically valid and invalid pull lists, from the
+// data manager and from unprivileged users:
+//
+//   1. Valid pull list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   2. Invalid pull request from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   3. Valid pull request from the data manager
+//      (expected result: 200 OK with response body "Received 3 pull
+//      requests")
+//
+//   4. Invalid pull request from the data manager
+//      (expected result: 400 Bad Request)
+//
+// Test that in the end, the replicator received a good pull list with
+// the expected number of requests.
+//
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// pull list simultaneously. Make sure that none of them return 400
+// Bad Request and that replica.Dump() returns a valid list.
+//
+func TestPullHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up a REST router for testing the handlers.
+	rest := MakeRESTRouter()
+
+	var user_token = "USER TOKEN"
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	// Three entries: two servers, no servers, and an empty locator.
+	good_json := []byte(`[
+ {
+ "locator":"locator_with_two_servers",
+ "servers":[
+ "server1",
+ "server2"
+ ]
+ },
+ {
+ "locator":"locator_with_no_servers",
+ "servers":[]
+ },
+ {
+ "locator":"",
+ "servers":["empty_locator"]
+ }
+ ]`)
+
+	// A JSON object, not a list: decoding into []PullRequest must fail.
+	bad_json := []byte(`{ "key":"I'm a little teapot" }`)
+
+	type pullTest struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+	}
+	var testcases = []pullTest{
+		{
+			"user token, good request",
+			RequestTester{"/pull", user_token, "PUT", good_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"user token, bad request",
+			RequestTester{"/pull", user_token, "PUT", bad_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"data manager token, good request",
+			RequestTester{"/pull", data_manager_token, "PUT", good_json},
+			http.StatusOK,
+			"Received 3 pull requests\n",
+		},
+		{
+			"data manager token, bad request",
+			RequestTester{"/pull", data_manager_token, "PUT", bad_json},
+			http.StatusBadRequest,
+			"Bad Request\n",
+		},
+	}
+
+	for _, tst := range testcases {
+		response := IssueRequest(rest, &tst.req)
+		ExpectStatusCode(t, tst.name, tst.response_code, response)
+		ExpectBody(t, tst.name, tst.response_body, response)
+	}
+
+	// The Keep replicator should have received one good list with 3
+	// requests on it. Fail cleanly (rather than panic on a nil
+	// pointer) if no valid pull list ever reached the replicator.
+	if replica == nil {
+		t.Fatal("replica is nil: no pull list was delivered to the replicator")
+	}
+	saved_pull_list := replica.Dump()
+	if len(saved_pull_list) != 3 {
+		t.Errorf(
+			"saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
+			len(saved_pull_list), saved_pull_list)
+	}
+}
+
// ====================
// Helper functions
// ====================
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 039b2ac..9d4c617 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -13,6 +13,7 @@ import (
"crypto/md5"
"encoding/json"
"fmt"
+ "git.curoverse.com/arvados.git/services/keepstore/replicator"
"github.com/gorilla/mux"
"io"
"log"
@@ -58,6 +59,11 @@ func MakeRESTRouter() *mux.Router {
`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+ // The PullHandler processes "PUT /pull" commands from Data Manager.
+ // It parses the JSON list of pull requests in the request body, and
+ // delivers them to the PullBlocks goroutine for replication.
+ rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -397,6 +403,68 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
}
}
+// PullHandler processes "PUT /pull" requests for the data manager.
+// The request body is a JSON message containing a list of pull
+// requests in the following format:
+//
+//	[
+//	   {
+//	      "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
+//	      "servers":[
+//	         "keep0.qr1hi.arvadosapi.com:25107",
+//	         "keep1.qr1hi.arvadosapi.com:25108"
+//	      ]
+//	   },
+//	   {
+//	      "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
+//	      "servers":[
+//	         "10.0.1.5:25107",
+//	         "10.0.1.6:25107",
+//	         "10.0.1.7:25108"
+//	      ]
+//	   },
+//	   ...
+//	]
+//
+// Each pull request in the list consists of a block locator string
+// and an ordered list of servers. Keepstore should try to fetch the
+// block from each server in turn.
+//
+// If the request has not been sent by the Data Manager, return 401
+// Unauthorized.
+//
+// If the JSON unmarshalling fails, return 400 Bad Request.
+//
+// Otherwise the list is handed to the replicator (which replaces any
+// list it was previously working on) and the handler responds 200 OK
+// with the number of pull requests received.
+func PullHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	api_token := GetApiToken(req)
+	if !IsDataManagerToken(api_token) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+
+	// Parse the request body.
+	var pull_list []replicator.PullRequest
+	if err := json.NewDecoder(req.Body).Decode(&pull_list); err != nil {
+		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		return
+	}
+
+	// We have a properly formatted pull list sent from the data
+	// manager. Deliver it to the keep replicator before reporting
+	// success, so "Received" is only sent once the hand-off happened.
+	//
+	// NOTE(review): this lazy initialization is unsynchronized; two
+	// concurrent PUT /pull requests (net/http serves each request on
+	// its own goroutine) can race on `replica`. Consider creating the
+	// replicator once at startup or guarding it with sync.Once.
+	if replica == nil {
+		replica = replicator.New()
+	}
+	replica.Pull(pull_list)
+
+	resp.WriteHeader(http.StatusOK)
+	fmt.Fprintf(resp, "Received %d pull requests\n", len(pull_list))
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
@@ -606,10 +674,7 @@ func CanDelete(api_token string) bool {
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if data_manager_token == "" {
- return false
- }
- if api_token == data_manager_token {
+ if IsDataManagerToken(api_token) {
return true
}
// TODO(twp): look up api_token with the API server
@@ -617,3 +682,9 @@ func CanDelete(api_token string) bool {
// has unlimited scope
return false
}
+
+// IsDataManagerToken reports whether api_token is the data manager's
+// token. When no data manager token has been configured it always
+// reports false, so an empty api_token can never be mistaken for it.
+func IsDataManagerToken(api_token string) bool {
+	if data_manager_token == "" {
+		return false
+	}
+	return api_token == data_manager_token
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 9556185..f1e7aa9 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/services/keepstore/replicator"
"io/ioutil"
"log"
"net"
@@ -34,10 +35,6 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
var PROC_MOUNTS = "/proc/mounts"
-// The Keep VolumeManager maintains a list of available volumes.
-// Initialized by the --volumes flag (or by FindKeepVolumes).
-var KeepVM VolumeManager
-
// enforce_permissions controls whether permission signatures
// should be enforced (affecting GET and DELETE requests).
// Initialized by the --enforce-permissions flag.
@@ -67,6 +64,7 @@ type KeepError struct {
var (
BadRequestError = &KeepError{400, "Bad Request"}
+ UnauthorizedError = &KeepError{401, "Unauthorized"}
CollisionError = &KeepError{500, "Collision"}
RequestHashError = &KeepError{422, "Hash mismatch in request"}
PermissionError = &KeepError{403, "Forbidden"}
@@ -83,6 +81,23 @@ func (e *KeepError) Error() string {
return e.ErrMsg
}
+// ========================
+// Internal data structures
+//
+// These global variables are used by multiple parts of the
+// program. They are good candidates for moving into their own
+// packages.
+
+// The Keep VolumeManager maintains a list of available volumes.
+// Initialized by the --volumes flag (or by FindKeepVolumes).
+var KeepVM VolumeManager
+
+// replica is the Keep replicator, responsible for pulling blocks from
+// other Keep servers to ensure replication. When Keep receives a new
+// "pull list" from Data Manager, replica is responsible for fetching
+// the blocks on the list.
+//
+// NOTE(review): declared nil here; as far as this diff shows, it is
+// created lazily by PullHandler on the first valid PUT /pull request.
+var replica *replicator.Replicator
+
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
diff --git a/services/keepstore/replicator/replica.go b/services/keepstore/replicator/replica.go
new file mode 100644
index 0000000..b9995c1
--- /dev/null
+++ b/services/keepstore/replicator/replica.go
@@ -0,0 +1,79 @@
+// Package replicator fulfills replication pull requests sent by
+// Data Manager.
+//
+// The interface is:
+//
+//	replicator.New() launches a replication goroutine and returns the
+//	new Replicator object.
+//
+//	replicator.Pull() assigns a new pull list to the goroutine.
+//
+//	replicator.Dump() reports the goroutine's current pull list.
+//
+//	replicator.Close() shuts down the replicator.
+package replicator
+
+// PullRequest asks keepstore to obtain one block: Locator identifies
+// the block and Servers lists the Keep servers to try, in order.
+//
+// The json tags spell out the PUT /pull wire format ("locator",
+// "servers"). encoding/json already matched these keys
+// case-insensitively when decoding; the tags also make encoding
+// produce the same keys.
+type PullRequest struct {
+	Locator string   `json:"locator"`
+	Servers []string `json:"servers"`
+}
+
+// Replicator owns a goroutine that holds the current pull list.
+// Create one with New; the zero value has nil channels and is not
+// usable.
+type Replicator struct {
+	queue chan []PullRequest // new pull lists, sent by Pull
+	dump  chan []PullRequest // current pull list, served to Dump
+}
+
+// New returns a new Replicator object. It launches a goroutine that
+// waits for pull requests.
+func New() *Replicator {
+	r := &Replicator{
+		queue: make(chan []PullRequest),
+		dump:  make(chan []PullRequest),
+	}
+	go r.listen()
+	return r
+}
+
+// Pull sends a new list of pull requests to the replicator goroutine.
+// The replicator will discard any outstanding pull requests and begin
+// working on the new list.
+//
+// Pull must not be called after Close: sending on the closed queue
+// panics.
+func (r *Replicator) Pull(pr []PullRequest) {
+	r.queue <- pr
+}
+
+// Dump reports the contents of the current pull list. Once the
+// replicator goroutine has exited (see Close), Dump returns nil.
+func (r *Replicator) Dump() []PullRequest {
+	return <-r.dump
+}
+
+// Close shuts down the replicator and terminates its goroutine,
+// abandoning the current pull list. Close must be called at most
+// once; closing the queue a second time panics.
+func (r *Replicator) Close() {
+	close(r.queue)
+}
+
+// listen is run in a goroutine. It reads new pull lists from its
+// input queue until the queue is closed, and meanwhile hands the most
+// recently received list to any caller of Dump.
+func (r *Replicator) listen() {
+	var current []PullRequest
+	for {
+		select {
+		case p, ok := <-r.queue:
+			if !ok {
+				// The input channel is closed; time to shut down.
+				close(r.dump)
+				return
+			}
+			current = p
+		case r.dump <- current:
+			// Dump received the current list; nothing else to do.
+		}
+	}
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list