[ARVADOS] created: 2a6e0785f8fb675038f867aebc65033dc8a85211
git at public.curoverse.com
git at public.curoverse.com
Thu Jan 21 14:02:36 EST 2016
at 2a6e0785f8fb675038f867aebc65033dc8a85211 (commit)
commit 2a6e0785f8fb675038f867aebc65033dc8a85211
Author: radhika <radhika at curoverse.com>
Date: Thu Jan 21 13:59:36 2016 -0500
8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandler and test for this endpoint.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index c0033d9..0071567 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -311,8 +311,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
}
}
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
@@ -335,6 +335,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
})
}
+// Untrash should move a trashed Keep block back into the store.
+// TBD: not implemented yet; this stub ignores loc and always returns nil.
+func (v *AzureBlobVolume) Untrash(loc string) error {
+ return nil
+}
+
// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 3817ea1..9526c3c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -970,3 +970,83 @@ func TestPutReplicationHeader(t *testing.T) {
t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
}
}
+
+// TestUndeleteHandler exercises the "PUT /undelete/{hash}" endpoint:
+//
+//   - requests without a token, or with a non-data-manager token, are
+//     rejected with UnauthorizedError;
+//   - a malformed locator or a non-PUT method does not match the route
+//     and yields 400 Bad Request;
+//   - a well-formed data manager request yields 200 OK and a body that
+//     names the volume on which the block was untrashed.
+func TestUndeleteHandler(t *testing.T) {
+	defer teardown()
+
+	// Set up Keep volumes
+	KeepVM = MakeTestVolumeManager(2)
+	defer KeepVM.Close()
+	vols := KeepVM.AllWritable()
+	if err := vols[0].Put(TestHash, TestBlock); err != nil {
+		t.Fatalf("could not store test block: %v", err)
+	}
+
+	// Only the datamanager user should be allowed to undelete blocks
+	dataManagerToken = "DATA MANAGER TOKEN"
+
+	// unauthenticatedReq => UnauthorizedError
+	unauthenticatedReq := &RequestTester{
+		method: "PUT",
+		uri:    "/undelete/" + TestHash,
+	}
+	response := IssueRequest(unauthenticatedReq)
+	ExpectStatusCode(t,
+		"undelete: unauthenticated request",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// notDataManagerReq => UnauthorizedError
+	notDataManagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: knownToken,
+	}
+
+	response = IssueRequest(notDataManagerReq)
+	ExpectStatusCode(t,
+		"undelete: authenticated request, not data manager",
+		UnauthorizedError.HTTPCode,
+		response)
+
+	// datamanagerWithBadHashReq => StatusBadRequest
+	datamanagerWithBadHashReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/thisisnotalocator",
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWithBadHashReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, malformed locator",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerWrongMethodReq => StatusBadRequest
+	datamanagerWrongMethodReq := &RequestTester{
+		method:   "GET",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerWrongMethodReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, wrong method (GET)",
+		http.StatusBadRequest,
+		response)
+
+	// datamanagerReq => StatusOK
+	datamanagerReq := &RequestTester{
+		method:   "PUT",
+		uri:      "/undelete/" + TestHash,
+		apiToken: dataManagerToken,
+	}
+	response = IssueRequest(datamanagerReq)
+	ExpectStatusCode(t,
+		"undelete: data manager request, valid locator",
+		http.StatusOK,
+		response)
+	expected := `Untrashed on volume`
+	match, err := regexp.MatchString(expected, response.Body.String())
+	if err != nil {
+		t.Fatalf("bad pattern %q: %v", expected, err)
+	}
+	if !match {
+		t.Errorf(
+			"Undelete response mismatched: expected %s, got:\n%s",
+			expected, response.Body.String())
+	}
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 95af1b4..f8e4d71 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -53,6 +53,9 @@ func MakeRESTRouter() *mux.Router {
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Undelete moves blocks from trash back into store
+ rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +298,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
@@ -430,6 +433,36 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
trashq.ReplaceQueue(tlist)
}
+// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ hash := mux.Vars(req)["hash"]
+ successResp := "Untrashed on volume: "
+ var st int
+ for _, vol := range KeepVM.AllWritable() {
+ if err := vol.Untrash(hash); err == nil {
+ log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ st = http.StatusOK
+ successResp += vol.String()
+ break
+ } else {
+ log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+ st = 500
+ }
+ }
+
+ if st == http.StatusOK {
+ resp.Write([]byte(successResp))
+ }
+
+ resp.WriteHeader(st)
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 96a887f..f6b1f6e 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -79,6 +79,7 @@ var (
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
)
func (e *KeepError) Error() string {
@@ -113,6 +114,7 @@ var (
flagSerializeIO bool
flagReadonly bool
volumes volumeSet
+ trashLifetime int
)
func (vs *volumeSet) String() string {
@@ -200,6 +202,11 @@ func main() {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+ flag.IntVar(
+ &trashLifetime,
+ "trash-lifetime",
+ 0,
+ fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
flag.Parse()
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 572ee46..16afc32 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -257,7 +257,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
return nil
}
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
@@ -272,6 +272,11 @@ func (v *S3Volume) Delete(loc string) error {
return v.Bucket.Del(loc)
}
+// Untrash is a stub (TBD): it ignores loc, restores nothing, and always returns nil.
+func (v *S3Volume) Untrash(loc string) error {
+ return nil
+}
+
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 65e3fbd..62f63d5 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
if neverDelete {
err = errors.New("did not delete block because neverDelete is true")
} else {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Trash(trashRequest.Locator)
}
if err != nil {
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 7966c41..58710c0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -144,20 +144,21 @@ type Volume interface {
// particular order.
IndexTo(prefix string, writer io.Writer) error
- // Delete deletes the block data from the underlying storage
- // device.
+ // Trash moves the block data from the underlying storage
+ // device to the trash area. The block then stays in trash for
+ // the -trash-lifetime interval before it is actually deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Delete must not delete the data.
+ // blobSignatureTTL, Trash must not trash the data.
//
- // If a Delete operation overlaps with any Touch or Put
+ // If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// ensure one of the following outcomes:
//
// - Touch and Put return a non-nil error, or
- // - Delete does not delete the block, or
+ // - Trash does not trash the block, or
// - Both of the above.
//
// If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be deleted for at least blobSignatureTTL
+ // will not be trashed for at least blobSignatureTTL
// seconds.
- Delete(loc string) error
+ Trash(loc string) error
+
+ // Untrash moves block from trash back into store
+ Untrash(loc string) error
// Status returns a *VolumeStatus representing the current
// in-use and available storage capacity and an
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 7580a20..e168940 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -420,7 +420,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
v.Put(TestHash, TestBlock)
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data, err := v.Get(TestHash)
@@ -449,7 +449,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
v.Put(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +463,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- if err := v.Delete(TestHash2); err == nil {
+ if err := v.Trash(TestHash2); err == nil {
t.Errorf("Expected error when attempting to delete a non-existing block")
}
}
@@ -535,7 +535,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
}
// Delete a block from a read-only volume should result in error
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err == nil {
t.Errorf("Expected error when deleting block from a read-only volume")
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index d671436..53ffeef 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
return nil
}
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
v.gotCall("Delete")
<-v.Gate
if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
return os.ErrNotExist
}
+// Untrash is a stub (TBD): it ignores loc, restores nothing, and always returns nil.
+func (v *MockVolume) Untrash(loc string) error {
+ return nil
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 910cc25..da1d390 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -363,7 +363,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
}
// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Delete() because either (a) the file will be deleted and Touch()
@@ -405,6 +405,12 @@ func (v *UnixVolume) Delete(loc string) error {
return os.Remove(p)
}
+// Untrash moves block from trash back into store.
+// TBD: not implemented yet; this stub ignores loc and always returns nil.
+func (v *UnixVolume) Untrash(loc string) error {
+ return nil
+}
+
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index b216810..0775e89 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list