[ARVADOS] created: 2ad96e9da0e9a2d11b821e64ad541174092c8952

git at public.curoverse.com git at public.curoverse.com
Tue Aug 26 16:58:17 EDT 2014


        at  2ad96e9da0e9a2d11b821e64ad541174092c8952 (commit)


commit 2ad96e9da0e9a2d11b821e64ad541174092c8952
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Aug 26 16:39:51 2014 -0400

    3413: add TrashHandler, package trash
    
    Added a trash collector interface for handling the trash list from Data
    Manager.
    
    "PUT /trash" requests are routed to the TrashHandler, which verifies
    that the request was sent by Data Manager and consists of a valid trash
    list.  The TrashHandler sends the list to a trash.Collector goroutine
    for handling.
    
    TestTrashHandler simulates trash requests from the data manager and from
    an ordinary user, using syntactically valid and invalid trash lists.  It
    verifies that the trash collector goroutine received at least one valid
    list of the correct size.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 200e1b1..068f0fe 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -566,6 +566,103 @@ func TestDeleteHandler(t *testing.T) {
 	}
 }
 
+// TestTrashHandler exercises the "PUT /trash" route.
+//
+// Test cases:
+//
+// Syntactically valid and invalid trash lists, from the
+// data manager and from unprivileged users:
+//
+//   1. Valid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   2. Invalid trash list from an ordinary user
+//      (expected result: 401 Unauthorized)
+//
+//   3. Valid trash list from the data manager
+//      (expected result: 200 OK with response body "Received 3 trash
+//      requests")
+//
+//   4. Invalid trash list from the data manager
+//      (expected result: 400 Bad Request)
+//
+// Test that in the end, the trash collector received a good
+// trash list with the expected number of requests.
+//
+// TODO(twp): test concurrency: launch 100 goroutines to update the
+// trash list simultaneously.  Make sure that none of them return 400
+// Bad Request and that trashbin.Dump() returns a valid list.
+//
+func TestTrashHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up a REST router for testing the handlers.
+	rest := MakeRESTRouter()
+
+	var user_token = "USER TOKEN"
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	good_json := []byte(`{
+		"expiration_time":1409082153,
+		"trash_blocks":[
+			"block1",
+			"block2",
+			"block3"
+		]
+	}`)
+
+	bad_json := []byte(`I am not a valid JSON string`)
+
+	type trashTest struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+	}
+
+	var testcases = []trashTest{
+		{
+			"user token, good request",
+			RequestTester{"/trash", user_token, "PUT", good_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"user token, bad request",
+			RequestTester{"/trash", user_token, "PUT", bad_json},
+			http.StatusUnauthorized,
+			"Unauthorized\n",
+		},
+		{
+			"data manager token, good request",
+			RequestTester{"/trash", data_manager_token, "PUT", good_json},
+			http.StatusOK,
+			"Received 3 trash requests\n",
+		},
+		{
+			"data manager token, bad request",
+			RequestTester{"/trash", data_manager_token, "PUT", bad_json},
+			http.StatusBadRequest,
+			"Bad Request\n",
+		},
+	}
+
+	for _, tst := range testcases {
+		response := IssueRequest(rest, &tst.req)
+		ExpectStatusCode(t, tst.name, tst.response_code, response)
+		ExpectBody(t, tst.name, tst.response_body, response)
+	}
+
+	// Only the "data manager token, good request" case should reach the
+	// trash collector, so it should hold one list with 3 blocks on it.
+	var saved_trash_list = trashbin.Dump()
+	if len(saved_trash_list.TrashBlocks) != 3 {
+		t.Errorf(
+			"saved_trash_list: expected 3 elements, got %d\nsaved_trash_list = %v",
+			len(saved_trash_list.TrashBlocks), saved_trash_list)
+	}
+}
+
 // ====================
 // Helper functions
 // ====================
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 039b2ac..170e547 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -13,6 +13,7 @@ import (
 	"crypto/md5"
 	"encoding/json"
 	"fmt"
+	"git.curoverse.com/arvados.git/services/keepstore/trash"
 	"github.com/gorilla/mux"
 	"io"
 	"log"
@@ -58,6 +59,12 @@ func MakeRESTRouter() *mux.Router {
 		`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
 	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
+	// The TrashHandler processes "PUT /trash" commands from Data
+	// Manager.  It parses the JSON list of blocks to be trashed in
+	// the request body, and delivers them to the trash.Collector
+	// goroutine for processing.
+	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+
 	// Any request which does not match any of these routes gets
 	// 400 Bad Request.
 	rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -397,6 +404,58 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 }
 
+/* TrashHandler processes "PUT /trash" requests for the data manager.
+   The request body is a JSON message containing a trash list
+   in the following format:
+
+   {
+       "expiration_time":1409082153,
+       "trash_blocks":[
+           "e4d909c290d0fb1ca068ffaddf22cbd0",
+           "55ae4d45d2db0793d53f03e805f656e5",
+           "1fd08fc162a5c6413070a8bd0bffc818",
+           ...
+       ]
+   }
+
+   The expiration_time field is a Unix timestamp signifying when this
+   trash list expires. After that time, this list should be discarded.
+
+   If the request has not been sent by the Data Manager, return 401
+   Unauthorized.
+
+   If the JSON unmarshalling fails, return 400 Bad Request.
+*/
+
+func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	api_token := GetApiToken(req)
+	if !IsDataManagerToken(api_token) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+
+	// Parse the request body.
+	var tlist trash.List
+	r := json.NewDecoder(req.Body)
+	if err := r.Decode(&tlist); err != nil {
+		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		return
+	}
+
+	// We have a properly formatted trash list sent from the data
+	// manager.  Report success and send the list to the trash
+	// collector for further handling.
+	resp.WriteHeader(http.StatusOK)
+	resp.Write([]byte(
+		fmt.Sprintf("Received %d trash requests\n", len(tlist.TrashBlocks))))
+
+	if trashbin == nil { // NOTE(review): unsynchronized lazy init; concurrent requests may race here -- confirm
+		trashbin = trash.New()
+	}
+	trashbin.Start(tlist)
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
@@ -617,3 +676,9 @@ func CanDelete(api_token string) bool {
 	// has unlimited scope
 	return false
 }
+
+// IsDataManagerToken reports whether api_token matches the configured data
+// manager token.  It is always false while data_manager_token is empty.
+func IsDataManagerToken(api_token string) bool {
+	return data_manager_token != "" && api_token == data_manager_token
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 9556185..1260a74 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/services/keepstore/trash"
 	"io/ioutil"
 	"log"
 	"net"
@@ -65,8 +66,13 @@ type KeepError struct {
 	ErrMsg   string
 }
 
+// trashbin processes the trash list sent by Data Manager via the PUT /trash
+// request.  It is nil until TrashHandler creates it with trash.New().
+var trashbin *trash.Collector
+
 var (
 	BadRequestError     = &KeepError{400, "Bad Request"}
+	UnauthorizedError   = &KeepError{401, "Unauthorized"}
 	CollisionError      = &KeepError{500, "Collision"}
 	RequestHashError    = &KeepError{422, "Hash mismatch in request"}
 	PermissionError     = &KeepError{403, "Forbidden"}
diff --git a/services/keepstore/trash/trash.go b/services/keepstore/trash/trash.go
new file mode 100644
index 0000000..4c478c2
--- /dev/null
+++ b/services/keepstore/trash/trash.go
@@ -0,0 +1,76 @@
+package trash
+
+/* The Keep trash collector processes the trash list sent
+   by Data Manager.
+
+   The interface is:
+
+   trash.New() launches a trash collection goroutine and returns the
+   new trash.Collector object.
+
+   Collector.Start(trashlist) sends a trash list to the collector.
+
+   Collector.Dump() reports the collector's current trash list.
+
+   Collector.Close() shuts down the trash collector.
+*/
+
+type List struct {
+	ExpirationTime int      `json:"expiration_time"` // Unix timestamp after which this list expires
+	TrashBlocks    []string `json:"trash_blocks"` // blocks to be trashed
+}
+
+type Collector struct {
+	queue chan List // incoming trash lists: sent by Start, received by listen
+	dump  chan List // current trash list: sent by listen, received by Dump
+}
+
+// New returns a new trash.Collector backed by two unbuffered channels.
+// It launches the listen goroutine, which waits for a list of blocks to
+// trash and keeps running until the input queue is closed (see Close).
+func New() *Collector {
+	c := Collector{
+		make(chan List),
+		make(chan List),
+	}
+	go c.listen()
+	return &c
+}
+
+// Start sends a new trash list to the trash collector goroutine, blocking
+// until the goroutine receives it.  The collector will discard any old
+// trash list and replace it with the new one.
+func (c *Collector) Start(trashlist List) {
+	c.queue <- trashlist
+}
+
+// Dump reports the current trash list, or a zero List once the collector has shut down.
+func (c *Collector) Dump() List {
+	return <-c.dump
+}
+
+// Close shuts down the trash collector by closing its input queue; the
+// listen goroutine then closes the dump channel and returns.
+func (c *Collector) Close() {
+	close(c.queue)
+}
+
+// listen is run in a goroutine. It reads new trash lists from its
+// input queue until the queue is closed.
+func (c *Collector) listen() {
+	var current List
+	for {
+		select {
+		case newlist, ok := <-c.queue:
+			if ok {
+				current = newlist
+			} else {
+				// The input queue was closed (see Close); close dump too and exit
+				close(c.dump)
+				return
+			}
+		case c.dump <- current:
+			// a Dump caller received the current list; nothing else to do
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list