[ARVADOS] updated: 3b31f110db82c93c3aade294f50bbb0218c74697
git at public.curoverse.com
git at public.curoverse.com
Fri Jan 22 17:49:24 EST 2016
Summary of changes:
apps/workbench/fpm-info.sh | 1 +
sdk/python/arvados/collection.py | 13 +++++--
sdk/python/arvados/keep.py | 10 +++---
sdk/python/tests/run_test_server.py | 5 ++-
sdk/python/tests/test_collections.py | 28 +++++++++------
sdk/python/tests/test_keep_client.py | 39 +++++++++++++++++++++
services/api/fpm-info.sh | 10 ++++++
services/fuse/arvados_fuse/__init__.py | 10 +++---
services/fuse/arvados_fuse/command.py | 3 +-
services/fuse/arvados_fuse/fusedir.py | 15 +++++----
services/fuse/tests/mount_test_base.py | 14 +++++---
services/fuse/tests/test_mount.py | 35 +++++++++++++++++++
services/fuse/tests/test_retry.py | 60 +++++++++++++++++++++++++++++++++
services/keepstore/azure_blob_volume.go | 5 ++-
services/keepstore/handler_test.go | 55 +++++++++++++++++++++---------
services/keepstore/handlers.go | 49 ++++++++++++++++++---------
services/keepstore/keepstore.go | 11 +++---
services/keepstore/keepstore_test.go | 2 --
services/keepstore/s3_volume.go | 5 ++-
services/keepstore/volume_test.go | 2 +-
services/keepstore/volume_unix.go | 7 ++--
21 files changed, 297 insertions(+), 82 deletions(-)
create mode 100644 apps/workbench/fpm-info.sh
create mode 100644 services/api/fpm-info.sh
create mode 100644 services/fuse/tests/test_retry.py
via 3b31f110db82c93c3aade294f50bbb0218c74697 (commit)
via 6ac861fd4a44cb39371a66d96b77ad7a41c85d3b (commit)
via 33e39bb6053e1cf5bf849a29e41fb74b31a22ef7 (commit)
via f50626706f9177137472fcd37472d3437b7c7a57 (commit)
via 13ca6c961ce700e84bfa4ace9ea715ce9610b7e5 (commit)
via 83172cf795687bd4f618d2c673be8fb30ca840df (commit)
via 1034616dcc47472072a5c6e2d7b92c9b95c544c9 (commit)
via cabbceab8a2e164f2c4f5afb0e2062dd02fd7790 (commit)
via 787fdb3943a9189486fc0ad95a460180a2469e31 (commit)
via de9e183ceac9e5dc53cd4fbd089b752a75a4d5cf (commit)
via b73a5d296669ae58c8cd5a7d2e1aedd19f0c0029 (commit)
via f82d809aad0833f231765042aaee8ee43f3358ee (commit)
via d3313e652e392b1c0067bcddd65903ed0af19b44 (commit)
via e40d18172103a45acccc4ae5aebc1b522c270e68 (commit)
via 73448d12b20fd56321ed556d5b4ad3665564740e (commit)
via 44ff73fa397095d69819761e66933783a5f6d541 (commit)
via e399a43de926edcff6fb8383b2bc3b9b29db016b (commit)
via 24209ad2d22dffd191d2acc34d3487b99924c030 (commit)
via 5d986feae8c88fa4e15e14f185a10c8d7e822594 (commit)
via 343fffd1284fa47e58b45d0852075d9ef6695c79 (commit)
from a4d0d55ac333e8060d13a600d8aa8f5443760b29 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 3b31f110db82c93c3aade294f50bbb0218c74697
Merge: 6ac861f a4d0d55
Author: radhika <radhika at curoverse.com>
Date: Fri Jan 22 17:49:08 2016 -0500
Merge branch '8178-keepstore-trash-interface' of git.curoverse.com:arvados into 8178-keepstore-trash-interface
Conflicts:
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
commit 6ac861fd4a44cb39371a66d96b77ad7a41c85d3b
Author: radhika <radhika at curoverse.com>
Date: Fri Jan 22 17:37:15 2016 -0500
8178: (for now) all volumes must return ErrNotImplemented if trash-lifetime != 0
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index f135835..3681230 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -43,8 +43,7 @@ type azureVolumeAdder struct {
}
func (s *azureVolumeAdder) Set(containerName string) error {
- if trashLifetime <= 0 {
- log.Print("Missing required configuration parameter: trash-lifetime")
+ if trashLifetime != 0 {
return ErrNotImplemented
}
@@ -343,7 +342,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
// Untrash a Keep block.
// TBD
func (v *AzureBlobVolume) Untrash(loc string) error {
- return nil
+ return ErrNotImplemented
}
// Status returns a VolumeStatus struct with placeholder data.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 9526c3c..a7675fb 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -971,7 +971,7 @@ func TestPutReplicationHeader(t *testing.T) {
}
}
-func TestUndeleteHandler(t *testing.T) {
+func TestUntrashHandler(t *testing.T) {
defer teardown()
// Set up Keep volumes
@@ -980,73 +980,96 @@ func TestUndeleteHandler(t *testing.T) {
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- // Only the datamanager user should be allowed to undelete blocks
dataManagerToken = "DATA MANAGER TOKEN"
// unauthenticatedReq => UnauthorizedError
unauthenticatedReq := &RequestTester{
method: "PUT",
- uri: "/undelete/" + TestHash,
+ uri: "/untrash/" + TestHash,
}
response := IssueRequest(unauthenticatedReq)
ExpectStatusCode(t,
- "enforcePermissions on, unauthenticated request",
+ "Unauthenticated request",
UnauthorizedError.HTTPCode,
response)
// notDataManagerReq => UnauthorizedError
notDataManagerReq := &RequestTester{
method: "PUT",
- uri: "/undelete/" + TestHash,
+ uri: "/untrash/" + TestHash,
apiToken: knownToken,
}
response = IssueRequest(notDataManagerReq)
ExpectStatusCode(t,
- "permissions on, unauthenticated /index/prefix request",
+ "Non-datamanager token",
UnauthorizedError.HTTPCode,
response)
// datamanagerWithBadHashReq => StatusBadRequest
datamanagerWithBadHashReq := &RequestTester{
method: "PUT",
- uri: "/undelete/thisisnotalocator",
+ uri: "/untrash/thisisnotalocator",
apiToken: dataManagerToken,
}
response = IssueRequest(datamanagerWithBadHashReq)
ExpectStatusCode(t,
- "permissions on, authenticated request, non-superuser",
+ "Bad locator in untrash request",
http.StatusBadRequest,
response)
// datamanagerWrongMethodReq => StatusBadRequest
datamanagerWrongMethodReq := &RequestTester{
method: "GET",
- uri: "/undelete/" + TestHash,
+ uri: "/untrash/" + TestHash,
apiToken: dataManagerToken,
}
response = IssueRequest(datamanagerWrongMethodReq)
ExpectStatusCode(t,
- "permissions on, authenticated request, non-superuser",
+ "Only PUT method is supported for untrash",
http.StatusBadRequest,
response)
// datamanagerReq => StatusOK
datamanagerReq := &RequestTester{
method: "PUT",
- uri: "/undelete/" + TestHash,
+ uri: "/untrash/" + TestHash,
apiToken: dataManagerToken,
}
response = IssueRequest(datamanagerReq)
ExpectStatusCode(t,
- "permissions on, authenticated request, non-superuser",
+ "",
http.StatusOK,
response)
- expected := `Untrashed on volume`
- match, _ := regexp.MatchString(expected, response.Body.String())
- if !match {
+ expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+ if response.Body.String() != expected {
t.Errorf(
- "Undelete response mismatched: expected %s, got:\n%s",
+ "Untrash response mismatched: expected %s, got:\n%s",
expected, response.Body.String())
}
}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
+ defer teardown()
+
+ // Set up readonly Keep volumes
+ vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+ vols[0].Readonly = true
+ vols[1].Readonly = true
+ KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+ defer KeepVM.Close()
+
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // datamanagerReq => StatusNotFound (no writable volumes)
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/untrash/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response := IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "No writable volumes",
+ http.StatusNotFound,
+ response)
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f8e4d71..e52dba8 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -20,6 +20,7 @@ import (
"regexp"
"runtime"
"strconv"
+ "strings"
"sync"
"time"
)
@@ -53,8 +54,8 @@ func MakeRESTRouter() *mux.Router {
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
- // Undelete moves blocks from trash back into store
- rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+ // Untrash moves blocks from trash back into store
+ rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
// 400 Bad Request.
@@ -433,8 +434,8 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
trashq.ReplaceQueue(tlist)
}
-// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
-func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
if !IsDataManagerToken(GetApiToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
@@ -442,25 +443,43 @@ func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
}
hash := mux.Vars(req)["hash"]
- successResp := "Untrashed on volume: "
- var st int
+
+ if len(KeepVM.AllWritable()) == 0 {
+ http.Error(resp, "No writable volumes", http.StatusNotFound)
+ return
+ }
+
+ var untrashedOn, failedOn []string
+ var numNotFound int
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Untrash(hash); err == nil {
+ err := vol.Untrash(hash)
+ if err == nil || err == ErrNotImplemented {
log.Printf("Untrashed %v on volume %v", hash, vol.String())
- st = http.StatusOK
- successResp += vol.String()
- break
+ untrashedOn = append(untrashedOn, vol.String())
} else {
- log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
- st = 500
+ if os.IsNotExist(err) {
+ numNotFound++
+ } else {
+ log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ failedOn = append(failedOn, vol.String())
+ }
}
}
- if st == http.StatusOK {
- resp.Write([]byte(successResp))
+ if numNotFound == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+ return
}
- resp.WriteHeader(st)
+ if len(failedOn) == len(KeepVM.AllWritable()) {
+ http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+ } else {
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ if len(failedOn) > 0 {
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ }
+ resp.Write([]byte(respBody))
+ }
}
// ==============================
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index f6b1f6e..3850e99 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -55,6 +55,10 @@ var dataManagerToken string
// actually deleting anything.
var neverDelete = true
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
var maxBuffers = 128
var bufs *bufferPool
@@ -114,7 +118,6 @@ var (
flagSerializeIO bool
flagReadonly bool
volumes volumeSet
- trashLifetime int
)
func (vs *volumeSet) String() string {
@@ -202,11 +205,11 @@ func main() {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.IntVar(
+ flag.DurationVar(
&trashLifetime,
"trash-lifetime",
- 0,
- fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
+ 0*time.Second,
+ "Interval after a block is trashed during which it can be recovered using an /untrash request")
flag.Parse()
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 746d99e..2a1c3d2 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -335,7 +335,6 @@ func TestDiscoverTmpfs(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- trashLifetime = 24 * 60 * 60
resultVols := volumeSet{}
added := (&unixVolumeAdder{&resultVols}).Discover()
@@ -376,7 +375,6 @@ func TestDiscoverNone(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- trashLifetime = 24 * 60 * 60
resultVols := volumeSet{}
added := (&unixVolumeAdder{&resultVols}).Discover()
if added != 0 || len(resultVols) != 0 {
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 17cc194..1df8f78 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -39,8 +39,7 @@ type s3VolumeAdder struct {
}
func (s *s3VolumeAdder) Set(bucketName string) error {
- if trashLifetime <= 0 {
- log.Print("Missing required configuration parameter: trash-lifetime")
+ if trashLifetime != 0 {
return ErrNotImplemented
}
if bucketName == "" {
@@ -278,7 +277,7 @@ func (v *S3Volume) Trash(loc string) error {
// TBD
func (v *S3Volume) Untrash(loc string) error {
- return nil
+ return ErrNotImplemented
}
func (v *S3Volume) Status() *VolumeStatus {
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 53ffeef..2af1ad6 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -201,7 +201,7 @@ func (v *MockVolume) Trash(loc string) error {
// TBD
func (v *MockVolume) Untrash(loc string) error {
- return nil
+ return ErrNotImplemented
}
func (v *MockVolume) Status() *VolumeStatus {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 1be622c..d71e175 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -23,9 +23,8 @@ type unixVolumeAdder struct {
}
func (vs *unixVolumeAdder) Set(value string) error {
- if trashLifetime <= 0 {
- log.Print("Missing required configuration parameter: trash-lifetime")
- //return ErrNotImplemented
+ if trashLifetime != 0 {
+ return ErrNotImplemented
}
if dirs := strings.Split(value, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
@@ -412,7 +411,7 @@ func (v *UnixVolume) Trash(loc string) error {
// Untrash moves block from trash back into store
// TBD
func (v *UnixVolume) Untrash(loc string) error {
- return nil
+ return ErrNotImplemented
}
// blockDir returns the fully qualified directory name for the directory
commit 33e39bb6053e1cf5bf849a29e41fb74b31a22ef7
Author: radhika <radhika at curoverse.com>
Date: Thu Jan 21 15:25:06 2016 -0500
8178: All three currently supported volumes return an error when the trash-lifetime period is not configured. Azure blob and S3 volumes are updated to do so.
Returning an error causes test failures in the unix volume, so that part is still a work in progress.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 0071567..f135835 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -43,6 +43,11 @@ type azureVolumeAdder struct {
}
func (s *azureVolumeAdder) Set(containerName string) error {
+ if trashLifetime <= 0 {
+ log.Print("Missing required configuration parameter: trash-lifetime")
+ return ErrNotImplemented
+ }
+
if containerName == "" {
return errors.New("no container name given")
}
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 2a1c3d2..746d99e 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -335,6 +335,7 @@ func TestDiscoverTmpfs(t *testing.T) {
f.Close()
ProcMounts = f.Name()
+ trashLifetime = 24 * 60 * 60
resultVols := volumeSet{}
added := (&unixVolumeAdder{&resultVols}).Discover()
@@ -375,6 +376,7 @@ func TestDiscoverNone(t *testing.T) {
f.Close()
ProcMounts = f.Name()
+ trashLifetime = 24 * 60 * 60
resultVols := volumeSet{}
added := (&unixVolumeAdder{&resultVols}).Discover()
if added != 0 || len(resultVols) != 0 {
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 16afc32..17cc194 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -39,6 +39,10 @@ type s3VolumeAdder struct {
}
func (s *s3VolumeAdder) Set(bucketName string) error {
+ if trashLifetime <= 0 {
+ log.Print("Missing required configuration parameter: trash-lifetime")
+ return ErrNotImplemented
+ }
if bucketName == "" {
return fmt.Errorf("no container name given")
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index da1d390..1be622c 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -23,6 +23,10 @@ type unixVolumeAdder struct {
}
func (vs *unixVolumeAdder) Set(value string) error {
+ if trashLifetime <= 0 {
+ log.Print("Missing required configuration parameter: trash-lifetime")
+ //return ErrNotImplemented
+ }
if dirs := strings.Split(value, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
commit f50626706f9177137472fcd37472d3437b7c7a57
Author: radhika <radhika at curoverse.com>
Date: Thu Jan 21 13:59:36 2016 -0500
8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandler and test for this endpoint.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index c0033d9..0071567 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -311,8 +311,8 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
}
}
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
@@ -335,6 +335,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
})
}
+// Untrash a Keep block.
+// TBD
+func (v *AzureBlobVolume) Untrash(loc string) error {
+ return nil
+}
+
// Status returns a VolumeStatus struct with placeholder data.
func (v *AzureBlobVolume) Status() *VolumeStatus {
return &VolumeStatus{
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 3817ea1..9526c3c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -970,3 +970,83 @@ func TestPutReplicationHeader(t *testing.T) {
t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
}
}
+
+func TestUndeleteHandler(t *testing.T) {
+ defer teardown()
+
+ // Set up Keep volumes
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+ vols := KeepVM.AllWritable()
+ vols[0].Put(TestHash, TestBlock)
+
+ // Only the datamanager user should be allowed to undelete blocks
+ dataManagerToken = "DATA MANAGER TOKEN"
+
+ // unauthenticatedReq => UnauthorizedError
+ unauthenticatedReq := &RequestTester{
+ method: "PUT",
+ uri: "/undelete/" + TestHash,
+ }
+ response := IssueRequest(unauthenticatedReq)
+ ExpectStatusCode(t,
+ "enforcePermissions on, unauthenticated request",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // notDataManagerReq => UnauthorizedError
+ notDataManagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/undelete/" + TestHash,
+ apiToken: knownToken,
+ }
+
+ response = IssueRequest(notDataManagerReq)
+ ExpectStatusCode(t,
+ "permissions on, unauthenticated /index/prefix request",
+ UnauthorizedError.HTTPCode,
+ response)
+
+ // datamanagerWithBadHashReq => StatusBadRequest
+ datamanagerWithBadHashReq := &RequestTester{
+ method: "PUT",
+ uri: "/undelete/thisisnotalocator",
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWithBadHashReq)
+ ExpectStatusCode(t,
+ "permissions on, authenticated request, non-superuser",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerWrongMethodReq => StatusBadRequest
+ datamanagerWrongMethodReq := &RequestTester{
+ method: "GET",
+ uri: "/undelete/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerWrongMethodReq)
+ ExpectStatusCode(t,
+ "permissions on, authenticated request, non-superuser",
+ http.StatusBadRequest,
+ response)
+
+ // datamanagerReq => StatusOK
+ datamanagerReq := &RequestTester{
+ method: "PUT",
+ uri: "/undelete/" + TestHash,
+ apiToken: dataManagerToken,
+ }
+ response = IssueRequest(datamanagerReq)
+ ExpectStatusCode(t,
+ "permissions on, authenticated request, non-superuser",
+ http.StatusOK,
+ response)
+ expected := `Untrashed on volume`
+ match, _ := regexp.MatchString(expected, response.Body.String())
+ if !match {
+ t.Errorf(
+ "Undelete response mismatched: expected %s, got:\n%s",
+ expected, response.Body.String())
+ }
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 95af1b4..f8e4d71 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -53,6 +53,9 @@ func MakeRESTRouter() *mux.Router {
// Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+ // Undelete moves blocks from trash back into store
+ rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
+
// Any request which does not match any of these routes gets
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +298,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
Failed int `json:"copies_failed"`
}
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Delete(hash); err == nil {
+ if err := vol.Trash(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
continue
@@ -430,6 +433,36 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
trashq.ReplaceQueue(tlist)
}
+// UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
+ // Reject unauthorized requests.
+ if !IsDataManagerToken(GetApiToken(req)) {
+ http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+ return
+ }
+
+ hash := mux.Vars(req)["hash"]
+ successResp := "Untrashed on volume: "
+ var st int
+ for _, vol := range KeepVM.AllWritable() {
+ if err := vol.Untrash(hash); err == nil {
+ log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ st = http.StatusOK
+ successResp += vol.String()
+ break
+ } else {
+ log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+ st = 500
+ }
+ }
+
+ if st == http.StatusOK {
+ resp.Write([]byte(successResp))
+ }
+
+ resp.WriteHeader(st)
+}
+
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 96a887f..f6b1f6e 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -79,6 +79,7 @@ var (
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
)
func (e *KeepError) Error() string {
@@ -113,6 +114,7 @@ var (
flagSerializeIO bool
flagReadonly bool
volumes volumeSet
+ trashLifetime int
)
func (vs *volumeSet) String() string {
@@ -200,6 +202,11 @@ func main() {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+ flag.IntVar(
+ &trashLifetime,
+ "trash-lifetime",
+ 0,
+ fmt.Sprintf("Trashed blocks will stay in trash for trash-lifetime interval before they are actually deleted by the system."))
flag.Parse()
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 572ee46..16afc32 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -257,7 +257,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
return nil
}
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
@@ -272,6 +272,11 @@ func (v *S3Volume) Delete(loc string) error {
return v.Bucket.Del(loc)
}
+// TBD
+func (v *S3Volume) Untrash(loc string) error {
+ return nil
+}
+
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 65e3fbd..62f63d5 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
if neverDelete {
err = errors.New("did not delete block because neverDelete is true")
} else {
- err = volume.Delete(trashRequest.Locator)
+ err = volume.Trash(trashRequest.Locator)
}
if err != nil {
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 7966c41..58710c0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -144,20 +144,21 @@ type Volume interface {
// particular order.
IndexTo(prefix string, writer io.Writer) error
- // Delete deletes the block data from the underlying storage
- // device.
+ // Trash moves the block data from the underlying storage
+ // device to trash area. The block then stays in trash for
+ // -trash-lifetime interval before it is actually deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Delete must not delete the data.
+ // blobSignatureTTL, Trash must not trash the data.
//
- // If a Delete operation overlaps with any Touch or Put
+ // If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// ensure one of the following outcomes:
//
// - Touch and Put return a non-nil error, or
- // - Delete does not delete the block, or
+ // - Trash does not trash the block, or
// - Both of the above.
//
// If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be deleted for at least blobSignatureTTL
+ // will not be trashed for at least blobSignatureTTL
// seconds.
- Delete(loc string) error
+ Trash(loc string) error
+
+ // Untrash moves block from trash back into store
+ Untrash(loc string) error
// Status returns a *VolumeStatus representing the current
// in-use and available storage capacity and an
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 7580a20..e168940 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -420,7 +420,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
v.Put(TestHash, TestBlock)
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data, err := v.Get(TestHash)
@@ -449,7 +449,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
v.Put(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- if err := v.Delete(TestHash); err != nil {
+ if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +463,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- if err := v.Delete(TestHash2); err == nil {
+ if err := v.Trash(TestHash2); err == nil {
t.Errorf("Expected error when attempting to delete a non-existing block")
}
}
@@ -535,7 +535,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
}
// Delete a block from a read-only volume should result in error
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err == nil {
t.Errorf("Expected error when deleting block from a read-only volume")
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index d671436..53ffeef 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
return nil
}
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
v.gotCall("Delete")
<-v.Gate
if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
return os.ErrNotExist
}
+// TBD
+func (v *MockVolume) Untrash(loc string) error {
+ return nil
+}
+
func (v *MockVolume) Status() *VolumeStatus {
var used uint64
for _, block := range v.Store {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 910cc25..da1d390 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -363,7 +363,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
}
// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Delete() because either (a) the file will be deleted and Touch()
@@ -405,6 +405,12 @@ func (v *UnixVolume) Delete(loc string) error {
return os.Remove(p)
}
+// Untrash moves block from trash back into store
+// TBD
+func (v *UnixVolume) Untrash(loc string) error {
+ return nil
+}
+
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index b216810..0775e89 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
- err = v.Delete(TestHash)
+ err = v.Trash(TestHash)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list