[ARVADOS] created: b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a
git at public.curoverse.com
git at public.curoverse.com
Wed Jul 30 14:00:18 EDT 2014
at b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a (commit)
commit b9d9b77e04adb656aa9ab37d2688aef7ce0b8b2a
Author: Tim Pierce <twp at curoverse.com>
Date: Wed Jul 23 13:40:48 2014 -0400
2769: implement DELETE.
The DeleteHandler method implements DELETE requests for Keep, checking
that the user is authorized to issue DELETE and then deleting the
requested block from local volumes.
* CanDelete checks that the user with the given token is authorized to
delete blocks from Keep.
* MockVolume and UnixVolume objects provide a Delete method for deleting
a block from that volume.
* TestDeleteHandler tests:
** Unauthenticated requests
** Requests from a non-admin user
** Requests from a non-admin user for a nonexistent block
** Requests from a non-admin user for an existing block
diff --git a/services/keep/src/keep/handler_test.go b/services/keep/src/keep/handler_test.go
index 1b0a90f..cab203d 100644
--- a/services/keep/src/keep/handler_test.go
+++ b/services/keep/src/keep/handler_test.go
@@ -11,10 +11,12 @@ package main
import (
"bytes"
+ "encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"net/http/httptest"
+ "os"
"regexp"
"strings"
"testing"
@@ -411,6 +413,119 @@ func TestIndexHandler(t *testing.T) {
}
}
+// TestDeleteHandler
+//
+// Cases to test:
+//
+// With no token and with a non-data-manager token:
+// * Delete existing block
+// (test for 403 Forbidden, confirm block not deleted)
+//
+// With data manager token:
+//
+// * Delete existing block
+// (test for 200 OK, response counts, confirm block deleted)
+//
+// * Delete nonexistent block
+// (test for 200 OK, response counts)
+//
+// TODO(twp):
+//
+// * Delete block on read-only volume
+// (test for 200 OK, response counts, confirm block not deleted)
+//
+func TestDeleteHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up Keep volumes and populate them.
+ // Include multiple blocks on different volumes, and
+ // some metadata files (which should be omitted from index listings)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+
+ // Set up a REST router for testing the handlers.
+ rest := MakeRESTRouter()
+
+ var user_token = "NOT DATA MANAGER TOKEN"
+ data_manager_token = "DATA MANAGER TOKEN"
+
+ unauth_req := &RequestTester{
+ method: "DELETE",
+ uri: "http://localhost:25107/" + TEST_HASH,
+ }
+
+ user_req := &RequestTester{
+ method: "DELETE",
+ uri: "http://localhost:25107/" + TEST_HASH,
+ api_token: user_token,
+ }
+
+ superuser_existing_block_req := &RequestTester{
+ method: "DELETE",
+ uri: "http://localhost:25107/" + TEST_HASH,
+ api_token: data_manager_token,
+ }
+
+ superuser_nonexistent_block_req := &RequestTester{
+ method: "DELETE",
+ uri: "http://localhost:25107/" + TEST_HASH_2,
+ api_token: data_manager_token,
+ }
+
+ // Unauthenticated request returns PermissionError.
+ var response *httptest.ResponseRecorder
+ response = IssueRequest(rest, unauth_req)
+ ExpectStatusCode(t,
+ "unauthenticated request",
+ PermissionError.HTTPCode,
+ response)
+
+ // Authenticated non-admin request returns PermissionError.
+ response = IssueRequest(rest, user_req)
+ ExpectStatusCode(t,
+ "authenticated non-admin request",
+ PermissionError.HTTPCode,
+ response)
+
+ // Authenticated admin request for nonexistent block.
+ type deletecounter struct {
+ Deleted int `json:"copies_deleted"`
+ Failed int `json:"copies_failed"`
+ }
+ var dc deletecounter
+
+ response = IssueRequest(rest, superuser_nonexistent_block_req)
+ ExpectStatusCode(t,
+ "data manager request, nonexistent block",
+ http.StatusOK,
+ response)
+ // Expect response {"copies_deleted":0,"copies_failed":0}
+ json.NewDecoder(response.Body).Decode(&dc)
+ if dc.Deleted != 0 || dc.Failed != 0 {
+ t.Errorf("superuser_nonexistent_block_req: response = %+v", dc)
+ }
+
+ // Authenticated admin request for existing block.
+ response = IssueRequest(rest, superuser_existing_block_req)
+ ExpectStatusCode(t,
+ "data manager request, existing block",
+ http.StatusOK,
+ response)
+ // Expect response {"copies_deleted":1,"copies_failed":0}
+ json.NewDecoder(response.Body).Decode(&dc)
+ if dc.Deleted != 1 || dc.Failed != 0 {
+ t.Errorf("superuser_existing_block_req: response = %+v", dc)
+ }
+ // Confirm the block has been deleted
+ _, err := vols[0].Get(TEST_HASH)
+ if !os.IsNotExist(err) {
+ t.Error("superuser_existing_block_req: block not deleted")
+ }
+}
+
// ====================
// Helper functions
// ====================
diff --git a/services/keep/src/keep/handlers.go b/services/keep/src/keep/handlers.go
index a5d5e47..234e21a 100644
--- a/services/keep/src/keep/handlers.go
+++ b/services/keep/src/keep/handlers.go
@@ -39,6 +39,7 @@ func MakeRESTRouter() *mux.Router {
GetBlockHandler).Methods("GET", "HEAD")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
// For IndexHandler we support:
// /index - returns all locators
@@ -315,6 +316,61 @@ func GetVolumeStatus(volume string) *VolumeStatus {
return &VolumeStatus{volume, devnum, free, used}
}
+// DeleteHandler processes DELETE requests.
+//
+// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
+// from all connected volumes.
+//
+// Only the Data Manager, or an Arvados admin with scope "all", are
+// allowed to issue DELETE requests. If a DELETE request is not
+// authenticated or is issued by a non-admin user, the server returns
+// a PermissionError.
+//
+// Upon completion, DELETE returns HTTP 200 OK with a JSON message body
+// in the format
+//
+// {"copies_deleted":d,"copies_failed":f}
+//
+// where d and f are integers representing the number of blocks that
+// were successfully and unsuccessfully deleted.
+//
+func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+ hash := mux.Vars(req)["hash"]
+
+ // Confirm that this user is an admin and has a token with unlimited scope.
+ var tok = GetApiToken(req)
+ if tok == "" || !CanDelete(tok) {
+ http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
+ return
+ }
+
+ // Delete copies of this block from all available volumes. Report
+ // how many blocks were successfully and unsuccessfully
+ // deleted.
+ var result struct {
+ Deleted int `json:"copies_deleted"`
+ Failed int `json:"copies_failed"`
+ }
+ for _, vol := range KeepVM.Volumes() {
+ if err := vol.Delete(hash); err == nil {
+ result.Deleted++
+ } else if os.IsNotExist(err) {
+ continue
+ } else {
+ result.Failed++
+ log.Println("DeleteHandler:", err)
+ }
+ }
+
+ if j, err := json.Marshal(result); err == nil {
+ resp.Write(j)
+ } else {
+ log.Printf("json.Marshal: %s\n", err)
+ log.Printf("result = %v\n", result)
+ http.Error(resp, err.Error(), 500)
+ }
+}
+
func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
@@ -477,3 +533,23 @@ func IsExpired(timestamp_hex string) bool {
}
return time.Unix(ts, 0).Before(time.Now())
}
+
+// CanDelete returns true if the user identified by api_token is
+// allowed to delete blocks.
+func CanDelete(api_token string) bool {
+ if api_token == "" {
+ return false
+ }
+ // Blocks may be deleted only when Keep has been configured with a
+ // data manager.
+ if data_manager_token == "" {
+ return false
+ }
+ if api_token == data_manager_token {
+ return true
+ }
+ // TODO(twp): look up api_token with the API server
+ // return true if is_admin is true and if the token
+ // has unlimited scope
+ return false
+}
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index d195686..b15ef23 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -7,14 +7,15 @@ package main
import (
"errors"
"fmt"
- "strings"
"os"
+ "strings"
)
type Volume interface {
Get(loc string) ([]byte, error)
Put(loc string, block []byte) error
Index(prefix string) string
+ Delete(loc string) error
Status() *VolumeStatus
String() string
}
@@ -61,6 +62,14 @@ func (v *MockVolume) Index(prefix string) string {
return result
}
+func (v *MockVolume) Delete(loc string) error {
+ if _, ok := v.Store[loc]; ok {
+ delete(v.Store, loc)
+ return nil
+ }
+ return os.ErrNotExist
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
diff --git a/services/keep/src/keep/volume_unix.go b/services/keep/src/keep/volume_unix.go
index aafc8de..ddddd5a 100644
--- a/services/keep/src/keep/volume_unix.go
+++ b/services/keep/src/keep/volume_unix.go
@@ -109,8 +109,7 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
// corrupted data block.
//
func (v *UnixVolume) Read(loc string) ([]byte, error) {
- blockFilename := filepath.Join(v.root, loc[0:3], loc)
- buf, err := ioutil.ReadFile(blockFilename)
+ buf, err := ioutil.ReadFile(v.blockPath(loc))
return buf, err
}
@@ -123,22 +122,22 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
if v.IsFull() {
return FullError
}
- blockDir := filepath.Join(v.root, loc[0:3])
- if err := os.MkdirAll(blockDir, 0755); err != nil {
+ bdir := v.blockDir(loc)
+ if err := os.MkdirAll(bdir, 0755); err != nil {
log.Printf("%s: could not create directory %s: %s",
- loc, blockDir, err)
+ loc, bdir, err)
return err
}
- tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+ tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+ log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
return tmperr
}
- blockFilename := filepath.Join(blockDir, loc)
+ bpath := v.blockPath(loc)
if _, err := tmpfile.Write(block); err != nil {
- log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
+ log.Printf("%s: writing to %s: %s\n", v, bpath, err)
return err
}
if err := tmpfile.Close(); err != nil {
@@ -146,8 +145,8 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
os.Remove(tmpfile.Name())
return err
}
- if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
- log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+ if err := os.Rename(tmpfile.Name(), bpath); err != nil {
+ log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
os.Remove(tmpfile.Name())
return err
}
@@ -230,6 +229,22 @@ func (v *UnixVolume) Index(prefix string) (output string) {
return
}
+func (v *UnixVolume) Delete(loc string) error {
+ return os.Remove(v.blockPath(loc))
+}
+
+// blockDir returns the fully qualified directory name for the directory
+// where loc is (or would be) stored on this volume.
+func (v *UnixVolume) blockDir(loc string) string {
+ return filepath.Join(v.root, loc[0:3])
+}
+
+// blockPath returns the fully qualified pathname for the path to loc
+// on this volume.
+func (v *UnixVolume) blockPath(loc string) string {
+ return filepath.Join(v.blockDir(loc), loc)
+}
+
// IsFull returns true if the free space on the volume is less than
// MIN_FREE_KILOBYTES.
//
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list