[ARVADOS] created: 2a6e0785f8fb675038f867aebc65033dc8a85211

git at public.curoverse.com git at public.curoverse.com
Thu Jan 21 14:02:36 EST 2016


        at  2a6e0785f8fb675038f867aebc65033dc8a85211 (commit)


commit 2a6e0785f8fb675038f867aebc65033dc8a85211
Author: radhika <radhika at curoverse.com>
Date:   Thu Jan 21 13:59:36 2016 -0500

    8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandler and test for this endpoint.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index c0033d9..0071567 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -311,8 +311,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 	}
 }
 
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
@@ -335,6 +335,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
 	})
 }
 
+// Untrash a Keep block.
+// TBD: not yet implemented, so report failure rather than a false success.
+func (v *AzureBlobVolume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
 // Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
 	return &VolumeStatus{
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 3817ea1..9526c3c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -970,3 +970,83 @@ func TestPutReplicationHeader(t *testing.T) {
 		t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
 	}
 }
+
+// TestUndeleteHandler checks auth, locator and method handling of PUT /undelete/{hash}.
+func TestUndeleteHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up Keep volumes
+	KeepVM = MakeTestVolumeManager(2)
+	defer KeepVM.Close()
+	vols := KeepVM.AllWritable()
+	vols[0].Put(TestHash, TestBlock)
+
+	// Only the datamanager user should be allowed to undelete blocks
+	dataManagerToken = "DATA MANAGER TOKEN"
+
+	// unauthenticatedReq => UnauthorizedError
+	unauthenticatedReq := &RequestTester{
+		method: "PUT",
+		uri:    "/undelete/" + TestHash,
+	}
+	response := IssueRequest(unauthenticatedReq)
+	ExpectStatusCode(t,
+		"undelete: unauthenticated request",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// notDataManagerReq => UnauthorizedError
+	notDataManagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: knownToken,
+	}
+	response = IssueRequest(notDataManagerReq)
+	ExpectStatusCode(t,
+		"undelete: authenticated request, not data manager",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// datamanagerWithBadHashReq => StatusBadRequest
+	datamanagerWithBadHashReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/thisisnotalocator",
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWithBadHashReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, invalid locator",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerWrongMethodReq => StatusBadRequest
+	datamanagerWrongMethodReq := &RequestTester{
+		method:   "GET",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWrongMethodReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, wrong method (GET)",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerReq => StatusOK
+	datamanagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, valid locator",
+		http.StatusOK,
+		response)
+	expected := `Untrashed on volume`
+	match, _ := regexp.MatchString(expected, response.Body.String())
+	if !match {
+		t.Errorf(
+			"Undelete response mismatched: expected %s, got:\n%s",
+			expected, response.Body.String())
+	}
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 95af1b4..f8e4d71 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -53,6 +53,9 @@ func MakeRESTRouter() *mux.Router {
 	// Replace the current trash queue.
 	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+	// Undelete moves blocks from trash back into store
+	rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+
 	// Any request which does not match any of these routes gets
 	// 400 Bad Request.
 	rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +298,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
 		Failed  int `json:"copies_failed"`
 	}
 	for _, vol := range KeepVM.AllWritable() {
-		if err := vol.Delete(hash); err == nil {
+		if err := vol.Trash(hash); err == nil {
 			result.Deleted++
 		} else if os.IsNotExist(err) {
 			continue
@@ -430,6 +433,36 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 	trashq.ReplaceQueue(tlist)
 }
 
+// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
+//
+// Requests without the data manager token get UnauthorizedError. Otherwise
+// Untrash is tried on each volume returned by KeepVM.AllWritable(), stopping
+// at the first success: the response is then 200 with a body naming that
+// volume. If no volume succeeds (or AllWritable returns none) the response
+// is 500 with an empty body.
+func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+	// Reject unauthorized requests.
+	if !IsDataManagerToken(GetApiToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+
+	hash := mux.Vars(req)["hash"]
+	for _, vol := range KeepVM.AllWritable() {
+		if err := vol.Untrash(hash); err != nil {
+			log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+			continue
+		}
+		log.Printf("Untrashed %v on volume %v", hash, vol.String())
+		// The first Write implicitly sends the 200 status line.
+		resp.Write([]byte("Untrashed on volume: " + vol.String()))
+		return
+	}
+
+	// Nothing was untrashed; never reached after a body write.
+	resp.WriteHeader(http.StatusInternalServerError)
+}
+
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 96a887f..f6b1f6e 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -79,6 +79,7 @@ var (
 	SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
 	TooLongError        = &KeepError{413, "Block is too large"}
 	MethodDisabledError = &KeepError{405, "Method disabled"}
+	ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -113,6 +114,7 @@ var (
 	flagSerializeIO bool
 	flagReadonly    bool
 	volumes         volumeSet
+	trashLifetime   int
 )
 
 func (vs *volumeSet) String() string {
@@ -200,6 +202,11 @@ func main() {
 		"max-buffers",
 		maxBuffers,
 		fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+	flag.IntVar(
+		&trashLifetime,
+		"trash-lifetime",
+		0,
+		fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
 
 	flag.Parse()
 
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 572ee46..16afc32 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -257,7 +257,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	return nil
 }
 
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
@@ -272,6 +272,11 @@ func (v *S3Volume) Delete(loc string) error {
 	return v.Bucket.Del(loc)
 }
 
+// Untrash is TBD for S3 volumes; it fails rather than falsely reporting success.
+func (v *S3Volume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
 func (v *S3Volume) Status() *VolumeStatus {
 	return &VolumeStatus{
 		DeviceNum: 1,
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 65e3fbd..62f63d5 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
 		if neverDelete {
 			err = errors.New("did not delete block because neverDelete is true")
 		} else {
-			err = volume.Delete(trashRequest.Locator)
+			err = volume.Trash(trashRequest.Locator)
 		}
 
 		if err != nil {
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 7966c41..58710c0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -144,20 +144,21 @@ type Volume interface {
 	// particular order.
 	IndexTo(prefix string, writer io.Writer) error
 
-	// Delete deletes the block data from the underlying storage
-	// device.
+	// Trash moves the block data from the underlying storage
+	// device to trash area. The block then stays in trash for
+	// -trash-lifetime interval before it is actually deleted.
 	//
 	// loc is as described in Get.
 	//
 	// If the timestamp for the given locator is newer than
-	// blobSignatureTTL, Delete must not delete the data.
+	// blobSignatureTTL, Trash must not trash the data.
 	//
-	// If a Delete operation overlaps with any Touch or Put
+	// If a Trash operation overlaps with any Touch or Put
 	// operations on the same locator, the implementation must
 	// ensure one of the following outcomes:
 	//
 	//   - Touch and Put return a non-nil error, or
-	//   - Delete does not delete the block, or
+	//   - Trash does not trash the block, or
 	//   - Both of the above.
 	//
 	// If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
 	// reliably or fail outright.
 	//
 	// Corollary: A successful Touch or Put guarantees a block
-	// will not be deleted for at least blobSignatureTTL
+	// will not be trashed for at least blobSignatureTTL
 	// seconds.
-	Delete(loc string) error
+	Trash(loc string) error
+
+	// Untrash moves block from trash back into store
+	Untrash(loc string) error
 
 	// Status returns a *VolumeStatus representing the current
 	// in-use and available storage capacity and an
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 7580a20..e168940 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -420,7 +420,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 
 	v.Put(TestHash, TestBlock)
 
-	if err := v.Delete(TestHash); err != nil {
+	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
 	}
 	data, err := v.Get(TestHash)
@@ -449,7 +449,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 	v.Put(TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-	if err := v.Delete(TestHash); err != nil {
+	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
 	}
 	if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +463,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 
-	if err := v.Delete(TestHash2); err == nil {
+	if err := v.Trash(TestHash2); err == nil {
 		t.Errorf("Expected error when attempting to delete a non-existing block")
 	}
 }
@@ -535,7 +535,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Delete a block from a read-only volume should result in error
-	err = v.Delete(TestHash)
+	err = v.Trash(TestHash)
 	if err == nil {
 		t.Errorf("Expected error when deleting block from a read-only volume")
 	}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index d671436..53ffeef 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
 	return nil
 }
 
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
 	v.gotCall("Delete")
 	<-v.Gate
 	if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
 	return os.ErrNotExist
 }
 
+// Untrash is a TBD stub: it does nothing with loc and always returns nil.
+func (v *MockVolume) Untrash(loc string) error {
+	return nil
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
 	var used uint64
 	for _, block := range v.Store {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 910cc25..da1d390 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -363,7 +363,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
 	// Touch() must be called before calling Write() on a block.  Touch()
 	// also uses lockfile().  This avoids a race condition between Write()
 	// and Delete() because either (a) the file will be deleted and Touch()
@@ -405,6 +405,12 @@ func (v *UnixVolume) Delete(loc string) error {
 	return os.Remove(p)
 }
 
+// Untrash moves block from trash back into store
+// TBD: not yet implemented, so report failure rather than a false success.
+func (v *UnixVolume) Untrash(loc string) error {
+	return ErrNotImplemented
+}
+
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index b216810..0775e89 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
 		t.Errorf("got err %v, expected MethodDisabledError", err)
 	}
 
-	err = v.Delete(TestHash)
+	err = v.Trash(TestHash)
 	if err != MethodDisabledError {
 		t.Errorf("got err %v, expected MethodDisabledError", err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list