[ARVADOS] updated: 7ba6bdf406546ec225baea49dbe6ccbf02e70f53

Git user git at public.curoverse.com
Wed Jul 27 17:04:29 EDT 2016


Summary of changes:
 build/run-tests.sh                        |   4 +-
 services/keep-balance/balance_run_test.go |   2 +-
 services/keepstore/handlers.go            |  18 +-
 services/keepstore/pull_worker.go         |   5 +-
 services/keepstore/s3_volume.go           | 487 ++++++++++++++++++++++++++----
 services/keepstore/s3_volume_test.go      | 217 ++++++++++++-
 services/keepstore/volume_generic_test.go |  21 +-
 services/keepstore/volume_unix.go         |  13 +-
 8 files changed, 671 insertions(+), 96 deletions(-)

       via  7ba6bdf406546ec225baea49dbe6ccbf02e70f53 (commit)
       via  d65114fdee11cfd20833a96c34ebf5346baeb755 (commit)
       via  d9f2aaaa6b5762f448276ce96b6994245062a4c8 (commit)
       via  70e9fc0a1e57fb4d85d985b4c9258d7a5615b3bb (commit)
       via  e1f5b9c3c303502db7ef0e3c7a19c8edfbbb2183 (commit)
       via  482afcd3cda97329e68b2b77f7f4a32da6ea08ef (commit)
       via  78d74f846e1b35b6b65d76c345227845d00a9722 (commit)
       via  d137cbc6cfdcc541216e68d414c535626e4d8916 (commit)
       via  c15c0971e10534f36748feae87b1b73a386fd9b1 (commit)
       via  99b14829e38a823f37ff1f040c9f0777e42f0d67 (commit)
       via  e42bc5d373290314195c47ededb4fdacc90f7aa0 (commit)
       via  094024f2476d84a5b9a453ede79b243e1d282bab (commit)
       via  a6d2f88debdfa7bc390b63c1f18a0541987ae0b8 (commit)
       via  e8de69eac0308e5965bdbe13754764d4fea9b8b3 (commit)
       via  35ea47144857dc16ab8b6b8a272a87af6d50cb88 (commit)
       via  d858874f6c39bdcfbe3de383933aa4e68b2780f6 (commit)
       via  8db0d3197f659a4099e481b464cb9a877b943d3c (commit)
       via  1127e8884c809a35280d8e57dbe3bc1b8f8818a5 (commit)
      from  be87361dedf4e35405616e802fba12dedf86dfde (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7ba6bdf406546ec225baea49dbe6ccbf02e70f53
Merge: be87361 d65114f
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jul 27 15:22:26 2016 -0400

    Merge branch '8555-s3-trash'
    
    closes #8555


commit d65114fdee11cfd20833a96c34ebf5346baeb755
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jul 27 11:29:17 2016 -0400

    8555: Fail Go tests if not gofmted.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index c9cb2b2..6331ec2 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -520,6 +520,8 @@ do_test_once() {
             # mode makes Go show the wrong line numbers when reporting
             # compilation errors.
             go get -t "git.curoverse.com/arvados.git/$1" || return 1
+            cd "$WORKSPACE/$1" || return 1
+            gofmt -e -d . | egrep . && result=1
             if [[ -n "${testargs[$1]}" ]]
             then
                 # "go test -check.vv giturl" doesn't work, but this
@@ -530,7 +532,7 @@ do_test_once() {
                 # empty, so use this form in such cases:
                 go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
             fi
-            result="$?"
+            result=${result:-$?}
             if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
             then
                 go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"

commit d9f2aaaa6b5762f448276ce96b6994245062a4c8
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 16:29:57 2016 -0400

    8555: golint

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f698982..a6798a9 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -73,7 +73,7 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	if enforcePermissions {
 		locator := req.URL.Path[1:] // strip leading slash
-		if err := VerifySignature(locator, GetApiToken(req)); err != nil {
+		if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
 			http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
 			return
 		}
@@ -184,7 +184,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	// Success; add a size hint, sign the locator if possible, and
 	// return it to the client.
 	returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
-	apiToken := GetApiToken(req)
+	apiToken := GetAPIToken(req)
 	if PermissionSecret != nil && apiToken != "" {
 		expiry := time.Now().Add(blobSignatureTTL)
 		returnHash = SignLocator(returnHash, apiToken, expiry)
@@ -196,7 +196,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsDataManagerToken(GetApiToken(req)) {
+	if !IsDataManagerToken(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -328,7 +328,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
 	hash := mux.Vars(req)["hash"]
 
 	// Confirm that this user is an admin and has a token with unlimited scope.
-	var tok = GetApiToken(req)
+	var tok = GetAPIToken(req)
 	if tok == "" || !CanDelete(tok) {
 		http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
 		return
@@ -419,7 +419,7 @@ type PullRequest struct {
 // PullHandler processes "PUT /pull" requests for the data manager.
 func PullHandler(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsDataManagerToken(GetApiToken(req)) {
+	if !IsDataManagerToken(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -455,7 +455,7 @@ type TrashRequest struct {
 // TrashHandler processes /trash requests.
 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsDataManagerToken(GetApiToken(req)) {
+	if !IsDataManagerToken(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -485,7 +485,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsDataManagerToken(GetApiToken(req)) {
+	if !IsDataManagerToken(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -714,10 +714,10 @@ func IsValidLocator(loc string) bool {
 
 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
 
-// GetApiToken returns the OAuth2 token from the Authorization
+// GetAPIToken returns the OAuth2 token from the Authorization
 // header of a HTTP request, or an empty string if no matching
 // token is found.
-func GetApiToken(req *http.Request) string {
+func GetAPIToken(req *http.Request) string {
 	if auth, ok := req.Header["Authorization"]; ok {
 		if match := authRe.FindStringSubmatch(auth[0]); match != nil {
 			return match[1]
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 2626d4b..d53d106 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"crypto/rand"
-	"errors"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"io"
@@ -57,7 +56,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
 		return
 	}
 	if reader == nil {
-		return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
+		return fmt.Errorf("No reader found for : %s", signedLocator)
 	}
 	defer reader.Close()
 
@@ -67,7 +66,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepc
 	}
 
 	if (readContent == nil) || (int64(len(readContent)) != contentLen) {
-		return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
+		return fmt.Errorf("Content not found for: %s", signedLocator)
 	}
 
 	err = PutContent(readContent, pullRequest.Locator)

commit 70e9fc0a1e57fb4d85d985b4c9258d7a5615b3bb
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 16:26:15 2016 -0400

    8555: Fix up comments.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index f39a9df..1f62f4a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,8 +18,8 @@ import (
 )
 
 var (
-	// Returned by Trash if that operation is impossible with the
-	// current config.
+	// ErrS3TrashDisabled is returned by Trash if that operation
+	// is impossible with the current config.
 	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
 	s3AccessKeyFile string
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 65ce2be..6ba3904 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -250,6 +250,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			return loc, blk
 		}
 
+		// Check canGet
 		loc, blk := setupScenario()
 		buf := make([]byte, len(blk))
 		_, err := v.Get(loc, buf)
@@ -258,6 +259,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
+		// Call Trash, then check canTrash and canGetAfterTrash
 		loc, blk = setupScenario()
 		err = v.Trash(loc)
 		c.Check(err == nil, check.Equals, scenario.canTrash)
@@ -267,6 +269,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
+		// Call Untrash, then check canUntrash
 		loc, blk = setupScenario()
 		err = v.Untrash(loc)
 		c.Check(err == nil, check.Equals, scenario.canUntrash)
@@ -279,6 +282,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(err, check.IsNil)
 		}
 
+		// Call EmptyTrash, then check haveTrashAfterEmpty and
+		// freshAfterEmpty
 		loc, blk = setupScenario()
 		v.EmptyTrash()
 		_, err = v.Bucket.Head("trash/"+loc, nil)
@@ -291,6 +296,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 		}
 
+		// Check for current Mtime after Put (applies to all
+		// scenarios)
 		loc, blk = setupScenario()
 		err = v.Put(loc, blk)
 		c.Check(err, check.IsNil)

commit e1f5b9c3c303502db7ef0e3c7a19c8edfbbb2183
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 16:20:30 2016 -0400

    8555: gofmt

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index da24daf..f39a9df 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -592,7 +592,7 @@ func (v *S3Volume) EmptyTrash() {
 			continue
 		}
 		if trashT.Sub(recentT) < blobSignatureTTL {
-			if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+			if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
 				// recent/loc is too old to protect
 				// loc from being Trashed again during
 				// the raceWindow that starts if we

commit 482afcd3cda97329e68b2b77f7f4a32da6ea08ef
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 12:25:35 2016 -0400

    8555: Test Get() after successful Untrash. Test Put+Mtime in all scenarios.

diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index e41e04b..65ce2be 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -270,6 +270,14 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		loc, blk = setupScenario()
 		err = v.Untrash(loc)
 		c.Check(err == nil, check.Equals, scenario.canUntrash)
+		if scenario.dataT != none || scenario.trashT != none {
+			// In all scenarios where the data exists, we
+			// should be able to Get after Untrash --
+			// regardless of timestamps, errors, race
+			// conditions, etc.
+			_, err = v.Get(loc, buf)
+			c.Check(err, check.IsNil)
+		}
 
 		loc, blk = setupScenario()
 		v.EmptyTrash()
@@ -282,6 +290,13 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			// allowance for 1s timestamp precision)
 			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 		}
+
+		loc, blk = setupScenario()
+		err = v.Put(loc, blk)
+		c.Check(err, check.IsNil)
+		t, err := v.Mtime(loc)
+		c.Check(err, check.IsNil)
+		c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 	}
 }
 

commit 78d74f846e1b35b6b65d76c345227845d00a9722
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 11:07:46 2016 -0400

    8555: Untrash to repair inconsistent state (trash/X without recent/X).

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 0cfaaaf..da24daf 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -575,8 +575,15 @@ func (v *S3Volume) EmptyTrash() {
 			continue
 		}
 		recent, err := v.Bucket.Head("recent/"+loc, nil)
-		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+		if err != nil && os.IsNotExist(v.translateError(err)) {
+			log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+			err = v.Untrash(loc)
+			if err != nil {
+				log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+			}
+			continue
+		} else if err != nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
 			continue
 		}
 		recentT, err := v.lastModified(recent)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 8a06f19..e41e04b 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -223,6 +223,11 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
 			true, false, true, true, true, false,
 		},
+		{
+			"Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+			none, none, t0.Add(-time.Minute),
+			false, false, false, true, true, true,
+		},
 	} {
 		c.Log("Scenario: ", scenario.label)
 

commit d137cbc6cfdcc541216e68d414c535626e4d8916
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 09:44:56 2016 -0400

    8555: Log statistics in EmptyTrash.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index be4da4d..0cfaaaf 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -551,6 +551,8 @@ func (v *S3Volume) translateError(err error) error {
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
 func (v *S3Volume) EmptyTrash() {
+	var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
 	// Use a merge sort to find matching sets of trash/X and recent/X.
 	trashL := s3Lister{
 		Bucket:   v.Bucket,
@@ -564,6 +566,9 @@ func (v *S3Volume) EmptyTrash() {
 		if !v.isKeepBlock(loc) {
 			continue
 		}
+		bytesInTrash += trash.Size
+		blocksInTrash++
+
 		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
 		if err != nil {
 			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
@@ -610,6 +615,9 @@ func (v *S3Volume) EmptyTrash() {
 			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
 			continue
 		}
+		bytesDeleted += trash.Size
+		blocksDeleted++
+
 		_, err = v.Bucket.Head(loc, nil)
 		if os.IsNotExist(err) {
 			err = v.Bucket.Del("recent/" + loc)
@@ -623,6 +631,7 @@ func (v *S3Volume) EmptyTrash() {
 	if err := trashL.Error(); err != nil {
 		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
 	}
+	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {

commit c15c0971e10534f36748feae87b1b73a386fd9b1
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 09:38:07 2016 -0400

    8555: Improve variable names and comments.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 8ded160..be4da4d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -558,7 +558,7 @@ func (v *S3Volume) EmptyTrash() {
 		PageSize: v.indexPageSize,
 	}
 	// Define "ready to delete" as "...when EmptyTrash started".
-	now := time.Now()
+	startT := time.Now()
 	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
 		loc := trash.Key[6:]
 		if !v.isKeepBlock(loc) {
@@ -580,7 +580,7 @@ func (v *S3Volume) EmptyTrash() {
 			continue
 		}
 		if trashT.Sub(recentT) < blobSignatureTTL {
-			if age := now.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+			if age := startT.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
 				// recent/loc is too old to protect
 				// loc from being Trashed again during
 				// the raceWindow that starts if we
@@ -602,7 +602,7 @@ func (v *S3Volume) EmptyTrash() {
 				continue
 			}
 		}
-		if now.Sub(trashT) < trashLifetime {
+		if startT.Sub(trashT) < trashLifetime {
 			continue
 		}
 		err = v.Bucket.Del(trash.Key)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 0bb818c..8a06f19 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -122,7 +122,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 	v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
 	var none time.Time
 
-	stubKey := func(t time.Time, key string, data []byte) {
+	putS3Obj := func(t time.Time, key string, data []byte) {
 		if t == none {
 			return
 		}
@@ -134,9 +134,9 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 	nextKey := 0
 	for _, scenario := range []struct {
 		label               string
-		data                time.Time
-		recent              time.Time
-		trash               time.Time
+		dataT               time.Time
+		recentT             time.Time
+		trashT              time.Time
 		canGet              bool
 		canTrash            bool
 		canGetAfterTrash    bool
@@ -159,42 +159,42 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			true, true, true, false, false, false,
 		},
 		{
-			"Not trash; old enough to trash",
+			"Not trash, but old enough to be eligible for trash",
 			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
 			true, true, false, false, false, false,
 		},
 		{
-			"Not trash; not old enough to trash",
+			"Not trash, and not old enough to be eligible for trash",
 			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
 			true, true, true, false, false, false,
 		},
 		{
-			"Trash + not-trash: recent race between Trash and Put",
+			"Trashed + untrashed copies exist, due to recent race between Trash and Put",
 			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
 			true, true, true, true, true, false,
 		},
 		{
-			"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
+			"Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
 			true, false, true, true, true, false,
 		},
 		{
-			"Trash + not-trash, eligible for deletion, prone to Trash race",
+			"Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
 			true, false, true, true, false, false,
 		},
 		{
-			"Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
+			"Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
 			true, false, true, true, true, true,
 		},
 		{
-			"Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
+			"Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
 			t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
 			true, true, true, true, false, false,
 		},
 		{
-			"Trash operation was interrupted",
+			"Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
 			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
 			true, false, true, true, false, false,
 		},
@@ -233,19 +233,19 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		// locator to prevent interference from previous
 		// tests.
 
-		setup := func() (string, []byte) {
+		setupScenario := func() (string, []byte) {
 			nextKey++
 			blk := []byte(fmt.Sprintf("%d", nextKey))
 			loc := fmt.Sprintf("%x", md5.Sum(blk))
 			c.Log("\t", loc)
-			stubKey(scenario.data, loc, blk)
-			stubKey(scenario.recent, "recent/"+loc, nil)
-			stubKey(scenario.trash, "trash/"+loc, blk)
+			putS3Obj(scenario.dataT, loc, blk)
+			putS3Obj(scenario.recentT, "recent/"+loc, nil)
+			putS3Obj(scenario.trashT, "trash/"+loc, blk)
 			v.serverClock.now = &t0
 			return loc, blk
 		}
 
-		loc, blk := setup()
+		loc, blk := setupScenario()
 		buf := make([]byte, len(blk))
 		_, err := v.Get(loc, buf)
 		c.Check(err == nil, check.Equals, scenario.canGet)
@@ -253,7 +253,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
-		loc, blk = setup()
+		loc, blk = setupScenario()
 		err = v.Trash(loc)
 		c.Check(err == nil, check.Equals, scenario.canTrash)
 		_, err = v.Get(loc, buf)
@@ -262,11 +262,11 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
-		loc, blk = setup()
+		loc, blk = setupScenario()
 		err = v.Untrash(loc)
 		c.Check(err == nil, check.Equals, scenario.canUntrash)
 
-		loc, blk = setup()
+		loc, blk = setupScenario()
 		v.EmptyTrash()
 		_, err = v.Bucket.Head("trash/"+loc, nil)
 		c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)

commit 99b14829e38a823f37ff1f040c9f0777e42f0d67
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jul 26 09:19:30 2016 -0400

    8555: Reformat test cases.

diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index fb0bea2..0bb818c 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -147,7 +147,8 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		{
 			"No related objects",
 			none, none, none,
-			false, false, false, false, false, false},
+			false, false, false, false, false, false,
+		},
 		{
 			// Stored by older version, or there was a
 			// race between EmptyTrash and Put: Trash is a
@@ -155,59 +156,73 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			// old
 			"No recent/X",
 			t0.Add(-48 * time.Hour), none, none,
-			true, true, true, false, false, false},
+			true, true, true, false, false, false,
+		},
 		{
 			"Not trash; old enough to trash",
 			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
-			true, true, false, false, false, false},
+			true, true, false, false, false, false,
+		},
 		{
 			"Not trash; not old enough to trash",
 			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
-			true, true, true, false, false, false},
+			true, true, true, false, false, false,
+		},
 		{
 			"Trash + not-trash: recent race between Trash and Put",
 			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
-			true, true, true, true, true, false},
+			true, true, true, true, true, false,
+		},
 		{
 			"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-			true, false, true, true, true, false},
+			true, false, true, true, true, false,
+		},
 		{
 			"Trash + not-trash, eligible for deletion, prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
-			true, false, true, true, false, false},
+			true, false, true, true, false, false,
+		},
 		{
 			"Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
-			true, false, true, true, true, true},
+			true, false, true, true, true, true,
+		},
 		{
 			"Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
 			t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
-			true, true, true, true, false, false},
+			true, true, true, true, false, false,
+		},
 		{
 			"Trash operation was interrupted",
 			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
-			true, false, true, true, false, false},
+			true, false, true, true, false, false,
+		},
 		{
 			"Trash, not yet eligible for deletion",
 			none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
-			false, false, false, true, true, false},
+			false, false, false, true, true, false,
+		},
 		{
 			"Trash, not yet eligible for deletion, prone to races",
 			none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-			false, false, false, true, true, false},
+			false, false, false, true, true, false,
+		},
 		{
 			"Trash, eligible for deletion",
 			none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
-			false, false, false, true, false, false},
+			false, false, false, true, false, false,
+		},
 		{
 			"Erroneously trashed during a race, detected before trashLifetime",
 			none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
-			true, false, true, true, true, false},
+			true, false, true, true, true, false,
+		},
 		{
 			"Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
 			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
-			true, false, true, true, true, false},
+			true, false, true, true, true, false,
+		},
 	} {
 		c.Log("Scenario: ", scenario.label)
 

commit e42bc5d373290314195c47ededb4fdacc90f7aa0
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Jul 22 14:04:34 2016 -0400

    8555: De-obfuscate test case.

diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 59402dc..fb0bea2 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -132,7 +132,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 
 	t0 := time.Now()
 	nextKey := 0
-	for _, test := range []struct {
+	for _, scenario := range []struct {
 		label               string
 		data                time.Time
 		recent              time.Time
@@ -209,47 +209,53 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
 			true, false, true, true, true, false},
 	} {
-		c.Log("Scenario: ", test.label)
-		var loc string
-		var blk []byte
+		c.Log("Scenario: ", scenario.label)
 
-		setup := func() {
+		// We have a few tests to run for each scenario, and
+		// the tests are expected to change state. By calling
+		// this setup func between tests, we (re)create the
+		// scenario as specified, using a new unique block
+		// locator to prevent interference from previous
+		// tests.
+
+		setup := func() (string, []byte) {
 			nextKey++
-			blk = []byte(fmt.Sprintf("%d", nextKey))
-			loc = fmt.Sprintf("%x", md5.Sum(blk))
+			blk := []byte(fmt.Sprintf("%d", nextKey))
+			loc := fmt.Sprintf("%x", md5.Sum(blk))
 			c.Log("\t", loc)
-			stubKey(test.data, loc, blk)
-			stubKey(test.recent, "recent/"+loc, nil)
-			stubKey(test.trash, "trash/"+loc, blk)
+			stubKey(scenario.data, loc, blk)
+			stubKey(scenario.recent, "recent/"+loc, nil)
+			stubKey(scenario.trash, "trash/"+loc, blk)
 			v.serverClock.now = &t0
+			return loc, blk
 		}
 
-		setup()
+		loc, blk := setup()
 		buf := make([]byte, len(blk))
 		_, err := v.Get(loc, buf)
-		c.Check(err == nil, check.Equals, test.canGet)
+		c.Check(err == nil, check.Equals, scenario.canGet)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
-		setup()
+		loc, blk = setup()
 		err = v.Trash(loc)
-		c.Check(err == nil, check.Equals, test.canTrash)
+		c.Check(err == nil, check.Equals, scenario.canTrash)
 		_, err = v.Get(loc, buf)
-		c.Check(err == nil, check.Equals, test.canGetAfterTrash)
+		c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
 		}
 
-		setup()
+		loc, blk = setup()
 		err = v.Untrash(loc)
-		c.Check(err == nil, check.Equals, test.canUntrash)
+		c.Check(err == nil, check.Equals, scenario.canUntrash)
 
-		setup()
+		loc, blk = setup()
 		v.EmptyTrash()
 		_, err = v.Bucket.Head("trash/"+loc, nil)
-		c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
-		if test.freshAfterEmpty {
+		c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+		if scenario.freshAfterEmpty {
 			t, err := v.Mtime(loc)
 			c.Check(err, check.IsNil)
 			// new mtime must be current (with an

commit 094024f2476d84a5b9a453ede79b243e1d282bab
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jul 21 11:44:17 2016 -0400

    8555: Fix EmptyTrash so it can clean up old races.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 5b00d4a..8ded160 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -564,14 +564,14 @@ func (v *S3Volume) EmptyTrash() {
 		if !v.isKeepBlock(loc) {
 			continue
 		}
-		recent, err := v.Bucket.Head("recent/"+loc, nil)
+		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
 			continue
 		}
-		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+		recent, err := v.Bucket.Head("recent/"+loc, nil)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+			log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
 			continue
 		}
 		recentT, err := v.lastModified(recent)
@@ -580,8 +580,27 @@ func (v *S3Volume) EmptyTrash() {
 			continue
 		}
 		if trashT.Sub(recentT) < blobSignatureTTL {
-			v.fixRace(loc)
-			continue
+			if age := now.Sub(recentT); age >= blobSignatureTTL - v.raceWindow {
+				// recent/loc is too old to protect
+				// loc from being Trashed again during
+				// the raceWindow that starts if we
+				// delete trash/X now.
+				//
+				// Note this means (trashCheckInterval
+				// < blobSignatureTTL - raceWindow) is
+				// necessary to avoid starvation.
+				log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+				v.fixRace(loc)
+				v.Touch(loc)
+				continue
+			} else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+				log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+				v.fixRace(loc)
+				continue
+			} else if err != nil {
+				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+				continue
+			}
 		}
 		if now.Sub(trashT) < trashLifetime {
 			continue
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index a40b84f..59402dc 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -142,11 +142,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		canGetAfterTrash    bool
 		canUntrash          bool
 		haveTrashAfterEmpty bool
+		freshAfterEmpty     bool
 	}{
 		{
 			"No related objects",
 			none, none, none,
-			false, false, false, false, false},
+			false, false, false, false, false, false},
 		{
 			// Stored by older version, or there was a
 			// race between EmptyTrash and Put: Trash is a
@@ -154,56 +155,59 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			// old
 			"No recent/X",
 			t0.Add(-48 * time.Hour), none, none,
-			true, true, true, false, false},
+			true, true, true, false, false, false},
 		{
 			"Not trash; old enough to trash",
 			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
-			true, true, false, false, false},
+			true, true, false, false, false, false},
 		{
 			"Not trash; not old enough to trash",
 			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
-			true, true, true, false, false},
+			true, true, true, false, false, false},
 		{
 			"Trash + not-trash: recent race between Trash and Put",
 			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
-			true, true, true, true, true},
+			true, true, true, true, true, false},
 		{
 			"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-			true, false, true, true, true},
+			true, false, true, true, true, false},
 		{
 			"Trash + not-trash, eligible for deletion, prone to Trash race",
 			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
-			true, false, true, true, false},
-		// FIXME: old trash never gets deleted!
-		// {
-		// 	"Not trash; old race between Trash and Put, or incomplete Trash",
-		// 	t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
-		// 	true, false, true, true, false},
+			true, false, true, true, false, false},
+		{
+			"Trash + not-trash, unsafe to empty; old race between Put and unfinished Trash",
+			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+			true, false, true, true, true, true},
+		{
+			"Trash + not-trash, was unsafe to empty, but since made safe by fixRace+Touch",
+			t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+			true, true, true, true, false, false},
 		{
 			"Trash operation was interrupted",
 			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
-			true, false, true, true, false},
+			true, false, true, true, false, false},
 		{
 			"Trash, not yet eligible for deletion",
 			none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
-			false, false, false, true, true},
+			false, false, false, true, true, false},
 		{
 			"Trash, not yet eligible for deletion, prone to races",
 			none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
-			false, false, false, true, true},
+			false, false, false, true, true, false},
 		{
 			"Trash, eligible for deletion",
 			none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
-			false, false, false, true, false},
+			false, false, false, true, false, false},
 		{
 			"Erroneously trashed during a race, detected before trashLifetime",
 			none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
-			true, false, true, true, true},
+			true, false, true, true, true, false},
 		{
 			"Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
 			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
-			true, false, true, true, true},
+			true, false, true, true, true, false},
 	} {
 		c.Log("Scenario: ", test.label)
 		var loc string
@@ -245,6 +249,13 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		v.EmptyTrash()
 		_, err = v.Bucket.Head("trash/"+loc, nil)
 		c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
+		if test.freshAfterEmpty {
+			t, err := v.Mtime(loc)
+			c.Check(err, check.IsNil)
+			// new mtime must be current (with an
+			// allowance for 1s timestamp precision)
+			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+		}
 	}
 }
 

commit a6d2f88debdfa7bc390b63c1f18a0541987ae0b8
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jul 20 11:56:59 2016 -0400

    8555: golint

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 9b7b51a..5b00d4a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,6 +18,8 @@ import (
 )
 
 var (
+	// Returned by Trash if that operation is impossible with the
+	// current config.
 	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
 	s3AccessKeyFile string
@@ -134,6 +136,7 @@ func init() {
 		"EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
 }
 
+// S3Volume implements Volume using an S3 bucket.
 type S3Volume struct {
 	*s3.Bucket
 	raceWindow    time.Duration
@@ -158,6 +161,8 @@ func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow tim
 	}
 }
 
+// Check returns an error if the volume is inaccessible (e.g., config
+// error).
 func (v *S3Volume) Check() error {
 	return nil
 }
@@ -192,6 +197,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 	return
 }
 
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 	rdr, err := v.getReader(loc)
 	if err != nil {
@@ -207,6 +214,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 	}
 }
 
+// Compare the given data with the stored data.
 func (v *S3Volume) Compare(loc string, expect []byte) error {
 	rdr, err := v.getReader(loc)
 	if err != nil {
@@ -216,6 +224,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
 	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
 }
 
+// Put writes a block.
 func (v *S3Volume) Put(loc string, block []byte) error {
 	if v.readonly {
 		return MethodDisabledError
@@ -236,6 +245,7 @@ func (v *S3Volume) Put(loc string, block []byte) error {
 	return v.translateError(err)
 }
 
+// Touch sets the timestamp for the given locator to the current time.
 func (v *S3Volume) Touch(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
@@ -252,6 +262,7 @@ func (v *S3Volume) Touch(loc string) error {
 	return v.translateError(err)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 	_, err := v.Bucket.Head(loc, nil)
 	if err != nil {
@@ -279,6 +290,8 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 	return v.lastModified(resp)
 }
 
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	// Use a merge sort to find matching sets of X and recent/X.
 	dataL := s3Lister{
@@ -393,6 +406,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
 	return nil
 }
 
+// safeCopy calls PutCopy, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+// (PutCopy returns 200 OK if the request was received, even if the
+// copy failed).
 func (v *S3Volume) safeCopy(dst, src string) error {
 	resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
 		ContentType:       "application/octet-stream",
@@ -426,6 +443,7 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
 	return
 }
 
+// Untrash moves block from trash back into store
 func (v *S3Volume) Untrash(loc string) error {
 	err := v.safeCopy(loc, "trash/"+loc)
 	if err != nil {
@@ -435,6 +453,9 @@ func (v *S3Volume) Untrash(loc string) error {
 	return v.translateError(err)
 }
 
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
 func (v *S3Volume) Status() *VolumeStatus {
 	return &VolumeStatus{
 		DeviceNum: 1,
@@ -443,13 +464,19 @@ func (v *S3Volume) Status() *VolumeStatus {
 	}
 }
 
+// String implements fmt.Stringer.
 func (v *S3Volume) String() string {
 	return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *S3Volume) Writable() bool {
 	return !v.readonly
 }
+
+// Replication returns the storage redundancy of the underlying
+// device. Configured via command line flag.
 func (v *S3Volume) Replication() int {
 	return v.replication
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 90189dc..5982fb0 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -306,7 +306,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-	var lastErr error = nil
+	var lastErr error
 	rootdir, err := os.Open(v.root)
 	if err != nil {
 		return err
@@ -397,10 +397,8 @@ func (v *UnixVolume) Trash(loc string) error {
 	// anyway (because the permission signatures have expired).
 	if fi, err := os.Stat(p); err != nil {
 		return err
-	} else {
-		if time.Since(fi.ModTime()) < blobSignatureTTL {
-			return nil
-		}
+	} else if time.Since(fi.ModTime()) < blobSignatureTTL {
+		return nil
 	}
 
 	if trashLifetime == 0 {
@@ -506,11 +504,14 @@ func (v *UnixVolume) String() string {
 	return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
 	return !v.readonly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (currently assumed to be 1).
 func (v *UnixVolume) Replication() int {
 	return 1
 }

commit e8de69eac0308e5965bdbe13754764d4fea9b8b3
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jul 20 11:42:14 2016 -0400

    8555: gofmt

diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index edc88aa..23d74fe 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -371,5 +371,5 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	}
 	stop <- true
 	c.Check(pullReqs.Count() >= 16, check.Equals, true)
-	c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
+	c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
 }
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 5eb9941..9b7b51a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -288,7 +288,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	}
 	recentL := s3Lister{
 		Bucket:   v.Bucket,
-		Prefix:   "recent/"+prefix,
+		Prefix:   "recent/" + prefix,
 		PageSize: v.indexPageSize,
 	}
 	for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
@@ -566,7 +566,7 @@ func (v *S3Volume) EmptyTrash() {
 		}
 		_, err = v.Bucket.Head(loc, nil)
 		if os.IsNotExist(err) {
-			err = v.Bucket.Del("recent/"+loc)
+			err = v.Bucket.Del("recent/" + loc)
 			if err != nil {
 				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
 			}
@@ -580,12 +580,12 @@ func (v *S3Volume) EmptyTrash() {
 }
 
 type s3Lister struct {
-	Bucket      *s3.Bucket
-	Prefix      string
-	PageSize    int
-	nextMarker  string
-	buf         []s3.Key
-	err         error
+	Bucket     *s3.Bucket
+	Prefix     string
+	PageSize   int
+	nextMarker string
+	buf        []s3.Key
+	err        error
 }
 
 // First fetches the first page and returns the first item. It returns

commit 35ea47144857dc16ab8b6b8a272a87af6d50cb88
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jul 20 11:40:41 2016 -0400

    8555: Test various backend states. Update recent/X timestamp during Untrash.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index a0a6860..5eb9941 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -427,7 +427,12 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
 }
 
 func (v *S3Volume) Untrash(loc string) error {
-	return v.safeCopy(loc, "trash/"+loc)
+	err := v.safeCopy(loc, "trash/"+loc)
+	if err != nil {
+		return err
+	}
+	err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	return v.translateError(err)
 }
 
 func (v *S3Volume) Status() *VolumeStatus {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 73c6b76..a40b84f 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,8 +2,10 @@ package main
 
 import (
 	"bytes"
+	"crypto/md5"
 	"fmt"
 	"log"
+	"os"
 	"time"
 
 	"github.com/AdRoll/goamz/aws"
@@ -74,13 +76,13 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
 		// Use a negative raceWindow so s3test's 1-second
 		// timestamp precision doesn't confuse fixRace.
-		return NewTestableS3Volume(c, -time.Second, false, 2)
+		return NewTestableS3Volume(c, -2*time.Second, false, 2)
 	})
 }
 
 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
-		return NewTestableS3Volume(c, -time.Second, true, 2)
+		return NewTestableS3Volume(c, -2*time.Second, true, 2)
 	})
 }
 
@@ -109,6 +111,143 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
 	}
 }
 
+func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
+	defer func(tl, bs time.Duration) {
+		trashLifetime = tl
+		blobSignatureTTL = bs
+	}(trashLifetime, blobSignatureTTL)
+	trashLifetime = time.Hour
+	blobSignatureTTL = time.Hour
+
+	v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+	var none time.Time
+
+	stubKey := func(t time.Time, key string, data []byte) {
+		if t == none {
+			return
+		}
+		v.serverClock.now = &t
+		v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+	}
+
+	t0 := time.Now()
+	nextKey := 0
+	for _, test := range []struct {
+		label               string
+		data                time.Time
+		recent              time.Time
+		trash               time.Time
+		canGet              bool
+		canTrash            bool
+		canGetAfterTrash    bool
+		canUntrash          bool
+		haveTrashAfterEmpty bool
+	}{
+		{
+			"No related objects",
+			none, none, none,
+			false, false, false, false, false},
+		{
+			// Stored by older version, or there was a
+			// race between EmptyTrash and Put: Trash is a
+			// no-op even though the data object is very
+			// old
+			"No recent/X",
+			t0.Add(-48 * time.Hour), none, none,
+			true, true, true, false, false},
+		{
+			"Not trash; old enough to trash",
+			t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+			true, true, false, false, false},
+		{
+			"Not trash; not old enough to trash",
+			t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+			true, true, true, false, false},
+		{
+			"Trash + not-trash: recent race between Trash and Put",
+			t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+			true, true, true, true, true},
+		{
+			"Trash + not-trash, nearly eligible for deletion, prone to Trash race",
+			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+			true, false, true, true, true},
+		{
+			"Trash + not-trash, eligible for deletion, prone to Trash race",
+			t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+			true, false, true, true, false},
+		// FIXME: old trash never gets deleted!
+		// {
+		// 	"Not trash; old race between Trash and Put, or incomplete Trash",
+		// 	t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+		// 	true, false, true, true, false},
+		{
+			"Trash operation was interrupted",
+			t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+			true, false, true, true, false},
+		{
+			"Trash, not yet eligible for deletion",
+			none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+			false, false, false, true, true},
+		{
+			"Trash, not yet eligible for deletion, prone to races",
+			none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+			false, false, false, true, true},
+		{
+			"Trash, eligible for deletion",
+			none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+			false, false, false, true, false},
+		{
+			"Erroneously trashed during a race, detected before trashLifetime",
+			none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+			true, false, true, true, true},
+		{
+			"Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+			none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+			true, false, true, true, true},
+	} {
+		c.Log("Scenario: ", test.label)
+		var loc string
+		var blk []byte
+
+		setup := func() {
+			nextKey++
+			blk = []byte(fmt.Sprintf("%d", nextKey))
+			loc = fmt.Sprintf("%x", md5.Sum(blk))
+			c.Log("\t", loc)
+			stubKey(test.data, loc, blk)
+			stubKey(test.recent, "recent/"+loc, nil)
+			stubKey(test.trash, "trash/"+loc, blk)
+			v.serverClock.now = &t0
+		}
+
+		setup()
+		buf := make([]byte, len(blk))
+		_, err := v.Get(loc, buf)
+		c.Check(err == nil, check.Equals, test.canGet)
+		if err != nil {
+			c.Check(os.IsNotExist(err), check.Equals, true)
+		}
+
+		setup()
+		err = v.Trash(loc)
+		c.Check(err == nil, check.Equals, test.canTrash)
+		_, err = v.Get(loc, buf)
+		c.Check(err == nil, check.Equals, test.canGetAfterTrash)
+		if err != nil {
+			c.Check(os.IsNotExist(err), check.Equals, true)
+		}
+
+		setup()
+		err = v.Untrash(loc)
+		c.Check(err == nil, check.Equals, test.canUntrash)
+
+		setup()
+		v.EmptyTrash()
+		_, err = v.Bucket.Head("trash/"+loc, nil)
+		c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
+	}
+}
+
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 	err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index c17c837..bc3e537 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -895,6 +895,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 
+	// Untrash might have updated the timestamp, so backdate again
+	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
 	// Second set: EmptyTrash after the trash deadline has passed.
 
 	trashLifetime = time.Nanosecond

commit d858874f6c39bdcfbe3de383933aa4e68b2780f6
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jul 18 15:47:23 2016 -0400

    8555: Move checkRaceWindow out to a func, tweak comments

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 568bd33..a0a6860 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,7 +18,7 @@ import (
 )
 
 var (
-	ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
+	ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
 	s3AccessKeyFile string
 	s3SecretKeyFile string
@@ -162,7 +162,12 @@ func (v *S3Volume) Check() error {
 	return nil
 }
 
-func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
+// getReader wraps (Bucket)GetReader.
+//
+// In situations where (Bucket)GetReader would fail because the block
+// disappeared in a Trash race, getReader calls fixRace to recover the
+// data, and tries again.
+func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 	rdr, err = v.Bucket.GetReader(loc)
 	err = v.translateError(err)
 	if err == nil || !os.IsNotExist(err) {
@@ -188,7 +193,7 @@ func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err erro
 }
 
 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.getReaderWithFixRace(loc)
+	rdr, err := v.getReader(loc)
 	if err != nil {
 		return 0, err
 	}
@@ -203,7 +208,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 }
 
 func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.getReaderWithFixRace(loc)
+	rdr, err := v.getReader(loc)
 	if err != nil {
 		return err
 	}
@@ -341,43 +346,53 @@ func (v *S3Volume) Trash(loc string) error {
 	}
 	if trashLifetime == 0 {
 		if !s3UnsafeDelete {
-			return ErrS3TrashNotAvailable
+			return ErrS3TrashDisabled
 		}
 		return v.Bucket.Del(loc)
 	}
-
-	// Make sure we're not in the race window.
-	{
-		resp, err := v.Bucket.Head("trash/"+loc, nil)
-		err = v.translateError(err)
-		if os.IsNotExist(err) {
-			// OK, trash/X doesn't exist so we're not in
-			// the race window
-		} else if err != nil {
-			// Error looking up trash/X. Unsafe to proceed
-			// without knowing whether we're in the race
-			// window
-			return err
-		} else if t, err := v.lastModified(resp); err != nil {
-			// Can't parse timestamp
-			return err
-		} else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
-			// We can't count on "touch trash/X" to
-			// prolong trash/X's lifetime. The new
-			// timestamp might not become visible until
-			// now+raceWindow, and EmptyTrash is allowed
-			// to delete trash/X before then.
-			return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
-		}
+	err := v.checkRaceWindow(loc)
+	if err != nil {
+		return err
 	}
-
-	err := v.safeCopy("trash/"+loc, loc)
+	err = v.safeCopy("trash/"+loc, loc)
 	if err != nil {
 		return err
 	}
 	return v.translateError(v.Bucket.Del(loc))
 }
 
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3Volume) checkRaceWindow(loc string) error {
+	resp, err := v.Bucket.Head("trash/"+loc, nil)
+	err = v.translateError(err)
+	if os.IsNotExist(err) {
+		// OK, trash/X doesn't exist so we're not in the race
+		// window
+		return nil
+	} else if err != nil {
+		// Error looking up trash/X. We don't know whether
+		// we're in the race window
+		return err
+	}
+	t, err := v.lastModified(resp)
+	if err != nil {
+		// Can't parse timestamp
+		return err
+	}
+	safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+	if safeWindow <= 0 {
+		// We can't count on "touch trash/X" to prolong
+		// trash/X's lifetime. The new timestamp might not
+		// become visible until now+raceWindow, and EmptyTrash
+		// is allowed to delete trash/X before then.
+		return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+	}
+	// trash/X exists, but it won't be eligible for deletion until
+	// after now+raceWindow, so it's safe to overwrite it.
+	return nil
+}
+
 func (v *S3Volume) safeCopy(dst, src string) error {
 	resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
 		ContentType:       "application/octet-stream",

commit 8db0d3197f659a4099e481b464cb9a877b943d3c
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jul 14 16:08:16 2016 -0400

    8555: Add log messages for time-parsing errors.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index ad6ff5b..568bd33 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -524,10 +524,12 @@ func (v *S3Volume) EmptyTrash() {
 		}
 		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
 		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
 			continue
 		}
 		recentT, err := v.lastModified(recent)
 		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
 			continue
 		}
 		if trashT.Sub(recentT) < blobSignatureTTL {

commit 1127e8884c809a35280d8e57dbe3bc1b8f8818a5
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jul 14 15:53:20 2016 -0400

    8555: Implement trash with race-recovery for S3 volumes.
    
    See https://dev.arvados.org/projects/arvados/wiki/S3_bucket_volume_implementation

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 98e1203..ad6ff5b 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,7 +18,7 @@ import (
 )
 
 var (
-	ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
+	ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
 	s3AccessKeyFile string
 	s3SecretKeyFile string
@@ -26,6 +26,7 @@ var (
 	s3Endpoint      string
 	s3Replication   int
 	s3UnsafeDelete  bool
+	s3RaceWindow    time.Duration
 
 	s3ACL = s3.Private
 )
@@ -77,7 +78,7 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
 	if flagSerializeIO {
 		log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
 	}
-	v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
+	v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
 	if err := v.Check(); err != nil {
 		return err
 	}
@@ -116,6 +117,11 @@ func init() {
 		"s3-secret-key-file",
 		"",
 		"File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+	flag.DurationVar(
+		&s3RaceWindow,
+		"s3-race-window",
+		24*time.Hour,
+		"Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
 	flag.IntVar(
 		&s3Replication,
 		"s3-replication",
@@ -130,6 +136,7 @@ func init() {
 
 type S3Volume struct {
 	*s3.Bucket
+	raceWindow    time.Duration
 	readonly      bool
 	replication   int
 	indexPageSize int
@@ -138,12 +145,13 @@ type S3Volume struct {
 // NewS3Volume returns a new S3Volume using the given auth, region,
 // and bucket name. The replication argument specifies the replication
 // level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
+func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
 	return &S3Volume{
 		Bucket: &s3.Bucket{
 			S3:   s3.New(auth, region),
 			Name: bucket,
 		},
+		raceWindow:    raceWindow,
 		readonly:      readonly,
 		replication:   replication,
 		indexPageSize: 1000,
@@ -154,10 +162,35 @@ func (v *S3Volume) Check() error {
 	return nil
 }
 
+func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
+	rdr, err = v.Bucket.GetReader(loc)
+	err = v.translateError(err)
+	if err == nil || !os.IsNotExist(err) {
+		return
+	}
+	_, err = v.Bucket.Head("recent/"+loc, nil)
+	err = v.translateError(err)
+	if err != nil {
+		// If we can't read recent/X, there's no point in
+		// trying fixRace. Give up.
+		return
+	}
+	if !v.fixRace(loc) {
+		err = os.ErrNotExist
+		return
+	}
+	rdr, err = v.Bucket.GetReader(loc)
+	if err != nil {
+		log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+		err = v.translateError(err)
+	}
+	return
+}
+
 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.Bucket.GetReader(loc)
+	rdr, err := v.getReaderWithFixRace(loc)
 	if err != nil {
-		return 0, v.translateError(err)
+		return 0, err
 	}
 	defer rdr.Close()
 	n, err := io.ReadFull(rdr, buf)
@@ -170,9 +203,9 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 }
 
 func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.Bucket.GetReader(loc)
+	rdr, err := v.getReaderWithFixRace(loc)
 	if err != nil {
-		return v.translateError(err)
+		return err
 	}
 	defer rdr.Close()
 	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
@@ -190,71 +223,108 @@ func (v *S3Volume) Put(loc string, block []byte) error {
 		}
 		opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
 	}
-	return v.translateError(
-		v.Bucket.Put(
-			loc, block, "application/octet-stream", s3ACL, opts))
+	err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+	if err != nil {
+		return v.translateError(err)
+	}
+	err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	return v.translateError(err)
 }
 
 func (v *S3Volume) Touch(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
-		ContentType:       "application/octet-stream",
-		MetadataDirective: "REPLACE",
-	}, v.Bucket.Name+"/"+loc)
-	if err != nil {
-		return v.translateError(err)
-	}
-	t, err := time.Parse(time.RFC3339, result.LastModified)
-	if err != nil {
+	_, err := v.Bucket.Head(loc, nil)
+	err = v.translateError(err)
+	if os.IsNotExist(err) && v.fixRace(loc) {
+		// The data object got trashed in a race, but fixRace
+		// rescued it.
+	} else if err != nil {
 		return err
 	}
-	if time.Since(t) > maxClockSkew {
-		return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
-	}
-	return nil
+	err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	return v.translateError(err)
 }
 
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-	resp, err := v.Bucket.Head(loc, nil)
+	_, err := v.Bucket.Head(loc, nil)
 	if err != nil {
 		return zeroTime, v.translateError(err)
 	}
-	hdr := resp.Header.Get("Last-Modified")
-	t, err := time.Parse(time.RFC1123, hdr)
-	if err != nil && hdr != "" {
-		// AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
-		// which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
-		// as required by HTTP spec. If it's not a valid HTTP
-		// header value, it's probably AWS (or s3test) giving
-		// us a nearly-RFC1123 timestamp.
-		t, err = time.Parse(nearlyRFC1123, hdr)
+	resp, err := v.Bucket.Head("recent/"+loc, nil)
+	err = v.translateError(err)
+	if os.IsNotExist(err) {
+		// The data object X exists, but recent/X is missing.
+		err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+		if err != nil {
+			log.Printf("error: creating %q: %s", "recent/"+loc, err)
+			return zeroTime, v.translateError(err)
+		}
+		log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+		resp, err = v.Bucket.Head("recent/"+loc, nil)
+		if err != nil {
+			log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+			return zeroTime, v.translateError(err)
+		}
+	} else if err != nil {
+		// HEAD recent/X failed for some other reason.
+		return zeroTime, err
 	}
-	return t, err
+	return v.lastModified(resp)
 }
 
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
-	nextMarker := ""
-	for {
-		listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
-		if err != nil {
-			return err
+	// Use a merge sort to find matching sets of X and recent/X.
+	dataL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   prefix,
+		PageSize: v.indexPageSize,
+	}
+	recentL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   "recent/"+prefix,
+		PageSize: v.indexPageSize,
+	}
+	for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
+		if data.Key >= "g" {
+			// Conveniently, "recent/*" and "trash/*" are
+			// lexically greater than all hex-encoded data
+			// hashes, so stopping here avoids iterating
+			// over all of them needlessly with dataL.
+			break
 		}
-		for _, key := range listResp.Contents {
-			t, err := time.Parse(time.RFC3339, key.LastModified)
-			if err != nil {
-				return err
-			}
-			if !v.isKeepBlock(key.Key) {
+		if !v.isKeepBlock(data.Key) {
+			continue
+		}
+
+		// stamp is the list entry we should use to report the
+		// last-modified time for this data block: it will be
+		// the recent/X entry if one exists, otherwise the
+		// entry for the data block itself.
+		stamp := data
+
+		// Advance to the corresponding recent/X marker, if any
+		for recent != nil {
+			if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
+				recent = recentL.Next()
 				continue
+			} else if cmp == 0 {
+				stamp = recent
+				recent = recentL.Next()
+				break
+			} else {
+				// recent/X marker is missing: we'll
+				// use the timestamp on the data
+				// object.
+				break
 			}
-			fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.UnixNano())
 		}
-		if !listResp.IsTruncated {
-			break
+		t, err := time.Parse(time.RFC3339, stamp.LastModified)
+		if err != nil {
+			return err
 		}
-		nextMarker = listResp.NextMarker
+		fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
 	}
 	return nil
 }
@@ -264,23 +334,85 @@ func (v *S3Volume) Trash(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	if trashLifetime != 0 {
-		return ErrNotImplemented
-	}
 	if t, err := v.Mtime(loc); err != nil {
 		return err
 	} else if time.Since(t) < blobSignatureTTL {
 		return nil
 	}
-	if !s3UnsafeDelete {
-		return ErrS3DeleteNotAvailable
+	if trashLifetime == 0 {
+		if !s3UnsafeDelete {
+			return ErrS3TrashNotAvailable
+		}
+		return v.Bucket.Del(loc)
+	}
+
+	// Make sure we're not in the race window.
+	{
+		resp, err := v.Bucket.Head("trash/"+loc, nil)
+		err = v.translateError(err)
+		if os.IsNotExist(err) {
+			// OK, trash/X doesn't exist so we're not in
+			// the race window
+		} else if err != nil {
+			// Error looking up trash/X. Unsafe to proceed
+			// without knowing whether we're in the race
+			// window
+			return err
+		} else if t, err := v.lastModified(resp); err != nil {
+			// Can't parse timestamp
+			return err
+		} else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
+			// We can't count on "touch trash/X" to
+			// prolong trash/X's lifetime. The new
+			// timestamp might not become visible until
+			// now+raceWindow, and EmptyTrash is allowed
+			// to delete trash/X before then.
+			return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+		}
+	}
+
+	err := v.safeCopy("trash/"+loc, loc)
+	if err != nil {
+		return err
+	}
+	return v.translateError(v.Bucket.Del(loc))
+}
+
+// safeCopy copies src to dst inside the volume's bucket with PutCopy,
+// then inspects the LastModified value in the response. It returns an
+// error if the copy fails, if the returned timestamp cannot be parsed,
+// or if the timestamp is more than maxClockSkew in the past.
+func (v *S3Volume) safeCopy(dst, src string) error {
+	opts := s3.CopyOptions{
+		ContentType:       "application/octet-stream",
+		MetadataDirective: "REPLACE",
+	}
+	resp, err := v.Bucket.PutCopy(dst, s3ACL, opts, v.Bucket.Name+"/"+src)
+	if err = v.translateError(err); err != nil {
+		return err
+	}
+	stamp, parseErr := time.Parse(time.RFC3339Nano, resp.LastModified)
+	if parseErr != nil {
+		return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, parseErr)
+	}
+	if time.Since(stamp) > maxClockSkew {
+		return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, stamp)
+	}
+	return nil
+}
+
+// lastModified reads the Last-Modified header from resp and parses it
+// as RFC1123. If the header is non-empty but is not valid RFC1123, it
+// is parsed again using the nearlyRFC1123 layout.
+func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
+	header := resp.Header.Get("Last-Modified")
+	if t, err = time.Parse(time.RFC1123, header); err == nil || header == "" {
+		// Either a valid HTTP date, or nothing to retry with.
+		return t, err
+	}
+	// AWS example is "Sun, 1 Jan 2006 12:00:00 GMT", which isn't
+	// quite "Sun, 01 Jan 2006 12:00:00 GMT" as required by HTTP
+	// spec. A value that fails strict parsing is probably AWS (or
+	// s3test) giving us a nearly-RFC1123 timestamp.
+	return time.Parse(nearlyRFC1123, header)
+}
 
-// TBD
 func (v *S3Volume) Untrash(loc string) error {
-	return ErrNotImplemented
+	return v.safeCopy(loc, "trash/"+loc)
 }
 
 func (v *S3Volume) Status() *VolumeStatus {
@@ -308,6 +440,52 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 	return s3KeepBlockRegexp.MatchString(s)
 }
 
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block, i.e., copying "trash/"+loc back to
+// loc.
+//
+// It returns true only if the block was copied back. Every failure
+// other than "trash/"+loc not existing is logged, and reported as
+// false.
+func (v *S3Volume) fixRace(loc string) bool {
+	trash, err := v.Bucket.Head("trash/"+loc, nil)
+	if err != nil {
+		// A missing trash/X means there is nothing to recover
+		// from; only other errors are worth logging.
+		if !os.IsNotExist(v.translateError(err)) {
+			log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+		}
+		return false
+	}
+	trashTime, err := v.lastModified(trash)
+	if err != nil {
+		log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+		return false
+	}
+
+	recent, err := v.Bucket.Head("recent/"+loc, nil)
+	if err != nil {
+		log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+		return false
+	}
+	recentTime, err := v.lastModified(recent)
+	if err != nil {
+		log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+		return false
+	}
+
+	ageWhenTrashed := trashTime.Sub(recentTime)
+	if ageWhenTrashed >= blobSignatureTTL {
+		// No evidence of a race: block hasn't been written
+		// since it became eligible for Trash. No fix needed.
+		return false
+	}
+
+	log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+	// The data is restored from trash/X (recent/X is only a
+	// timestamp marker), so that is the source named in the log.
+	log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "trash/"+loc, loc)
+	err = v.safeCopy(loc, "trash/"+loc)
+	if err != nil {
+		log.Printf("error: fixRace: %s", err)
+		return false
+	}
+	return true
+}
+
 func (v *S3Volume) translateError(err error) error {
 	switch err := err.(type) {
 	case *s3.Error:
@@ -325,6 +503,115 @@ func (v *S3Volume) translateError(err error) error {
 
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *S3Volume) EmptyTrash() {
+	// List every trash/X object; the matching recent/X marker is
+	// fetched with one HEAD request per entry.
+	trashL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   "trash/",
+		PageSize: v.indexPageSize,
+	}
+	// Define "ready to delete" as "...when EmptyTrash started".
+	now := time.Now()
+	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+		loc := trash.Key[len("trash/"):]
+		if !v.isKeepBlock(loc) {
+			continue
+		}
+		recent, err := v.Bucket.Head("recent/"+loc, nil)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+			continue
+		}
+		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+			continue
+		}
+		recentT, err := v.lastModified(recent)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+			continue
+		}
+		if trashT.Sub(recentT) < blobSignatureTTL {
+			// The block was touched less than
+			// blobSignatureTTL before it was trashed: let
+			// fixRace decide whether to restore it instead
+			// of deleting it.
+			v.fixRace(loc)
+			continue
+		}
+		if now.Sub(trashT) < trashLifetime {
+			// Not old enough to delete yet.
+			continue
+		}
+		err = v.Bucket.Del(trash.Key)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+			continue
+		}
+		// If no data block X remains, the recent/X marker is
+		// no longer needed. The HEAD error is translated before
+		// the os.IsNotExist check, as at the other HEAD call
+		// sites in this file.
+		_, err = v.Bucket.Head(loc, nil)
+		err = v.translateError(err)
+		if os.IsNotExist(err) {
+			err = v.Bucket.Del("recent/"+loc)
+			if err != nil {
+				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+			}
+		} else if err != nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+		}
+	}
+	if err := trashL.Error(); err != nil {
+		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+	}
+}
+
+// s3Lister walks the keys in Bucket that start with Prefix, one page
+// at a time. PageSize is passed to Bucket.List as the page size.
+//
+// Call First to fetch the first key and Next for each subsequent
+// key. A nil key means either the end of the listing or a failed
+// request; Error tells the two apart.
+//
+// nextMarker holds the marker for the next page (empty when there is
+// none), buf holds the keys of the current page that have not been
+// returned yet, and err holds the most recent listing error.
+type s3Lister struct {
+	Bucket     *s3.Bucket
+	Prefix     string
+	PageSize   int
+	nextMarker string
+	buf        []s3.Key
+	err        error
+}
+
+// First loads the first page of the listing and returns its first
+// key. The result is nil if the page is empty or the request failed.
+func (lister *s3Lister) First() *s3.Key {
+	lister.getPage()
+	first := lister.pop()
+	return first
+}
+
+// Next returns the key after the one most recently returned. When
+// the current page is used up and another page is available, it is
+// fetched first. The result is nil once the listing is exhausted or
+// if a request fails.
+func (lister *s3Lister) Next() *s3.Key {
+	pageExhausted := len(lister.buf) == 0
+	morePages := lister.nextMarker != ""
+	if pageExhausted && morePages {
+		lister.getPage()
+	}
+	return lister.pop()
+}
+
+// Error returns the most recent error encountered by First or Next,
+// or nil if no request has failed.
+func (lister *s3Lister) Error() error {
+	return lister.err
+}
+
+// getPage requests one page from Bucket.List, starting at nextMarker,
+// and replaces buf with the keys it returned. nextMarker is cleared,
+// and set again only if the response says the listing is truncated.
+// On failure the error is stored in err and buf is left as it was.
+func (lister *s3Lister) getPage() {
+	resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
+	lister.nextMarker = ""
+	if err != nil {
+		lister.err = err
+		return
+	}
+	if resp.IsTruncated {
+		lister.nextMarker = resp.NextMarker
+	}
+	page := make([]s3.Key, 0, len(resp.Contents))
+	for _, key := range resp.Contents {
+		if strings.HasPrefix(key.Key, lister.Prefix) {
+			page = append(page, key)
+			continue
+		}
+		// Keys outside the requested prefix are reported and
+		// dropped.
+		log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+	}
+	lister.buf = page
+}
+
+// pop removes the first key from buf and returns a pointer to it, or
+// returns nil if buf is empty.
+func (lister *s3Lister) pop() (k *s3.Key) {
+	if len(lister.buf) == 0 {
+		return nil
+	}
+	k, lister.buf = &lister.buf[0], lister.buf[1:]
+	return k
 }
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 0c2cd49..73c6b76 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"strings"
 	"time"
 
 	"github.com/AdRoll/goamz/aws"
@@ -41,7 +40,7 @@ func init() {
 	s3UnsafeDelete = true
 }
 
-func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3Volume {
+func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
 	clock := &fakeClock{}
 	srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
 	c.Assert(err, check.IsNil)
@@ -59,7 +58,7 @@ func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3
 	c.Assert(err, check.IsNil)
 
 	return &TestableS3Volume{
-		S3Volume:    NewS3Volume(auth, region, TestBucketName, readonly, replication),
+		S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
 		server:      srv,
 		serverClock: clock,
 	}
@@ -73,18 +72,20 @@ type StubbedS3Suite struct {
 
 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
-		return NewTestableS3Volume(c, false, 2)
+		// Use a negative raceWindow so s3test's 1-second
+		// timestamp precision doesn't confuse fixRace.
+		return NewTestableS3Volume(c, -time.Second, false, 2)
 	})
 }
 
 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
-		return NewTestableS3Volume(c, true, 2)
+		return NewTestableS3Volume(c, -time.Second, true, 2)
 	})
 }
 
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
-	v := NewTestableS3Volume(c, false, 2)
+	v := NewTestableS3Volume(c, 0, false, 2)
 	v.indexPageSize = 3
 	for i := 0; i < 256; i++ {
 		v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
@@ -121,9 +122,9 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 	v.serverClock.now = &lastPut
-	err := v.Touch(locator)
-	if err != nil && !strings.Contains(err.Error(), "PutCopy returned old LastModified") {
-		log.Printf("Touch: %+v", err)
+	err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+	if err != nil {
+		panic(err)
 	}
 	v.serverClock.now = nil
 }
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4291c6c..c17c837 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -741,7 +741,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 	defer func() {
-		trashLifetime = 0 * time.Second
+		trashLifetime = 0
 	}()
 
 	trashLifetime = 3600 * time.Second
@@ -830,7 +830,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	// First set: EmptyTrash before reaching the trash deadline.
 
-	trashLifetime = 1 * time.Hour
+	trashLifetime = time.Hour
 
 	v.PutRaw(TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
@@ -880,14 +880,15 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	// Because we Touch'ed, need to backdate again for next set of tests
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-	// Untrash should fail if the only block in the trash has
-	// already been untrashed.
+	// If the only block in the trash has already been untrashed,
+	// most volumes will fail a subsequent Untrash with a 404, but
+	// it's also acceptable for Untrash to succeed.
 	err = v.Untrash(TestHash)
-	if err == nil || !os.IsNotExist(err) {
-		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	if err != nil && !os.IsNotExist(err) {
+		t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
 	}
 
-	// The failed Untrash should not interfere with our
+	// The additional Untrash should not interfere with our
 	// already-untrashed copy.
 	err = checkGet()
 	if err != nil {
@@ -896,7 +897,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	// Second set: EmptyTrash after the trash deadline has passed.
 
-	trashLifetime = 1 * time.Nanosecond
+	trashLifetime = time.Nanosecond
 
 	err = v.Trash(TestHash)
 	if err != nil {
@@ -927,7 +928,6 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	if err == nil || !os.IsNotExist(err) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
-	// EmptryTrash
 	v.EmptyTrash()
 
 	// Untrash won't find it

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list