[ARVADOS] created: cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2

git at public.curoverse.com git at public.curoverse.com
Fri Sep 19 16:20:33 EDT 2014


        at  cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2 (commit)


commit cbd11b4bfd5bcc637abe0e7678239dd1e7a2fbd2
Author: Tim Pierce <twp at curoverse.com>
Date:   Fri Sep 19 15:50:14 2014 -0400

    3413: added TrashHandler
    
    * added a trashq WorkQueue to manage the trash list
    * added TrashHandler to process "PUT /trash" requests
    * added TestTrashHandler

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 0cfa1f3..5528176 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -630,25 +630,25 @@ func TestPullHandler(t *testing.T) {
 	}
 	var testcases = []pullTest{
 		{
-			"user token, good request",
+			"pull: user token, good request",
 			RequestTester{"/pull", user_token, "PUT", good_json},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
-			"user token, bad request",
+			"pull: user token, bad request",
 			RequestTester{"/pull", user_token, "PUT", bad_json},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
-			"data manager token, good request",
+			"pull: data manager token, good request",
 			RequestTester{"/pull", data_manager_token, "PUT", good_json},
 			http.StatusOK,
 			"Received 3 pull requests\n",
 		},
 		{
-			"data manager token, bad request",
+			"pull: data manager token, bad request",
 			RequestTester{"/pull", data_manager_token, "PUT", bad_json},
 			http.StatusBadRequest,
 			"Bad Request\n",
@@ -674,6 +674,112 @@ func TestPullHandler(t *testing.T) {
 	}
 }
 
+// TestTrashHandler
+//
+// Test cases:
+//
+// Cases tested: syntactically valid and invalid trash lists, from the
+// data manager and from unprivileged users:
+//
+//   1. Valid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   2. Invalid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   3. Valid trash list from the data manager
+//      (expected result: 200 OK with response body "Received 3 trash
+//      requests")
+//
+//   4. Invalid trash list from the data manager
+//      (expected result: 400 Bad Request)
+//
+// Test that in the end, the trash collector received a good
+// trash list with the expected number of requests.
+//
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// trash list simultaneously.  Make sure that none of them return 400
+// Bad Request and that replica.Dump() returns a valid list.
+//
+func TestTrashHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up a REST router for testing the handlers.
+	rest := MakeRESTRouter()
+
+	var user_token = "USER TOKEN"
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	good_json := []byte(`[
+		{
+			"locator":"block1",
+			"block_mtime":1409082153
+		},
+		{
+			"locator":"block2",
+			"block_mtime":1409082153
+		},
+		{
+			"locator":"block3",
+			"block_mtime":1409082153
+		}
+	]`)
+
+	bad_json := []byte(`I am not a valid JSON string`)
+
+	type trashTest struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+	}
+
+	var testcases = []trashTest{
+		{
+			"trash: user token, good request",
+			RequestTester{"/trash", user_token, "PUT", good_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"trash: user token, bad request",
+			RequestTester{"/trash", user_token, "PUT", bad_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"trash: data manager token, good request",
+			RequestTester{"/trash", data_manager_token, "PUT", good_json},
+			http.StatusOK,
+			"Received 3 trash requests\n",
+		},
+		{
+			"trash: data manager token, bad request",
+			RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+			http.StatusBadRequest,
+			"Bad Request\n",
+		},
+	}
+
+	for _, tst := range testcases {
+		response := IssueRequest(rest, &tst.req)
+		ExpectStatusCode(t, tst.name, tst.response_code, response)
+		ExpectBody(t, tst.name, tst.response_body, response)
+	}
+
+	// The trash collector should have received one good list with 3
+	// requests on it.
+	var output_list = make([]TrashRequest, 3)
+	for i := 0; i < 3; i++ {
+		item := <-trashq.NextItem
+		if tr, ok := item.(TrashRequest); ok {
+			output_list[i] = tr
+		} else {
+			t.Errorf("item %v could not be parsed as a TrashRequest", item)
+		}
+	}
+}
+
 // ====================
 // Helper functions
 // ====================
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index fde6087..038e812 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -59,10 +59,12 @@ func MakeRESTRouter() *mux.Router {
 		`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
 	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
-	// The PullHandler processes "PUT /pull" commands from Data Manager.
-	// It parses the JSON list of pull requests in the request body, and
-	// delivers them to the pull list manager for replication.
+	// The PullHandler and TrashHandler process "PUT /pull" and "PUT
+	// /trash" requests from Data Manager.  Each handler parses the
+	// JSON list of block management requests in the message body, and
+	// delivers them to the pull queue or trash queue, respectively.
 	rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
 	// Any request which does not match any of these routes gets
 	// 400 Bad Request.
@@ -478,6 +480,48 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
 	pullq.ReplaceQueue(plist)
 }
 
+type TrashRequest struct {
+	Locator    string `json:"locator"`
+	BlockMtime int64  `json:"block_mtime"`
+}
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	api_token := GetApiToken(req)
+	if !IsDataManagerToken(api_token) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
+		return
+	}
+
+	// Parse the request body.
+	var trash []TrashRequest
+	r := json.NewDecoder(req.Body)
+	if err := r.Decode(&trash); err != nil {
+		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
+		return
+	}
+
+	// We have a properly formatted trash list sent from the data
+	// manager.  Report success and send the list to the trash work
+	// queue for further handling.
+	log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
+	resp.WriteHeader(http.StatusOK)
+	resp.Write([]byte(
+		fmt.Sprintf("Received %d trash requests\n", len(trash))))
+
+	tlist := list.New()
+	for _, t := range trash {
+		tlist.PushBack(t)
+	}
+
+	if trashq == nil {
+		trashq = NewWorkQueue()
+	}
+	trashq.ReplaceQueue(tlist)
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 2437638..d6eb6b2 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -91,12 +91,11 @@ func (e *KeepError) Error() string {
 // Initialized by the --volumes flag (or by FindKeepVolumes).
 var KeepVM VolumeManager
 
-// The pull list queue is a singleton pull list (a list of blocks
-// that the current keepstore process should be pulling from remote
-// keepstore servers in order to increase data replication) with
-// atomic update methods that are safe to use from multiple
-// goroutines.
+// The pull list manager and trash queue are threadsafe queues which
+// support atomic update operations. The PullHandler and TrashHandler
+// store results from Data Manager /pull and /trash requests here.
 var pullq *WorkQueue
+var trashq *WorkQueue
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list