[ARVADOS] updated: 7ba6bdf406546ec225baea49dbe6ccbf02e70f53
Git user
git at public.curoverse.com
Wed Jul 27 17:04:29 EDT 2016
Summary of changes:
build/run-tests.sh | 4 +-
services/keep-balance/balance_run_test.go | 2 +-
services/keepstore/handlers.go | 18 +-
services/keepstore/pull_worker.go | 5 +-
services/keepstore/s3_volume.go | 487 ++++++++++++++++++++++++++----
services/keepstore/s3_volume_test.go | 217 ++++++++++++-
services/keepstore/volume_generic_test.go | 21 +-
services/keepstore/volume_unix.go | 13 +-
8 files changed, 671 insertions(+), 96 deletions(-)
via 7ba6bdf406546ec225baea49dbe6ccbf02e70f53 (commit)
via d65114fdee11cfd20833a96c34ebf5346baeb755 (commit)
via d9f2aaaa6b5762f448276ce96b6994245062a4c8 (commit)
via 70e9fc0a1e57fb4d85d985b4c9258d7a5615b3bb (commit)
via e1f5b9c3c303502db7ef0e3c7a19c8edfbbb2183 (commit)
via 482afcd3cda97329e68b2b77f7f4a32da6ea08ef (commit)
via 78d74f846e1b35b6b65d76c345227845d00a9722 (commit)
via d137cbc6cfdcc541216e68d414c535626e4d8916 (commit)
via c15c0971e10534f36748feae87b1b73a386fd9b1 (commit)
via 99b14829e38a823f37ff1f040c9f0777e42f0d67 (commit)
via e42bc5d373290314195c47ededb4fdacc90f7aa0 (commit)
via 094024f2476d84a5b9a453ede79b243e1d282bab (commit)
via a6d2f88debdfa7bc390b63c1f18a0541987ae0b8 (commit)
via e8de69eac0308e5965bdbe13754764d4fea9b8b3 (commit)
via 35ea47144857dc16ab8b6b8a272a87af6d50cb88 (commit)
via d858874f6c39bdcfbe3de383933aa4e68b2780f6 (commit)
via 8db0d3197f659a4099e481b464cb9a877b943d3c (commit)
via 1127e8884c809a35280d8e57dbe3bc1b8f8818a5 (commit)
from be87361dedf4e35405616e802fba12dedf86dfde (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 7ba6bdf406546ec225baea49dbe6ccbf02e70f53
Merge: be87361 d65114f
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 27 15:22:26 2016 -0400
Merge branch '8555-s3-trash'
closes #8555
commit d65114fdee11cfd20833a96c34ebf5346baeb755
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 27 11:29:17 2016 -0400
8555: Fail Go tests if not gofmted.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index c9cb2b2..6331ec2 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -520,6 +520,8 @@ do_test_once() {
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
go get -t "git.curoverse.com/arvados.git/$1" || return 1
+ cd "$WORKSPACE/$1" || return 1
+ gofmt -e -d . | egrep . && result=1
if [[ -n "${testargs[$1]}" ]]
then
# "go test -check.vv giturl" doesn't work, but this
@@ -530,7 +532,7 @@ do_test_once() {
# empty, so use this form in such cases:
go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
fi
- result="$?"
+ result=${result:-$?}
if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
then
go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
commit d9f2aaaa6b5762f448276ce96b6994245062a4c8
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 16:29:57 2016 -0400
8555: golint
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f698982..a6798a9 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -73,7 +73,7 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
if enforcePermissions {
locator := req.URL.Path[1:] // strip leading slash
- if err := VerifySignature(locator, GetApiToken(req)); err != nil {
+ if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
@@ -184,7 +184,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// Success; add a size hint, sign the locator if possible, and
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
- apiToken := GetApiToken(req)
+ apiToken := GetAPIToken(req)
if PermissionSecret != nil && apiToken != "" {
expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
@@ -196,7 +196,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetApiToken(req)) {
+ if !IsDataManagerToken(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -328,7 +328,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
// Confirm that this user is an admin and has a token with unlimited scope.
- var tok = GetApiToken(req)
+ var tok = GetAPIToken(req)
if tok == "" || !CanDelete(tok) {
http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
return
@@ -419,7 +419,7 @@ type PullRequest struct {
// PullHandler processes "PUT /pull" requests for the data manager.
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetApiToken(req)) {
+ if !IsDataManagerToken(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -455,7 +455,7 @@ type TrashRequest struct {
// TrashHandler processes /trash requests.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetApiToken(req)) {
+ if !IsDataManagerToken(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -485,7 +485,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetApiToken(req)) {
+ if !IsDataManagerToken(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -714,10 +714,10 @@ func IsValidLocator(loc string) bool {
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
-// GetApiToken returns the OAuth2 token from the Authorization
+// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
-func GetApiToken(req *http.Request) string {
+func GetAPIToken(req *http.Request) string {
if auth, ok := req.Header["Authorization"]; ok {
if match := authRe.FindStringSubmatch(auth[0]); match != nil {
return match[1]
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 2626d4b..d53d106 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -2,7 +2,6 @@ package main
import (
"crypto/rand"
- "errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
@@ -57,7 +56,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
return
}
if reader == nil {
- return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
+ return fmt.Errorf("No reader found for : %s", signedLocator)
}
defer reader.Close()
@@ -67,7 +66,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
}
if (readContent == nil) || (int64(len(readContent)) != contentLen) {
- return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
+ return fmt.Errorf("Content not found for: %s", signedLocator)
}
err = PutContent(readContent, pullRequest.Locator)
commit 70e9fc0a1e57fb4d85d985b4c9258d7a5615b3bb
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 16:26:15 2016 -0400
8555: Fix up comments.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index f39a9df..1f62f4a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,8 +18,8 @@ import (
)
var (
- // Returned by Trash if that operation is impossible with the
- // current config.
+ // ErrS3TrashDisabled is returned by Trash if that operation
+ // is impossible with the current config.
ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 65ce2be..6ba3904 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -250,6 +250,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
return loc, blk
}
+ // Check canGet
loc, blk := setupScenario()
buf := make([]byte, len(blk))
_, err := v.Get(loc, buf)
@@ -258,6 +259,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(os.IsNotExist(err), check.Equals, true)
}
+ // Call Trash, then check canTrash and canGetAfterTrash
loc, blk = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
@@ -267,6 +269,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(os.IsNotExist(err), check.Equals, true)
}
+ // Call Untrash, then check canUntrash
loc, blk = setupScenario()
err = v.Untrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
@@ -279,6 +282,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(err, check.IsNil)
}
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
loc, blk = setupScenario()
v.EmptyTrash()
_, err = v.Bucket.Head("trash/"+loc, nil)
@@ -291,6 +296,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
loc, blk = setupScenario()
err = v.Put(loc, blk)
c.Check(err, check.IsNil)
commit e1f5b9c3c303502db7ef0e3c7a19c8edfbbb2183
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 16:20:30 2016 -0400
8555: gofmt
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index da24daf..f39a9df 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -592,7 +592,7 @@ func (v *S3Volume) EmptyTrash() {
continue
}
if trashT.Sub(recentT) < blobSignatureTTL {
- if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+ if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
commit 482afcd3cda97329e68b2b77f7f4a32da6ea08ef
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 12:25:35 2016 -0400
8555: Test Get() after successful Untrash. Test Put+Mtime in all scenarios.
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index e41e04b..65ce2be 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -270,6 +270,14 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
loc, blk = setupScenario()
err = v.Untrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ _, err = v.Get(loc, buf)
+ c.Check(err, check.IsNil)
+ }
loc, blk = setupScenario()
v.EmptyTrash()
@@ -282,6 +290,13 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
// allowance for 1s timestamp precision)
c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
+
+ loc, blk = setupScenario()
+ err = v.Put(loc, blk)
+ c.Check(err, check.IsNil)
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
}
}
commit 78d74f846e1b35b6b65d76c345227845d00a9722
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 11:07:46 2016 -0400
8555: Untrash to repair inconsistent state (trash/X without recent/X).
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 0cfaaaf..da24daf 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -575,8 +575,15 @@ func (v *S3Volume) EmptyTrash() {
continue
}
recent, err := v.Bucket.Head("recent/"+loc, nil)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+ if err != nil && os.IsNotExist(v.translateError(err)) {
+ log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+ err = v.Untrash(loc)
+ if err != nil {
+ log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+ }
+ continue
+ } else if err != nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
continue
}
recentT, err := v.lastModified(recent)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 8a06f19..e41e04b 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -223,6 +223,11 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
true, false, true, true, true, false,
},
+ {
+ "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+ none, none, t0.Add(-time.Minute),
+ false, false, false, true, true, true,
+ },
} {
c.Log("Scenario: ", scenario.label)
commit d137cbc6cfdcc541216e68d414c535626e4d8916
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 09:44:56 2016 -0400
8555: Log statistics in EmptyTrash.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index be4da4d..0cfaaaf 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -551,6 +551,8 @@ func (v *S3Volume) translateError(err error) error {
// EmptyTrash looks for trashed blocks that exceeded trashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
+ var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
Bucket: v.Bucket,
@@ -564,6 +566,9 @@ func (v *S3Volume) EmptyTrash() {
if !v.isKeepBlock(loc) {
continue
}
+ bytesInTrash += trash.Size
+ blocksInTrash++
+
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
@@ -610,6 +615,9 @@ func (v *S3Volume) EmptyTrash() {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
continue
}
+ bytesDeleted += trash.Size
+ blocksDeleted++
+
_, err = v.Bucket.Head(loc, nil)
if os.IsNotExist(err) {
err = v.Bucket.Del("recent/" + loc)
@@ -623,6 +631,7 @@ func (v *S3Volume) EmptyTrash() {
if err := trashL.Error(); err != nil {
log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
}
+ log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type s3Lister struct {
commit c15c0971e10534f36748feae87b1b73a386fd9b1
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 09:38:07 2016 -0400
8555: Improve variable names and comments.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 8ded160..be4da4d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -558,7 +558,7 @@ func (v *S3Volume) EmptyTrash() {
PageSize: v.indexPageSize,
}
// Define "ready to delete" as "...when EmptyTrash started".
- now := time.Now()
+ startT := time.Now()
for trash := trashL.First(); trash != nil; trash = trashL.Next() {
loc := trash.Key[6:]
if !v.isKeepBlock(loc) {
@@ -580,7 +580,7 @@ func (v *S3Volume) EmptyTrash() {
continue
}
if trashT.Sub(recentT) < blobSignatureTTL {
- if age := now.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+ if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
@@ -602,7 +602,7 @@ func (v *S3Volume) EmptyTrash() {
continue
}
}
- if now.Sub(trashT) < trashLifetime {
+ if startT.Sub(trashT) < trashLifetime {
continue
}
err = v.Bucket.Del(trash.Key)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 0bb818c..8a06f19 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -122,7 +122,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
var none time.Time
- stubKey := func(t time.Time, key string, data []byte) {
+ putS3Obj := func(t time.Time, key string, data []byte) {
if t == none {
return
}
@@ -134,9 +134,9 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
nextKey := 0
for _, scenario := range []struct {
label string
- data time.Time
- recent time.Time
- trash time.Time
+ dataT time.Time
+ recentT time.Time
+ trashT time.Time
canGet bool
canTrash bool
canGetAfterTrash bool
@@ -159,42 +159,42 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
true, true, true, false, false, false,
},
{
- "Not trash; old enough to trash",
+ "Not trash, but old enough to be eligible for trash",
t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
true, true, false, false, false, false,
},
{
- "Not trash; not old enough to trash",
+ "Not trash, and not old enough to be eligible for trash",
t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
true, true, true, false, false, false,
},
{
- "Trash + not-trash: recent race between Trash and Put",
+ "Trashed + untrashed copies exist, due to recent race between Trash and Put",
t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
true, true, true, true, true, false,
},
{
- "Trash + not-trash, nearly eligible for deletion, prone to Trash race",
+ "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
true, false, true, true, true, false,
},
{
- "Trash + not-trash, eligible for deletion, prone to Trash race",
+ "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
true, false, true, true, false, false,
},
{
- "Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
+ "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
true, false, true, true, true, true,
},
{
- "Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
+ "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
true, true, true, true, false, false,
},
{
- "Trash operation was interrupted",
+ "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
true, false, true, true, false, false,
},
@@ -233,19 +233,19 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
// locator to prevent interference from previous
// tests.
- setup := func() (string, []byte) {
+ setupScenario := func() (string, []byte) {
nextKey++
blk := []byte(fmt.Sprintf("%d", nextKey))
loc := fmt.Sprintf("%x", md5.Sum(blk))
c.Log("\t", loc)
- stubKey(scenario.data, loc, blk)
- stubKey(scenario.recent, "recent/"+loc, nil)
- stubKey(scenario.trash, "trash/"+loc, blk)
+ putS3Obj(scenario.dataT, loc, blk)
+ putS3Obj(scenario.recentT, "recent/"+loc, nil)
+ putS3Obj(scenario.trashT, "trash/"+loc, blk)
v.serverClock.now = &t0
return loc, blk
}
- loc, blk := setup()
+ loc, blk := setupScenario()
buf := make([]byte, len(blk))
_, err := v.Get(loc, buf)
c.Check(err == nil, check.Equals, scenario.canGet)
@@ -253,7 +253,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(os.IsNotExist(err), check.Equals, true)
}
- loc, blk = setup()
+ loc, blk = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
_, err = v.Get(loc, buf)
@@ -262,11 +262,11 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
c.Check(os.IsNotExist(err), check.Equals, true)
}
- loc, blk = setup()
+ loc, blk = setupScenario()
err = v.Untrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
- loc, blk = setup()
+ loc, blk = setupScenario()
v.EmptyTrash()
_, err = v.Bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
commit 99b14829e38a823f37ff1f040c9f0777e42f0d67
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jul 26 09:19:30 2016 -0400
8555: Reformat test cases.
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index fb0bea2..0bb818c 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -147,7 +147,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
{
"No related objects",
none, none, none,
- false, false, false, false, false, false},
+ false, false, false, false, false, false,
+ },
{
// Stored by older version, or there was a
// race between EmptyTrash and Put: Trash is a
@@ -155,59 +156,73 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
// old
"No recent/X",
t0.Add(-48 * time.Hour), none, none,
- true, true, true, false, false, false},
+ true, true, true, false, false, false,
+ },
{
"Not trash; old enough to trash",
t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
- true, true, false, false, false, false},
+ true, true, false, false, false, false,
+ },
{
"Not trash; not old enough to trash",
t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
- true, true, true, false, false, false},
+ true, true, true, false, false, false,
+ },
{
"Trash + not-trash: recent race between Trash and Put",
t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
- true, true, true, true, true, false},
+ true, true, true, true, true, false,
+ },
{
"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
- true, false, true, true, true, false},
+ true, false, true, true, true, false,
+ },
{
"Trash + not-trash, eligible for deletion, prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
- true, false, true, true, false, false},
+ true, false, true, true, false, false,
+ },
{
"Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
- true, false, true, true, true, true},
+ true, false, true, true, true, true,
+ },
{
"Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
- true, true, true, true, false, false},
+ true, true, true, true, false, false,
+ },
{
"Trash operation was interrupted",
t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
- true, false, true, true, false, false},
+ true, false, true, true, false, false,
+ },
{
"Trash, not yet eligible for deletion",
none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
- false, false, false, true, true, false},
+ false, false, false, true, true, false,
+ },
{
"Trash, not yet eligible for deletion, prone to races",
none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
- false, false, false, true, true, false},
+ false, false, false, true, true, false,
+ },
{
"Trash, eligible for deletion",
none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
- false, false, false, true, false, false},
+ false, false, false, true, false, false,
+ },
{
"Erroneously trashed during a race, detected before trashLifetime",
none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
- true, false, true, true, true, false},
+ true, false, true, true, true, false,
+ },
{
"Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
- true, false, true, true, true, false},
+ true, false, true, true, true, false,
+ },
} {
c.Log("Scenario: ", scenario.label)
commit e42bc5d373290314195c47ededb4fdacc90f7aa0
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Jul 22 14:04:34 2016 -0400
8555: De-obfuscate test case.
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 59402dc..fb0bea2 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -132,7 +132,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
t0 := time.Now()
nextKey := 0
- for _, test := range []struct {
+ for _, scenario := range []struct {
label string
data time.Time
recent time.Time
@@ -209,47 +209,53 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
true, false, true, true, true, false},
} {
- c.Log("Scenario: ", test.label)
- var loc string
- var blk []byte
+ c.Log("Scenario: ", scenario.label)
- setup := func() {
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setup := func() (string, []byte) {
nextKey++
- blk = []byte(fmt.Sprintf("%d", nextKey))
- loc = fmt.Sprintf("%x", md5.Sum(blk))
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
c.Log("\t", loc)
- stubKey(test.data, loc, blk)
- stubKey(test.recent, "recent/"+loc, nil)
- stubKey(test.trash, "trash/"+loc, blk)
+ stubKey(scenario.data, loc, blk)
+ stubKey(scenario.recent, "recent/"+loc, nil)
+ stubKey(scenario.trash, "trash/"+loc, blk)
v.serverClock.now = &t0
+ return loc, blk
}
- setup()
+ loc, blk := setup()
buf := make([]byte, len(blk))
_, err := v.Get(loc, buf)
- c.Check(err == nil, check.Equals, test.canGet)
+ c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
}
- setup()
+ loc, blk = setup()
err = v.Trash(loc)
- c.Check(err == nil, check.Equals, test.canTrash)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
_, err = v.Get(loc, buf)
- c.Check(err == nil, check.Equals, test.canGetAfterTrash)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
}
- setup()
+ loc, blk = setup()
err = v.Untrash(loc)
- c.Check(err == nil, check.Equals, test.canUntrash)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
- setup()
+ loc, blk = setup()
v.EmptyTrash()
_, err = v.Bucket.Head("trash/"+loc, nil)
- c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
- if test.freshAfterEmpty {
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
// new mtime must be current (with an
commit 094024f2476d84a5b9a453ede79b243e1d282bab
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jul 21 11:44:17 2016 -0400
8555: Fix EmptyTrash so it can clean up old races.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 5b00d4a..8ded160 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -564,14 +564,14 @@ func (v *S3Volume) EmptyTrash() {
if !v.isKeepBlock(loc) {
continue
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+ log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
- trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+ recent, err := v.Bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
continue
}
recentT, err := v.lastModified(recent)
@@ -580,8 +580,27 @@ func (v *S3Volume) EmptyTrash() {
continue
}
if trashT.Sub(recentT) < blobSignatureTTL {
- v.fixRace(loc)
- continue
+ if age := now.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+ // recent/loc is too old to protect
+ // loc from being Trashed again during
+ // the raceWindow that starts if we
+ // delete trash/X now.
+ //
+ // Note this means (trashCheckInterval
+ // < blobSignatureTTL - raceWindow) is
+ // necessary to avoid starvation.
+ log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+ v.fixRace(loc)
+ v.Touch(loc)
+ continue
+ } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+ log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+ v.fixRace(loc)
+ continue
+ } else if err != nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ continue
+ }
}
if now.Sub(trashT) < trashLifetime {
continue
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index a40b84f..59402dc 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -142,11 +142,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
canGetAfterTrash bool
canUntrash bool
haveTrashAfterEmpty bool
+ freshAfterEmpty bool
}{
{
"No related objects",
none, none, none,
- false, false, false, false, false},
+ false, false, false, false, false, false},
{
// Stored by older version, or there was a
// race between EmptyTrash and Put: Trash is a
@@ -154,56 +155,59 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
// old
"No recent/X",
t0.Add(-48 * time.Hour), none, none,
- true, true, true, false, false},
+ true, true, true, false, false, false},
{
"Not trash; old enough to trash",
t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
- true, true, false, false, false},
+ true, true, false, false, false, false},
{
"Not trash; not old enough to trash",
t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
- true, true, true, false, false},
+ true, true, true, false, false, false},
{
"Trash + not-trash: recent race between Trash and Put",
t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
- true, true, true, true, true},
+ true, true, true, true, true, false},
{
"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
- true, false, true, true, true},
+ true, false, true, true, true, false},
{
"Trash + not-trash, eligible for deletion, prone to Trash race",
t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
- true, false, true, true, false},
- // FIXME: old trash never gets deleted!
- // {
- // "Not trash; old race between Trash and Put, or incomplete Trash",
- // t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
- // true, false, true, true, false},
+ true, false, true, true, false, false},
+ {
+ "Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, true, true},
+ {
+ "Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
+ t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+ true, true, true, true, false, false},
{
"Trash operation was interrupted",
t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
- true, false, true, true, false},
+ true, false, true, true, false, false},
{
"Trash, not yet eligible for deletion",
none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
- false, false, false, true, true},
+ false, false, false, true, true, false},
{
"Trash, not yet eligible for deletion, prone to races",
none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
- false, false, false, true, true},
+ false, false, false, true, true, false},
{
"Trash, eligible for deletion",
none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
- false, false, false, true, false},
+ false, false, false, true, false, false},
{
"Erroneously trashed during a race, detected before trashLifetime",
none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
- true, false, true, true, true},
+ true, false, true, true, true, false},
{
"Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
- true, false, true, true, true},
+ true, false, true, true, true, false},
} {
c.Log("Scenario: ", test.label)
var loc string
@@ -245,6 +249,13 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
v.EmptyTrash()
_, err = v.Bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
+ if test.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
}
}
commit a6d2f88debdfa7bc390b63c1f18a0541987ae0b8
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:56:59 2016 -0400
8555: golint
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 9b7b51a..5b00d4a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,6 +18,8 @@ import (
)
var (
+ // Returned by Trash if that operation is impossible with the
+ // current config.
ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
@@ -134,6 +136,7 @@ func init() {
"EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
}
+// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
*s3.Bucket
raceWindow time.Duration
@@ -158,6 +161,8 @@ func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow tim
}
}
+// Check returns an error if the volume is inaccessible (e.g., config
+// error).
func (v *S3Volume) Check() error {
return nil
}
@@ -192,6 +197,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
return
}
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
rdr, err := v.getReader(loc)
if err != nil {
@@ -207,6 +214,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
}
}
+// Compare the given data with the stored data.
func (v *S3Volume) Compare(loc string, expect []byte) error {
rdr, err := v.getReader(loc)
if err != nil {
@@ -216,6 +224,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
}
+// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
if v.readonly {
return MethodDisabledError
@@ -236,6 +245,7 @@ func (v *S3Volume) Put(loc string, block []byte) error {
return v.translateError(err)
}
+// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
@@ -252,6 +262,7 @@ func (v *S3Volume) Touch(loc string) error {
return v.translateError(err)
}
+// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
_, err := v.Bucket.Head(loc, nil)
if err != nil {
@@ -279,6 +290,8 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
return v.lastModified(resp)
}
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
@@ -393,6 +406,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
return nil
}
+// safeCopy calls PutCopy, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+// (PutCopy returns 200 OK if the request was received, even if the
+// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
@@ -426,6 +443,7 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
return
}
+// Untrash moves block from trash back into store
func (v *S3Volume) Untrash(loc string) error {
err := v.safeCopy(loc, "trash/"+loc)
if err != nil {
@@ -435,6 +453,9 @@ func (v *S3Volume) Untrash(loc string) error {
return v.translateError(err)
}
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
@@ -443,13 +464,19 @@ func (v *S3Volume) Status() *VolumeStatus {
}
}
+// String implements fmt.Stringer.
func (v *S3Volume) String() string {
return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
}
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
func (v *S3Volume) Writable() bool {
return !v.readonly
}
+
+// Replication returns the storage redundancy of the underlying
+// device. Configured via command line flag.
func (v *S3Volume) Replication() int {
return v.replication
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 90189dc..5982fb0 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -306,7 +306,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
- var lastErr error = nil
+ var lastErr error
rootdir, err := os.Open(v.root)
if err != nil {
return err
@@ -397,10 +397,8 @@ func (v *UnixVolume) Trash(loc string) error {
// anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
- } else {
- if time.Since(fi.ModTime()) < blobSignatureTTL {
- return nil
- }
+ } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+ return nil
}
if trashLifetime == 0 {
@@ -506,11 +504,14 @@ func (v *UnixVolume) String() string {
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
func (v *UnixVolume) Writable() bool {
return !v.readonly
}
+// Replication returns the number of replicas promised by the
+// underlying device (currently assumed to be 1).
func (v *UnixVolume) Replication() int {
return 1
}
commit e8de69eac0308e5965bdbe13754764d4fea9b8b3
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:42:14 2016 -0400
8555: gofmt
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index edc88aa..23d74fe 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -371,5 +371,5 @@ func (s *runSuite) TestRunForever(c *check.C) {
}
stop <- true
c.Check(pullReqs.Count() >= 16, check.Equals, true)
- c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
+ c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 5eb9941..9b7b51a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -288,7 +288,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
}
recentL := s3Lister{
Bucket: v.Bucket,
- Prefix: "recent/"+prefix,
+ Prefix: "recent/" + prefix,
PageSize: v.indexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
@@ -566,7 +566,7 @@ func (v *S3Volume) EmptyTrash() {
}
_, err = v.Bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/"+loc)
+ err = v.Bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
@@ -580,12 +580,12 @@ func (v *S3Volume) EmptyTrash() {
}
type s3Lister struct {
- Bucket *s3.Bucket
- Prefix string
- PageSize int
- nextMarker string
- buf []s3.Key
- err error
+ Bucket *s3.Bucket
+ Prefix string
+ PageSize int
+ nextMarker string
+ buf []s3.Key
+ err error
}
// First fetches the first page and returns the first item. It returns
commit 35ea47144857dc16ab8b6b8a272a87af6d50cb88
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:40:41 2016 -0400
8555: Test various backend states. Update recent/X timestamp during Untrash.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index a0a6860..5eb9941 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -427,7 +427,12 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
}
func (v *S3Volume) Untrash(loc string) error {
- return v.safeCopy(loc, "trash/"+loc)
+ err := v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ return err
+ }
+ err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ return v.translateError(err)
}
func (v *S3Volume) Status() *VolumeStatus {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 73c6b76..a40b84f 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,8 +2,10 @@ package main
import (
"bytes"
+ "crypto/md5"
"fmt"
"log"
+ "os"
"time"
"github.com/AdRoll/goamz/aws"
@@ -74,13 +76,13 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return NewTestableS3Volume(c, -time.Second, false, 2)
+ return NewTestableS3Volume(c, -2*time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, -time.Second, true, 2)
+ return NewTestableS3Volume(c, -2*time.Second, true, 2)
})
}
@@ -109,6 +111,143 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
}
}
+func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
+ defer func(tl, bs time.Duration) {
+ trashLifetime = tl
+ blobSignatureTTL = bs
+ }(trashLifetime, blobSignatureTTL)
+ trashLifetime = time.Hour
+ blobSignatureTTL = time.Hour
+
+ v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+ var none time.Time
+
+ stubKey := func(t time.Time, key string, data []byte) {
+ if t == none {
+ return
+ }
+ v.serverClock.now = &t
+ v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ }
+
+ t0 := time.Now()
+ nextKey := 0
+ for _, test := range []struct {
+ label string
+ data time.Time
+ recent time.Time
+ trash time.Time
+ canGet bool
+ canTrash bool
+ canGetAfterTrash bool
+ canUntrash bool
+ haveTrashAfterEmpty bool
+ }{
+ {
+ "No related objects",
+ none, none, none,
+ false, false, false, false, false},
+ {
+ // Stored by older version, or there was a
+ // race between EmptyTrash and Put: Trash is a
+ // no-op even though the data object is very
+ // old
+ "No recent/X",
+ t0.Add(-48 * time.Hour), none, none,
+ true, true, true, false, false},
+ {
+ "Not trash; old enough to trash",
+ t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+ true, true, false, false, false},
+ {
+ "Not trash; not old enough to trash",
+ t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+ true, true, true, false, false},
+ {
+ "Trash + not-trash: recent race between Trash and Put",
+ t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+ true, true, true, true, true},
+ {
+ "Trash + not-trash, nearly eligible for deletion, prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ true, false, true, true, true},
+ {
+ "Trash + not-trash, eligible for deletion, prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+ true, false, true, true, false},
+ // FIXME: old trash never gets deleted!
+ // {
+ // "Not trash; old race between Trash and Put, or incomplete Trash",
+ // t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+ // true, false, true, true, false},
+ {
+ "Trash operation was interrupted",
+ t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, false},
+ {
+ "Trash, not yet eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+ false, false, false, true, true},
+ {
+ "Trash, not yet eligible for deletion, prone to races",
+ none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ false, false, false, true, true},
+ {
+ "Trash, eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+ false, false, false, true, false},
+ {
+ "Erroneously trashed during a race, detected before trashLifetime",
+ none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+ true, false, true, true, true},
+ {
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+ none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+ true, false, true, true, true},
+ } {
+ c.Log("Scenario: ", test.label)
+ var loc string
+ var blk []byte
+
+ setup := func() {
+ nextKey++
+ blk = []byte(fmt.Sprintf("%d", nextKey))
+ loc = fmt.Sprintf("%x", md5.Sum(blk))
+ c.Log("\t", loc)
+ stubKey(test.data, loc, blk)
+ stubKey(test.recent, "recent/"+loc, nil)
+ stubKey(test.trash, "trash/"+loc, blk)
+ v.serverClock.now = &t0
+ }
+
+ setup()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(loc, buf)
+ c.Check(err == nil, check.Equals, test.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ setup()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, test.canTrash)
+ _, err = v.Get(loc, buf)
+ c.Check(err == nil, check.Equals, test.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ setup()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, test.canUntrash)
+
+ setup()
+ v.EmptyTrash()
+ _, err = v.Bucket.Head("trash/"+loc, nil)
+ c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
+ }
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index c17c837..bc3e537 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -895,6 +895,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
t.Fatal(err)
}
+ // Untrash might have updated the timestamp, so backdate again
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
// Second set: EmptyTrash after the trash deadline has passed.
trashLifetime = time.Nanosecond
commit d858874f6c39bdcfbe3de383933aa4e68b2780f6
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jul 18 15:47:23 2016 -0400
8555: Move checkRaceWindow out to a func, tweak comments
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 568bd33..a0a6860 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,7 +18,7 @@ import (
)
var (
- ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
+ ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
s3SecretKeyFile string
@@ -162,7 +162,12 @@ func (v *S3Volume) Check() error {
return nil
}
-func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
+// getReader wraps (Bucket)GetReader.
+//
+// In situations where (Bucket)GetReader would fail because the block
+// disappeared in a Trash race, getReader calls fixRace to recover the
+// data, and tries again.
+func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
rdr, err = v.Bucket.GetReader(loc)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
@@ -188,7 +193,7 @@ func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err erro
}
func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
- rdr, err := v.getReaderWithFixRace(loc)
+ rdr, err := v.getReader(loc)
if err != nil {
return 0, err
}
@@ -203,7 +208,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
}
func (v *S3Volume) Compare(loc string, expect []byte) error {
- rdr, err := v.getReaderWithFixRace(loc)
+ rdr, err := v.getReader(loc)
if err != nil {
return err
}
@@ -341,43 +346,53 @@ func (v *S3Volume) Trash(loc string) error {
}
if trashLifetime == 0 {
if !s3UnsafeDelete {
- return ErrS3TrashNotAvailable
+ return ErrS3TrashDisabled
}
return v.Bucket.Del(loc)
}
-
- // Make sure we're not in the race window.
- {
- resp, err := v.Bucket.Head("trash/"+loc, nil)
- err = v.translateError(err)
- if os.IsNotExist(err) {
- // OK, trash/X doesn't exist so we're not in
- // the race window
- } else if err != nil {
- // Error looking up trash/X. Unsafe to proceed
- // without knowing whether we're in the race
- // window
- return err
- } else if t, err := v.lastModified(resp); err != nil {
- // Can't parse timestamp
- return err
- } else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
- // We can't count on "touch trash/X" to
- // prolong trash/X's lifetime. The new
- // timestamp might not become visible until
- // now+raceWindow, and EmptyTrash is allowed
- // to delete trash/X before then.
- return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
- }
+ err := v.checkRaceWindow(loc)
+ if err != nil {
+ return err
}
-
- err := v.safeCopy("trash/"+loc, loc)
+ err = v.safeCopy("trash/"+loc, loc)
if err != nil {
return err
}
return v.translateError(v.Bucket.Del(loc))
}
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3Volume) checkRaceWindow(loc string) error {
+ resp, err := v.Bucket.Head("trash/"+loc, nil)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // OK, trash/X doesn't exist so we're not in the race
+ // window
+ return nil
+ } else if err != nil {
+ // Error looking up trash/X. We don't know whether
+ // we're in the race window
+ return err
+ }
+ t, err := v.lastModified(resp)
+ if err != nil {
+ // Can't parse timestamp
+ return err
+ }
+ safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+ if safeWindow <= 0 {
+ // We can't count on "touch trash/X" to prolong
+ // trash/X's lifetime. The new timestamp might not
+ // become visible until now+raceWindow, and EmptyTrash
+ // is allowed to delete trash/X before then.
+ return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ }
+ // trash/X exists, but it won't be eligible for deletion until
+ // after now+raceWindow, so it's safe to overwrite it.
+ return nil
+}
+
func (v *S3Volume) safeCopy(dst, src string) error {
resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
commit 8db0d3197f659a4099e481b464cb9a877b943d3c
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jul 14 16:08:16 2016 -0400
8555: Add log messages for time-parsing errors.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index ad6ff5b..568bd33 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -524,10 +524,12 @@ func (v *S3Volume) EmptyTrash() {
}
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
+ log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
recentT, err := v.lastModified(recent)
if err != nil {
+ log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
continue
}
if trashT.Sub(recentT) < blobSignatureTTL {
commit 1127e8884c809a35280d8e57dbe3bc1b8f8818a5
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jul 14 15:53:20 2016 -0400
8555: Implement trash with race-recovery for S3 volumes.
See https://dev.arvados.org/projects/arvados/wiki/S3_bucket_volume_implementation
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 98e1203..ad6ff5b 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,7 +18,7 @@ import (
)
var (
- ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
+ ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
s3SecretKeyFile string
@@ -26,6 +26,7 @@ var (
s3Endpoint string
s3Replication int
s3UnsafeDelete bool
+ s3RaceWindow time.Duration
s3ACL = s3.Private
)
@@ -77,7 +78,7 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
if flagSerializeIO {
log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
}
- v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
+ v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
if err := v.Check(); err != nil {
return err
}
@@ -116,6 +117,11 @@ func init() {
"s3-secret-key-file",
"",
"File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+ flag.DurationVar(
+ &s3RaceWindow,
+ "s3-race-window",
+ 24*time.Hour,
+ "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
flag.IntVar(
&s3Replication,
"s3-replication",
@@ -130,6 +136,7 @@ func init() {
type S3Volume struct {
*s3.Bucket
+ raceWindow time.Duration
readonly bool
replication int
indexPageSize int
@@ -138,12 +145,13 @@ type S3Volume struct {
// NewS3Volume returns a new S3Volume using the given auth, region,
// and bucket name. The replication argument specifies the replication
// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
+func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
return &S3Volume{
Bucket: &s3.Bucket{
S3: s3.New(auth, region),
Name: bucket,
},
+ raceWindow: raceWindow,
readonly: readonly,
replication: replication,
indexPageSize: 1000,
@@ -154,10 +162,35 @@ func (v *S3Volume) Check() error {
return nil
}
+func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
+ rdr, err = v.Bucket.GetReader(loc)
+ err = v.translateError(err)
+ if err == nil || !os.IsNotExist(err) {
+ return
+ }
+ _, err = v.Bucket.Head("recent/"+loc, nil)
+ err = v.translateError(err)
+ if err != nil {
+ // If we can't read recent/X, there's no point in
+ // trying fixRace. Give up.
+ return
+ }
+ if !v.fixRace(loc) {
+ err = os.ErrNotExist
+ return
+ }
+ rdr, err = v.Bucket.GetReader(loc)
+ if err != nil {
+ log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+ err = v.translateError(err)
+ }
+ return
+}
+
func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
- rdr, err := v.Bucket.GetReader(loc)
+ rdr, err := v.getReaderWithFixRace(loc)
if err != nil {
- return 0, v.translateError(err)
+ return 0, err
}
defer rdr.Close()
n, err := io.ReadFull(rdr, buf)
@@ -170,9 +203,9 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
}
func (v *S3Volume) Compare(loc string, expect []byte) error {
- rdr, err := v.Bucket.GetReader(loc)
+ rdr, err := v.getReaderWithFixRace(loc)
if err != nil {
- return v.translateError(err)
+ return err
}
defer rdr.Close()
return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
@@ -190,71 +223,108 @@ func (v *S3Volume) Put(loc string, block []byte) error {
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- return v.translateError(
- v.Bucket.Put(
- loc, block, "application/octet-stream", s3ACL, opts))
+ err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+ if err != nil {
+ return v.translateError(err)
+ }
+ err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ return v.translateError(err)
}
func (v *S3Volume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
}
- result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
- ContentType: "application/octet-stream",
- MetadataDirective: "REPLACE",
- }, v.Bucket.Name+"/"+loc)
- if err != nil {
- return v.translateError(err)
- }
- t, err := time.Parse(time.RFC3339, result.LastModified)
- if err != nil {
+ _, err := v.Bucket.Head(loc, nil)
+ err = v.translateError(err)
+ if os.IsNotExist(err) && v.fixRace(loc) {
+ // The data object got trashed in a race, but fixRace
+ // rescued it.
+ } else if err != nil {
return err
}
- if time.Since(t) > maxClockSkew {
- return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
- }
- return nil
+ err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ return v.translateError(err)
}
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- resp, err := v.Bucket.Head(loc, nil)
+ _, err := v.Bucket.Head(loc, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- hdr := resp.Header.Get("Last-Modified")
- t, err := time.Parse(time.RFC1123, hdr)
- if err != nil && hdr != "" {
- // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
- // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
- // as required by HTTP spec. If it's not a valid HTTP
- // header value, it's probably AWS (or s3test) giving
- // us a nearly-RFC1123 timestamp.
- t, err = time.Parse(nearlyRFC1123, hdr)
+ resp, err := v.Bucket.Head("recent/"+loc, nil)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // The data object X exists, but recent/X is missing.
+ err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ if err != nil {
+ log.Printf("error: creating %q: %s", "recent/"+loc, err)
+ return zeroTime, v.translateError(err)
+ }
+ log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ resp, err = v.Bucket.Head("recent/"+loc, nil)
+ if err != nil {
+ log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+ return zeroTime, v.translateError(err)
+ }
+ } else if err != nil {
+ // HEAD recent/X failed for some other reason.
+ return zeroTime, err
}
- return t, err
+ return v.lastModified(resp)
}
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
- nextMarker := ""
- for {
- listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
- if err != nil {
- return err
+ // Use a merge sort to find matching sets of X and recent/X.
+ dataL := s3Lister{
+ Bucket: v.Bucket,
+ Prefix: prefix,
+ PageSize: v.indexPageSize,
+ }
+ recentL := s3Lister{
+ Bucket: v.Bucket,
+ Prefix: "recent/"+prefix,
+ PageSize: v.indexPageSize,
+ }
+ for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
+ if data.Key >= "g" {
+ // Conveniently, "recent/*" and "trash/*" are
+ // lexically greater than all hex-encoded data
+ // hashes, so stopping here avoids iterating
+ // over all of them needlessly with dataL.
+ break
}
- for _, key := range listResp.Contents {
- t, err := time.Parse(time.RFC3339, key.LastModified)
- if err != nil {
- return err
- }
- if !v.isKeepBlock(key.Key) {
+ if !v.isKeepBlock(data.Key) {
+ continue
+ }
+
+ // stamp is the list entry we should use to report the
+ // last-modified time for this data block: it will be
+ // the recent/X entry if one exists, otherwise the
+ // entry for the data block itself.
+ stamp := data
+
+ // Advance to the corresponding recent/X marker, if any
+ for recent != nil {
+ if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
+ recent = recentL.Next()
continue
+ } else if cmp == 0 {
+ stamp = recent
+ recent = recentL.Next()
+ break
+ } else {
+ // recent/X marker is missing: we'll
+ // use the timestamp on the data
+ // object.
+ break
}
- fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.UnixNano())
}
- if !listResp.IsTruncated {
- break
+ t, err := time.Parse(time.RFC3339, stamp.LastModified)
+ if err != nil {
+ return err
}
- nextMarker = listResp.NextMarker
+ fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
}
return nil
}
@@ -264,23 +334,85 @@ func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
}
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < blobSignatureTTL {
return nil
}
- if !s3UnsafeDelete {
- return ErrS3DeleteNotAvailable
+ if trashLifetime == 0 {
+ if !s3UnsafeDelete {
+ return ErrS3TrashNotAvailable
+ }
+ return v.Bucket.Del(loc)
+ }
+
+ // Make sure we're not in the race window.
+ {
+ resp, err := v.Bucket.Head("trash/"+loc, nil)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // OK, trash/X doesn't exist so we're not in
+ // the race window
+ } else if err != nil {
+ // Error looking up trash/X. Unsafe to proceed
+ // without knowing whether we're in the race
+ // window
+ return err
+ } else if t, err := v.lastModified(resp); err != nil {
+ // Can't parse timestamp
+ return err
+ } else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
+ // We can't count on "touch trash/X" to
+ // prolong trash/X's lifetime. The new
+ // timestamp might not become visible until
+ // now+raceWindow, and EmptyTrash is allowed
+ // to delete trash/X before then.
+ return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ }
+ }
+
+ err := v.safeCopy("trash/"+loc, loc)
+ if err != nil {
+ return err
+ }
+ return v.translateError(v.Bucket.Del(loc))
+}
+
+func (v *S3Volume) safeCopy(dst, src string) error {
+ resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+ ContentType: "application/octet-stream",
+ MetadataDirective: "REPLACE",
+ }, v.Bucket.Name+"/"+src)
+ err = v.translateError(err)
+ if err != nil {
+ return err
+ }
+ if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
+ return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
+ } else if time.Now().Sub(t) > maxClockSkew {
+ return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
}
- return v.Bucket.Del(loc)
+ return nil
+}
+
+// Get the LastModified header from resp, and parse it as RFC1123 or
+// -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
+func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
+ s := resp.Header.Get("Last-Modified")
+ t, err = time.Parse(time.RFC1123, s)
+ if err != nil && s != "" {
+ // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
+ // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
+ // as required by HTTP spec. If it's not a valid HTTP
+ // header value, it's probably AWS (or s3test) giving
+ // us a nearly-RFC1123 timestamp.
+ t, err = time.Parse(nearlyRFC1123, s)
+ }
+ return
}
-// TBD
func (v *S3Volume) Untrash(loc string) error {
- return ErrNotImplemented
+ return v.safeCopy(loc, "trash/"+loc)
}
func (v *S3Volume) Status() *VolumeStatus {
@@ -308,6 +440,52 @@ func (v *S3Volume) isKeepBlock(s string) bool {
return s3KeepBlockRegexp.MatchString(s)
}
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3Volume) fixRace(loc string) bool {
+ trash, err := v.Bucket.Head("trash/"+loc, nil)
+ if err != nil {
+ if !os.IsNotExist(v.translateError(err)) {
+ log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+ }
+ return false
+ }
+ trashTime, err := v.lastModified(trash)
+ if err != nil {
+ log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+ return false
+ }
+
+ recent, err := v.Bucket.Head("recent/"+loc, nil)
+ if err != nil {
+ log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+ return false
+ }
+ recentTime, err := v.lastModified(recent)
+ if err != nil {
+ log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+ return false
+ }
+
+ ageWhenTrashed := trashTime.Sub(recentTime)
+ if ageWhenTrashed >= blobSignatureTTL {
+ // No evidence of a race: block hasn't been written
+ // since it became eligible for Trash. No fix needed.
+ return false
+ }
+
+ log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+ log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+ err = v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ log.Printf("error: fixRace: %s", err)
+ return false
+ }
+ return true
+}
+
func (v *S3Volume) translateError(err error) error {
switch err := err.(type) {
case *s3.Error:
@@ -325,6 +503,115 @@ func (v *S3Volume) translateError(err error) error {
// EmptyTrash looks for trashed blocks that exceeded trashLifetime
// and deletes them from the volume.
-// TBD
func (v *S3Volume) EmptyTrash() {
+ // List every trash/X key and HEAD the matching recent/X marker for each one.
+ trashL := s3Lister{
+ Bucket: v.Bucket,
+ Prefix: "trash/",
+ PageSize: v.indexPageSize,
+ }
+ // Define "ready to delete" as "...when EmptyTrash started".
+ now := time.Now()
+ for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+ loc := trash.Key[6:]
+ if !v.isKeepBlock(loc) {
+ continue
+ }
+ recent, err := v.Bucket.Head("recent/"+loc, nil)
+ if err != nil {
+ log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+ continue
+ }
+ trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+ if err != nil {
+ continue
+ }
+ recentT, err := v.lastModified(recent)
+ if err != nil {
+ continue
+ }
+ if trashT.Sub(recentT) < blobSignatureTTL {
+ v.fixRace(loc)
+ continue
+ }
+ if now.Sub(trashT) < trashLifetime {
+ continue
+ }
+ err = v.Bucket.Del(trash.Key)
+ if err != nil {
+ log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+ continue
+ }
+ _, err = v.Bucket.Head(loc, nil)
+ if os.IsNotExist(err) {
+ err = v.Bucket.Del("recent/"+loc)
+ if err != nil {
+ log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ }
+ } else if err != nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ }
+ }
+ if err := trashL.Error(); err != nil {
+ log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+ }
+}
+
+type s3Lister struct {
+ Bucket *s3.Bucket
+ Prefix string
+ PageSize int
+ nextMarker string
+ buf []s3.Key
+ err error
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3Lister) First() *s3.Key {
+ lister.getPage()
+ return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3Lister) Next() *s3.Key {
+ if len(lister.buf) == 0 && lister.nextMarker != "" {
+ lister.getPage()
+ }
+ return lister.pop()
+}
+
+// Return the most recent error encountered by First or Next.
+func (lister *s3Lister) Error() error {
+ return lister.err
+}
+
+func (lister *s3Lister) getPage() {
+ resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
+ lister.nextMarker = ""
+ if err != nil {
+ lister.err = err
+ return
+ }
+ if resp.IsTruncated {
+ lister.nextMarker = resp.NextMarker
+ }
+ lister.buf = make([]s3.Key, 0, len(resp.Contents))
+ for _, key := range resp.Contents {
+ if !strings.HasPrefix(key.Key, lister.Prefix) {
+ log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+ continue
+ }
+ lister.buf = append(lister.buf, key)
+ }
+}
+
+func (lister *s3Lister) pop() (k *s3.Key) {
+ if len(lister.buf) > 0 {
+ k = &lister.buf[0]
+ lister.buf = lister.buf[1:]
+ }
+ return
}
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 0c2cd49..73c6b76 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"log"
- "strings"
"time"
"github.com/AdRoll/goamz/aws"
@@ -41,7 +40,7 @@ func init() {
s3UnsafeDelete = true
}
-func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3Volume {
+func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
clock := &fakeClock{}
srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
c.Assert(err, check.IsNil)
@@ -59,7 +58,7 @@ func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3
c.Assert(err, check.IsNil)
return &TestableS3Volume{
- S3Volume: NewS3Volume(auth, region, TestBucketName, readonly, replication),
+ S3Volume: NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
server: srv,
serverClock: clock,
}
@@ -73,18 +72,20 @@ type StubbedS3Suite struct {
func (s *StubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, false, 2)
+ // Use a negative raceWindow so s3test's 1-second
+ // timestamp precision doesn't confuse fixRace.
+ return NewTestableS3Volume(c, -time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, true, 2)
+ return NewTestableS3Volume(c, -time.Second, true, 2)
})
}
func (s *StubbedS3Suite) TestIndex(c *check.C) {
- v := NewTestableS3Volume(c, false, 2)
+ v := NewTestableS3Volume(c, 0, false, 2)
v.indexPageSize = 3
for i := 0; i < 256; i++ {
v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
@@ -121,9 +122,9 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.Touch(locator)
- if err != nil && !strings.Contains(err.Error(), "PutCopy returned old LastModified") {
- log.Printf("Touch: %+v", err)
+ err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ if err != nil {
+ panic(err)
}
v.serverClock.now = nil
}
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4291c6c..c17c837 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -741,7 +741,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
defer func() {
- trashLifetime = 0 * time.Second
+ trashLifetime = 0
}()
trashLifetime = 3600 * time.Second
@@ -830,7 +830,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// First set: EmptyTrash before reaching the trash deadline.
- trashLifetime = 1 * time.Hour
+ trashLifetime = time.Hour
v.PutRaw(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
@@ -880,14 +880,15 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// Because we Touch'ed, need to backdate again for next set of tests
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- // Untrash should fail if the only block in the trash has
- // already been untrashed.
+ // If the only block in the trash has already been untrashed,
+ // most volumes will fail a subsequent Untrash with a 404, but
+ // it's also acceptable for Untrash to succeed.
err = v.Untrash(TestHash)
- if err == nil || !os.IsNotExist(err) {
- t.Fatalf("os.IsNotExist(%v) should have been true", err)
+ if err != nil && !os.IsNotExist(err) {
+ t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
}
- // The failed Untrash should not interfere with our
+ // The additional Untrash should not interfere with our
// already-untrashed copy.
err = checkGet()
if err != nil {
@@ -896,7 +897,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// Second set: EmptyTrash after the trash deadline has passed.
- trashLifetime = 1 * time.Nanosecond
+ trashLifetime = time.Nanosecond
err = v.Trash(TestHash)
if err != nil {
@@ -927,7 +928,6 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
if err == nil || !os.IsNotExist(err) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
- // EmptryTrash
v.EmptyTrash()
// Untrash won't find it
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list