[ARVADOS] created: cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2
git at public.curoverse.com
git at public.curoverse.com
Fri Sep 19 16:20:33 EDT 2014
at cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2 (commit)
commit cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2
Author: Tim Pierce <twp at curoverse.com>
Date: Fri Sep 19 15:50:14 2014 -0400
3413: added TrashHandler
* added a trashq WorkQueue to manage the trash list
* added TrashHandler to process "PUT /trash" requests
* added TestTrashHandler
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 0cfa1f3..5528176 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -630,25 +630,25 @@ func TestPullHandler(t *testing.T) {
}
var testcases = []pullTest{
{
- "user token, good request",
+ "pull: user token, good request",
RequestTester{"/pull", user_token, "PUT", good_json},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
- "user token, bad request",
+ "pull: user token, bad request",
RequestTester{"/pull", user_token, "PUT", bad_json},
http.StatusUnauthorized,
"Unauthorized\n",
},
{
- "data manager token, good request",
+ "pull: data manager token, good request",
RequestTester{"/pull", data_manager_token, "PUT", good_json},
http.StatusOK,
"Received 3 pull requests\n",
},
{
- "data manager token, bad request",
+ "pull: data manager token, bad request",
RequestTester{"/pull", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
"Bad Request\n",
@@ -674,6 +674,112 @@ func TestPullHandler(t *testing.T) {
}
}
+// TestTrashHandler
+//
+// Test cases:
+//
+// Cases tested: syntactically valid and invalid trash lists, from the
+// data manager and from unprivileged users:
+//
+// 1. Valid trash list from an ordinary user
+// (expected result: 401 Unauthorized)
+//
+// 2. Invalid trash list from an ordinary user
+// (expected result: 401 Unauthorized)
+//
+// 3. Valid trash list from the data manager
+// (expected result: 200 OK with response body "Received 3 trash
+// requests")
+//
+// 4. Invalid trash list from the data manager
+// (expected result: 400 Bad Request)
+//
+// Test that in the end, the trash collector received a good
+// trash list with the expected number of requests.
+//
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// trash list simultaneously. Make sure that none of them return 400
+// Bad Request and that replica.Dump() returns a valid list.
+//
+func TestTrashHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up a REST router for testing the handlers.
+ rest := MakeRESTRouter()
+
+ var user_token = "USER TOKEN"
+ data_manager_token = "DATA MANAGER TOKEN" // plain assignment (no :=): the variable is declared outside this function
+
+ good_json := []byte(`[
+ {
+ "locator":"block1",
+ "block_mtime":1409082153
+ },
+ {
+ "locator":"block2",
+ "block_mtime":1409082153
+ },
+ {
+ "locator":"block3",
+ "block_mtime":1409082153
+ }
+ ]`)
+
+ bad_json := []byte(`I am not a valid JSON string`)
+
+ type trashTest struct {
+ name string // label passed to the Expect* helpers
+ req RequestTester // request issued against the router
+ response_code int // status code passed to ExpectStatusCode
+ response_body string // body text passed to ExpectBody
+ }
+
+ var testcases = []trashTest{
+ {
+ "trash: user token, good request",
+ RequestTester{"/trash", user_token, "PUT", good_json},
+ http.StatusUnauthorized,
+ "Unauthorized\n",
+ },
+ {
+ "trash: user token, bad request",
+ RequestTester{"/trash", user_token, "PUT", bad_json},
+ http.StatusUnauthorized,
+ "Unauthorized\n",
+ },
+ {
+ "trash: data manager token, good request",
+ RequestTester{"/trash", data_manager_token, "PUT", good_json},
+ http.StatusOK,
+ "Received 3 trash requests\n",
+ },
+ {
+ "trash: data manager token, bad request",
+ RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+ http.StatusBadRequest,
+ "Bad Request\n",
+ },
+ }
+
+ for _, tst := range testcases { // issue each request, then check its status code and body
+ response := IssueRequest(rest, &tst.req)
+ ExpectStatusCode(t, tst.name, tst.response_code, response)
+ ExpectBody(t, tst.name, tst.response_body, response)
+ }
+
+ // The trash collector should have received one good list with 3
+ // requests on it. NOTE(review): output_list is filled but never checked.
+ var output_list = make([]TrashRequest, 3)
+ for i := 0; i < 3; i++ {
+ item := <-trashq.NextItem // channel receive: waits for the next queued item
+ if tr, ok := item.(TrashRequest); ok {
+ output_list[i] = tr
+ } else {
+ t.Errorf("item %v could not be parsed as a TrashRequest", item)
+ }
+ }
+}
+
// ====================
// Helper functions
// ====================
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index fde6087..038e812 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -59,10 +59,12 @@ func MakeRESTRouter() *mux.Router {
`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler processes "PUT /pull" commands from Data Manager.
- // It parses the JSON list of pull requests in the request body, and
- // delivers them to the pull list manager for replication.
+ // The PullHandler and TrashHandler process "PUT /pull" and "PUT
+ // /trash" requests from Data Manager. Each handler parses the
+ // JSON list of block management requests in the message body, and
+ // delivers them to the pull queue or trash queue, respectively.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+ rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
// 400 Bad Request.
@@ -478,6 +480,48 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
pullq.ReplaceQueue(plist)
}
+type TrashRequest struct { // TrashRequest is one element of the JSON array that TrashHandler decodes.
+ Locator string `json:"locator"` // filled from the "locator" JSON key
+ BlockMtime int64 `json:"block_mtime"` // filled from the "block_mtime" JSON key
+}
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) { // TrashHandler serves "PUT /trash": token check, JSON decode, then trash queue replacement.
+ // Reject requests unless IsDataManagerToken accepts the API token.
+ api_token := GetApiToken(req)
+ if !IsDataManagerToken(api_token) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
+ return
+ }
+
+ // Parse the request body as a JSON array of TrashRequest.
+ var trash []TrashRequest
+ r := json.NewDecoder(req.Body)
+ if err := r.Decode(&trash); err != nil {
+ http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error()) // the decode error is logged; the client only sees BadRequestError
+ return
+ }
+
+ // We have a properly formatted trash list sent from the data
+ // manager. Report success first, then copy the list into the trash
+ // work queue (the 200 response is written before the queue update).
+ log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
+ resp.WriteHeader(http.StatusOK)
+ resp.Write([]byte(
+ fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+ tlist := list.New()
+ for _, t := range trash {
+ tlist.PushBack(t) // each TrashRequest is stored by value as a list element
+ }
+
+ if trashq == nil { // NOTE(review): lazy init of the package-level trashq with no lock visible here — confirm concurrent requests cannot race
+ trashq = NewWorkQueue()
+ }
+ trashq.ReplaceQueue(tlist)
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 2437638..d6eb6b2 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -91,12 +91,11 @@ func (e *KeepError) Error() string {
// Initialized by the --volumes flag (or by FindKeepVolumes).
var KeepVM VolumeManager
-// The pull list queue is a singleton pull list (a list of blocks
-// that the current keepstore process should be pulling from remote
-// keepstore servers in order to increase data replication) with
-// atomic update methods that are safe to use from multiple
-// goroutines.
+// The pull list manager and trash queue are threadsafe queues which
+// support atomic update operations. The PullHandler and TrashHandler
+// store results from Data Manager /pull and /trash requests here.
var pullq *WorkQueue
+var trashq *WorkQueue
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list