[ARVADOS] created: b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a

git at public.curoverse.com git at public.curoverse.com
Wed Jul 30 14:00:18 EDT 2014


        at  b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a (commit)


commit b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Jul 23 13:40:48 2014 -0400

    2769: implement DELETE.
    
    The DeleteHandler method implements DELETE requests for Keep, checking
    that the user is authorized to issue DELETE and then deleting the
    requested block from local volumes.
    
    * CanDelete checks that the user with the given token is authorized to
      delete blocks from Keep.
    * MockVolume and UnixVolume objects provide a Delete method for deleting
      a block from that volume.
    * TestDeleteHandler tests:
    ** Unauthenticated requests
    ** Requests from a non-admin user
    ** Requests from a non-admin user for a nonexistent block
    ** Requests from a non-admin user for an existing block

diff --git a/services/keep/src/keep/handler_test.go b/services/keep/src/keep/handler_test.go
index 1b0a90f..cab203d 100644
--- a/services/keep/src/keep/handler_test.go
+++ b/services/keep/src/keep/handler_test.go
@@ -11,10 +11,12 @@ package main
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"github.com/gorilla/mux"
 	"net/http"
 	"net/http/httptest"
+	"os"
 	"regexp"
 	"strings"
 	"testing"
@@ -411,6 +413,119 @@ func TestIndexHandler(t *testing.T) {
 	}
 }
 
+// TestDeleteHandler
+//
+// Cases to test:
+//
+//   With no token and with a non-data-manager token:
+//   * Delete existing block
+//     (test for 403 Forbidden, confirm block not deleted)
+//
+//   With data manager token:
+//
+//   * Delete existing block
+//     (test for 200 OK, response counts, confirm block deleted)
+//
+//   * Delete nonexistent block
+//     (test for 200 OK, response counts)
+//
+//   TODO(twp):
+//
+//   * Delete block on read-only volume
+//     (test for 200 OK, response counts, confirm block not deleted)
+//
+func TestDeleteHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up Keep volumes and populate them.
+	// Include multiple blocks on different volumes, and
+	// some metadata files (which should be omitted from index listings)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	vols[0].Put(TEST_HASH, TEST_BLOCK)
+
+	// Set up a REST router for testing the handlers.
+	rest := MakeRESTRouter()
+
+	var user_token = "NOT DATA MANAGER TOKEN"
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	unauth_req := &RequestTester{
+		method: "DELETE",
+		uri:    "http://localhost:25107/" + TEST_HASH,
+	}
+
+	user_req := &RequestTester{
+		method:    "DELETE",
+		uri:       "http://localhost:25107/" + TEST_HASH,
+		api_token: user_token,
+	}
+
+	superuser_existing_block_req := &RequestTester{
+		method:    "DELETE",
+		uri:       "http://localhost:25107/" + TEST_HASH,
+		api_token: data_manager_token,
+	}
+
+	superuser_nonexistent_block_req := &RequestTester{
+		method:    "DELETE",
+		uri:       "http://localhost:25107/" + TEST_HASH_2,
+		api_token: data_manager_token,
+	}
+
+	// Unauthenticated request returns PermissionError.
+	var response *httptest.ResponseRecorder
+	response = IssueRequest(rest, unauth_req)
+	ExpectStatusCode(t,
+		"unauthenticated request",
+		PermissionError.HTTPCode,
+		response)
+
+	// Authenticated non-admin request returns PermissionError.
+	response = IssueRequest(rest, user_req)
+	ExpectStatusCode(t,
+		"authenticated non-admin request",
+		PermissionError.HTTPCode,
+		response)
+
+	// Authenticated admin request for nonexistent block.
+	type deletecounter struct {
+		Deleted int `json:"copies_deleted"`
+		Failed  int `json:"copies_failed"`
+	}
+	var dc deletecounter
+
+	response = IssueRequest(rest, superuser_nonexistent_block_req)
+	ExpectStatusCode(t,
+		"data manager request, nonexistent block",
+		http.StatusOK,
+		response)
+	// Expect response {"copies_deleted":0,"copies_failed":0}
+	json.NewDecoder(response.Body).Decode(&dc)
+	if dc.Deleted != 0 || dc.Failed != 0 {
+		t.Errorf("superuser_nonexistent_block_req: response = %+v", dc)
+	}
+
+	// Authenticated admin request for existing block.
+	response = IssueRequest(rest, superuser_existing_block_req)
+	ExpectStatusCode(t,
+		"data manager request, existing block",
+		http.StatusOK,
+		response)
+	// Expect response {"copies_deleted":1,"copies_failed":0}
+	json.NewDecoder(response.Body).Decode(&dc)
+	if dc.Deleted != 1 || dc.Failed != 0 {
+		t.Errorf("superuser_existing_block_req: response = %+v", dc)
+	}
+	// Confirm the block has been deleted
+	_, err := vols[0].Get(TEST_HASH)
+	if !os.IsNotExist(err) {
+		t.Error("superuser_existing_block_req: block not deleted")
+	}
+}
+
 // ====================
 // Helper functions
 // ====================
diff --git a/services/keep/src/keep/handlers.go b/services/keep/src/keep/handlers.go
index a5d5e47..234e21a 100644
--- a/services/keep/src/keep/handlers.go
+++ b/services/keep/src/keep/handlers.go
@@ -39,6 +39,7 @@ func MakeRESTRouter() *mux.Router {
 		GetBlockHandler).Methods("GET", "HEAD")
 
 	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
 
 	// For IndexHandler we support:
 	//   /index           - returns all locators
@@ -315,6 +316,61 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 	return &VolumeStatus{volume, devnum, free, used}
 }
 
+// DeleteHandler processes DELETE requests.
+//
+// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
+// from all connected volumes.
+//
+// Only the Data Manager, or an Arvados admin with scope "all", are
+// allowed to issue DELETE requests.  If a DELETE request is not
+// authenticated or is issued by a non-admin user, the server returns
+// a PermissionError.
+//
+// Upon completion, DELETE returns HTTP 200 OK with a JSON message body
+// in the format
+//
+//    {"copies_deleted":d,"copies_failed":f}
+//
+// where d and f are integers representing the number of blocks that
+// were successfully and unsuccessfully deleted.
+//
+func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+	hash := mux.Vars(req)["hash"]
+
+	// Confirm that this user is an admin and has a token with unlimited scope.
+	var tok = GetApiToken(req)
+	if tok == "" || !CanDelete(tok) {
+		http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+		return
+	}
+
+	// Delete copies of this block from all available volumes.  Report
+	// how many blocks were successfully and unsuccessfully
+	// deleted.
+	var result struct {
+		Deleted int `json:"copies_deleted"`
+		Failed  int `json:"copies_failed"`
+	}
+	for _, vol := range KeepVM.Volumes() {
+		if err := vol.Delete(hash); err == nil {
+			result.Deleted++
+		} else if os.IsNotExist(err) {
+			continue
+		} else {
+			result.Failed++
+			log.Println("DeleteHandler:", err)
+		}
+	}
+
+	if j, err := json.Marshal(result); err == nil {
+		resp.Write(j)
+	} else {
+		log.Printf("json.Marshal: %s\n", err)
+		log.Printf("result = %v\n", result)
+		http.Error(resp, err.Error(), 500)
+	}
+}
+
 func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	error_to_caller := NotFoundError
@@ -477,3 +533,23 @@ func IsExpired(timestamp_hex string) bool {
 	}
 	return time.Unix(ts, 0).Before(time.Now())
 }
+
+// CanDelete returns true if the user identified by api_token is
+// allowed to delete blocks.
+func CanDelete(api_token string) bool {
+	if api_token == "" {
+		return false
+	}
+	// Blocks may be deleted only when Keep has been configured with a
+	// data manager.
+	if data_manager_token == "" {
+		return false
+	}
+	if api_token == data_manager_token {
+		return true
+	}
+	// TODO(twp): look up api_token with the API server
+	// return true if is_admin is true and if the token
+	// has unlimited scope
+	return false
+}
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index d195686..b15ef23 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -7,14 +7,15 @@ package main
 import (
 	"errors"
 	"fmt"
-	"strings"
 	"os"
+	"strings"
 )
 
 type Volume interface {
 	Get(loc string) ([]byte, error)
 	Put(loc string, block []byte) error
 	Index(prefix string) string
+	Delete(loc string) error
 	Status() *VolumeStatus
 	String() string
 }
@@ -61,6 +62,14 @@ func (v *MockVolume) Index(prefix string) string {
 	return result
 }
 
+func (v *MockVolume) Delete(loc string) error {
+	if _, ok := v.Store[loc]; ok {
+		delete(v.Store, loc)
+		return nil
+	}
+	return os.ErrNotExist
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
 	var used uint64
 	for _, block := range v.Store {
diff --git a/services/keep/src/keep/volume_unix.go b/services/keep/src/keep/volume_unix.go
index aafc8de..ddddd5a 100644
--- a/services/keep/src/keep/volume_unix.go
+++ b/services/keep/src/keep/volume_unix.go
@@ -109,8 +109,7 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 // corrupted data block.
 //
 func (v *UnixVolume) Read(loc string) ([]byte, error) {
-	blockFilename := filepath.Join(v.root, loc[0:3], loc)
-	buf, err := ioutil.ReadFile(blockFilename)
+	buf, err := ioutil.ReadFile(v.blockPath(loc))
 	return buf, err
 }
 
@@ -123,22 +122,22 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
 	if v.IsFull() {
 		return FullError
 	}
-	blockDir := filepath.Join(v.root, loc[0:3])
-	if err := os.MkdirAll(blockDir, 0755); err != nil {
+	bdir := v.blockDir(loc)
+	if err := os.MkdirAll(bdir, 0755); err != nil {
 		log.Printf("%s: could not create directory %s: %s",
-			loc, blockDir, err)
+			loc, bdir, err)
 		return err
 	}
 
-	tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+	tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
 	if tmperr != nil {
-		log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+		log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
 		return tmperr
 	}
-	blockFilename := filepath.Join(blockDir, loc)
+	bpath := v.blockPath(loc)
 
 	if _, err := tmpfile.Write(block); err != nil {
-		log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
+		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
 		return err
 	}
 	if err := tmpfile.Close(); err != nil {
@@ -146,8 +145,8 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
 		os.Remove(tmpfile.Name())
 		return err
 	}
-	if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
-		log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+	if err := os.Rename(tmpfile.Name(), bpath); err != nil {
+		log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
 		os.Remove(tmpfile.Name())
 		return err
 	}
@@ -230,6 +229,22 @@ func (v *UnixVolume) Index(prefix string) (output string) {
 	return
 }
 
+func (v *UnixVolume) Delete(loc string) error {
+	return os.Remove(v.blockPath(loc))
+}
+
+// blockDir returns the fully qualified directory name for the directory
+// where loc is (or would be) stored on this volume.
+func (v *UnixVolume) blockDir(loc string) string {
+	return filepath.Join(v.root, loc[0:3])
+}
+
+// blockPath returns the fully qualified pathname for the path to loc
+// on this volume.
+func (v *UnixVolume) blockPath(loc string) string {
+	return filepath.Join(v.blockDir(loc), loc)
+}
+
 // IsFull returns true if the free space on the volume is less than
 // MIN_FREE_KILOBYTES.
 //

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list