[ARVADOS] updated: 2905ec29a7a5d2b45acd76c81bb4b6e1af20390b

Git user git at public.curoverse.com
Thu Jul 14 15:10:22 EDT 2016


Summary of changes:
 services/keepstore/s3_volume.go | 33 +++++++++++++++++++++++----------
 1 file changed, 23 insertions(+), 10 deletions(-)

  discards  68b029276cc9edc39003beaa23ab38b9dd04db4f (commit)
       via  2905ec29a7a5d2b45acd76c81bb4b6e1af20390b (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (68b029276cc9edc39003beaa23ab38b9dd04db4f)
            \
             N -- N -- N (2905ec29a7a5d2b45acd76c81bb4b6e1af20390b)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 2905ec29a7a5d2b45acd76c81bb4b6e1af20390b
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jul 14 15:09:38 2016 -0400

    8555: Implement trash with race-recovery for S3 volumes.
    
    See https://dev.arvados.org/projects/arvados/wiki/S3_bucket_volume_implementation

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 98e1203..a519951 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,7 +18,7 @@ import (
 )
 
 var (
-	ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
+	ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
 
 	s3AccessKeyFile string
 	s3SecretKeyFile string
@@ -26,6 +26,7 @@ var (
 	s3Endpoint      string
 	s3Replication   int
 	s3UnsafeDelete  bool
+	s3RaceWindow    time.Duration
 
 	s3ACL = s3.Private
 )
@@ -77,7 +78,7 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
 	if flagSerializeIO {
 		log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
 	}
-	v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
+	v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
 	if err := v.Check(); err != nil {
 		return err
 	}
@@ -116,6 +117,11 @@ func init() {
 		"s3-secret-key-file",
 		"",
 		"File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+	flag.DurationVar(
+		&s3RaceWindow,
+		"s3-race-window",
+		24*time.Hour,
+		"Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
 	flag.IntVar(
 		&s3Replication,
 		"s3-replication",
@@ -130,6 +136,7 @@ func init() {
 
 type S3Volume struct {
 	*s3.Bucket
+	raceWindow    time.Duration
 	readonly      bool
 	replication   int
 	indexPageSize int
@@ -138,12 +145,13 @@ type S3Volume struct {
 // NewS3Volume returns a new S3Volume using the given auth, region,
 // and bucket name. The replication argument specifies the replication
 // level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
+func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
 	return &S3Volume{
 		Bucket: &s3.Bucket{
 			S3:   s3.New(auth, region),
 			Name: bucket,
 		},
+		raceWindow:    raceWindow,
 		readonly:      readonly,
 		replication:   replication,
 		indexPageSize: 1000,
@@ -154,10 +162,35 @@ func (v *S3Volume) Check() error {
 	return nil
 }
 
+func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
+	rdr, err = v.Bucket.GetReader(loc)
+	err = v.translateError(err)
+	if err == nil || !os.IsNotExist(err) {
+		return
+	}
+	_, err = v.Bucket.Head("recent/"+loc, nil)
+	err = v.translateError(err)
+	if err != nil {
+		// If we can't read recent/X, there's no point in
+		// trying fixRace. Give up.
+		return
+	}
+	if !v.fixRace(loc) {
+		err = os.ErrNotExist
+		return
+	}
+	rdr, err = v.Bucket.GetReader(loc)
+	if err != nil {
+		log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+		err = v.translateError(err)
+	}
+	return
+}
+
 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.Bucket.GetReader(loc)
+	rdr, err := v.getReaderWithFixRace(loc)
 	if err != nil {
-		return 0, v.translateError(err)
+		return 0, err
 	}
 	defer rdr.Close()
 	n, err := io.ReadFull(rdr, buf)
@@ -170,9 +203,9 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 }
 
 func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.Bucket.GetReader(loc)
+	rdr, err := v.getReaderWithFixRace(loc)
 	if err != nil {
-		return v.translateError(err)
+		return err
 	}
 	defer rdr.Close()
 	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
@@ -190,71 +223,108 @@ func (v *S3Volume) Put(loc string, block []byte) error {
 		}
 		opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
 	}
-	return v.translateError(
-		v.Bucket.Put(
-			loc, block, "application/octet-stream", s3ACL, opts))
+	err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+	if err != nil {
+		return v.translateError(err)
+	}
+	err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	return v.translateError(err)
 }
 
 func (v *S3Volume) Touch(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
-		ContentType:       "application/octet-stream",
-		MetadataDirective: "REPLACE",
-	}, v.Bucket.Name+"/"+loc)
-	if err != nil {
-		return v.translateError(err)
-	}
-	t, err := time.Parse(time.RFC3339, result.LastModified)
-	if err != nil {
+	_, err := v.Bucket.Head(loc, nil)
+	err = v.translateError(err)
+	if os.IsNotExist(err) && v.fixRace(loc) {
+		// The data object got trashed in a race, but fixRace
+		// rescued it.
+	} else if err != nil {
 		return err
 	}
-	if time.Since(t) > maxClockSkew {
-		return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
-	}
-	return nil
+	err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	return v.translateError(err)
 }
 
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-	resp, err := v.Bucket.Head(loc, nil)
+	_, err := v.Bucket.Head(loc, nil)
 	if err != nil {
 		return zeroTime, v.translateError(err)
 	}
-	hdr := resp.Header.Get("Last-Modified")
-	t, err := time.Parse(time.RFC1123, hdr)
-	if err != nil && hdr != "" {
-		// AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
-		// which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
-		// as required by HTTP spec. If it's not a valid HTTP
-		// header value, it's probably AWS (or s3test) giving
-		// us a nearly-RFC1123 timestamp.
-		t, err = time.Parse(nearlyRFC1123, hdr)
+	resp, err := v.Bucket.Head("recent/"+loc, nil)
+	err = v.translateError(err)
+	if os.IsNotExist(err) {
+		// The data object X exists, but recent/X is missing.
+		err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+		if err != nil {
+			log.Printf("error: creating %q: %s", "recent/"+loc, err)
+			return zeroTime, v.translateError(err)
+		}
+		log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+		resp, err = v.Bucket.Head("recent/"+loc, nil)
+		if err != nil {
+			log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+			return zeroTime, v.translateError(err)
+		}
+	} else if err != nil {
+		// HEAD recent/X failed for some other reason.
+		return zeroTime, err
 	}
-	return t, err
+	return v.lastModified(resp)
 }
 
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
-	nextMarker := ""
-	for {
-		listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
-		if err != nil {
-			return err
+	// Use a merge sort to find matching sets of X and recent/X.
+	dataL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   prefix,
+		PageSize: v.indexPageSize,
+	}
+	recentL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   "recent/"+prefix,
+		PageSize: v.indexPageSize,
+	}
+	for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
+		if data.Key >= "g" {
+			// Conveniently, "recent/*" and "trash/*" are
+			// lexically greater than all hex-encoded data
+			// hashes, so stopping here avoids iterating
+			// over all of them needlessly with dataL.
+			break
 		}
-		for _, key := range listResp.Contents {
-			t, err := time.Parse(time.RFC3339, key.LastModified)
-			if err != nil {
-				return err
-			}
-			if !v.isKeepBlock(key.Key) {
+		if !v.isKeepBlock(data.Key) {
+			continue
+		}
+
+		// stamp is the list entry we should use to report the
+		// last-modified time for this data block: it will be
+		// the recent/X entry if one exists, otherwise the
+		// entry for the data block itself.
+		stamp := data
+
+		// Advance to the corresponding recent/X marker, if any
+		for recent != nil {
+			if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
+				recent = recentL.Next()
 				continue
+			} else if cmp == 0 {
+				stamp = recent
+				recent = recentL.Next()
+				break
+			} else {
+				// recent/X marker is missing: we'll
+				// use the timestamp on the data
+				// object.
+				break
 			}
-			fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.UnixNano())
 		}
-		if !listResp.IsTruncated {
-			break
+		t, err := time.Parse(time.RFC3339, stamp.LastModified)
+		if err != nil {
+			return err
 		}
-		nextMarker = listResp.NextMarker
+		fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
 	}
 	return nil
 }
@@ -264,23 +334,86 @@ func (v *S3Volume) Trash(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	if trashLifetime != 0 {
-		return ErrNotImplemented
-	}
 	if t, err := v.Mtime(loc); err != nil {
 		return err
 	} else if time.Since(t) < blobSignatureTTL {
 		return nil
 	}
-	if !s3UnsafeDelete {
-		return ErrS3DeleteNotAvailable
+	if trashLifetime == 0 {
+		if !s3UnsafeDelete {
+			return ErrS3TrashNotAvailable
+		}
+		return v.Bucket.Del(loc)
+	}
+
+	// Make sure we're not in the race window.
+	{
+		resp, err := v.Bucket.Head("trash/"+loc, nil)
+		err = v.translateError(err)
+		if os.IsNotExist(err) {
+			// OK, trash/X doesn't exist so we're not in
+			// the race window
+		} else if err != nil {
+			// Error looking up trash/X. Unsafe to proceed
+			// without knowing whether we're in the race
+			// window
+			return err
+		} else if t, err := v.lastModified(resp); err != nil {
+			// Can't parse timestamp
+			return err
+		} else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
+			// We can't count on "touch trash/X" to
+			// prolong trash/X's lifetime. The new
+			// timestamp might not become visible until
+			// now+raceWindow, and EmptyTrash is allowed
+			// to delete trash/X before then.
+			log.Printf("raceWindow %s, trashLifetime %s, now %s, t %s", v.raceWindow, trashLifetime, time.Now().UTC().Format(time.RFC3339Nano), t.UTC().Format(time.RFC3339Nano))
+			return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+		}
+	}
+
+	err := v.safeCopy("trash/"+loc, loc)
+	if err != nil {
+		return err
+	}
+	return v.translateError(v.Bucket.Del(loc))
+}
+
+func (v *S3Volume) safeCopy(dst, src string) error {
+	resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+		ContentType:       "application/octet-stream",
+		MetadataDirective: "REPLACE",
+	}, v.Bucket.Name+"/"+src)
+	err = v.translateError(err)
+	if err != nil {
+		return err
+	}
+	if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
+		return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
+	} else if time.Now().Sub(t) > maxClockSkew {
+		return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
 	}
-	return v.Bucket.Del(loc)
+	return nil
+}
+
+// Get the LastModified header from resp, and parse it as RFC1123 or
+// -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
+func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
+	s := resp.Header.Get("Last-Modified")
+	t, err = time.Parse(time.RFC1123, s)
+	if err != nil && s != "" {
+		// AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
+		// which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
+		// as required by HTTP spec. If it's not a valid HTTP
+		// header value, it's probably AWS (or s3test) giving
+		// us a nearly-RFC1123 timestamp.
+		t, err = time.Parse(nearlyRFC1123, s)
+	}
+	return
 }
 
-// TBD
 func (v *S3Volume) Untrash(loc string) error {
-	return ErrNotImplemented
+	return v.safeCopy(loc, "trash/"+loc)
 }
 
 func (v *S3Volume) Status() *VolumeStatus {
@@ -308,6 +441,52 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 	return s3KeepBlockRegexp.MatchString(s)
 }
 
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3Volume) fixRace(loc string) bool {
+	trash, err := v.Bucket.Head("trash/"+loc, nil)
+	if err != nil {
+		if !os.IsNotExist(v.translateError(err)) {
+			log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+		}
+		return false
+	}
+	trashTime, err := v.lastModified(trash)
+	if err != nil {
+		log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+		return false
+	}
+
+	recent, err := v.Bucket.Head("recent/"+loc, nil)
+	if err != nil {
+		log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+		return false
+	}
+	recentTime, err := v.lastModified(recent)
+	if err != nil {
+		log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+		return false
+	}
+
+	ageWhenTrashed := trashTime.Sub(recentTime)
+	if ageWhenTrashed >= blobSignatureTTL {
+		// No evidence of a race: block hasn't been written
+		// since it became eligible for Trash. No fix needed.
+		return false
+	}
+
+	log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+	log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+	err = v.safeCopy(loc, "trash/"+loc)
+	if err != nil {
+		log.Printf("error: fixRace: %s", err)
+		return false
+	}
+	return true
+}
+
 func (v *S3Volume) translateError(err error) error {
 	switch err := err.(type) {
 	case *s3.Error:
@@ -325,6 +504,115 @@ func (v *S3Volume) translateError(err error) error {
 
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *S3Volume) EmptyTrash() {
+	// List every trash/X entry and look up its recent/X marker with a HEAD request.
+	trashL := s3Lister{
+		Bucket:   v.Bucket,
+		Prefix:   "trash/",
+		PageSize: v.indexPageSize,
+	}
+	// Define "ready to delete" as "...when EmptyTrash started".
+	now := time.Now()
+	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+		loc := trash.Key[6:]
+		if !v.isKeepBlock(loc) {
+			continue
+		}
+		recent, err := v.Bucket.Head("recent/"+loc, nil)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
+			continue
+		}
+		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
+		if err != nil {
+			continue
+		}
+		recentT, err := v.lastModified(recent)
+		if err != nil {
+			continue
+		}
+		if trashT.Sub(recentT) < blobSignatureTTL {
+			v.fixRace(loc)
+			continue
+		}
+		if now.Sub(trashT) < trashLifetime {
+			continue
+		}
+		err = v.Bucket.Del(trash.Key)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+			continue
+		}
+		_, err = v.Bucket.Head(loc, nil)
+		if os.IsNotExist(err) {
+			err = v.Bucket.Del("recent/"+loc)
+			if err != nil {
+				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+			}
+		} else if err != nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+		}
+	}
+	if err := trashL.Error(); err != nil {
+		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+	}
+}
+
+type s3Lister struct {
+	Bucket      *s3.Bucket
+	Prefix      string
+	PageSize    int
+	nextMarker  string
+	buf         []s3.Key
+	err         error
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3Lister) First() *s3.Key {
+	lister.getPage()
+	return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3Lister) Next() *s3.Key {
+	if len(lister.buf) == 0 && lister.nextMarker != "" {
+		lister.getPage()
+	}
+	return lister.pop()
+}
+
+// Return the most recent error encountered by First or Next.
+func (lister *s3Lister) Error() error {
+	return lister.err
+}
+
+func (lister *s3Lister) getPage() {
+	resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
+	lister.nextMarker = ""
+	if err != nil {
+		lister.err = err
+		return
+	}
+	if resp.IsTruncated {
+		lister.nextMarker = resp.NextMarker
+	}
+	lister.buf = make([]s3.Key, 0, len(resp.Contents))
+	for _, key := range resp.Contents {
+		if !strings.HasPrefix(key.Key, lister.Prefix) {
+			log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+			continue
+		}
+		lister.buf = append(lister.buf, key)
+	}
+}
+
+func (lister *s3Lister) pop() (k *s3.Key) {
+	if len(lister.buf) > 0 {
+		k = &lister.buf[0]
+		lister.buf = lister.buf[1:]
+	}
+	return
 }
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 0c2cd49..cc07092 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"fmt"
 	"log"
-	"strings"
 	"time"
 
 	"github.com/AdRoll/goamz/aws"
@@ -41,7 +40,7 @@ func init() {
 	s3UnsafeDelete = true
 }
 
-func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3Volume {
+func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
 	clock := &fakeClock{}
 	srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
 	c.Assert(err, check.IsNil)
@@ -59,7 +58,7 @@ func NewTestableS3Volume(c *check.C, readonly bool, replication int) *TestableS3
 	c.Assert(err, check.IsNil)
 
 	return &TestableS3Volume{
-		S3Volume:    NewS3Volume(auth, region, TestBucketName, readonly, replication),
+		S3Volume:    NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
 		server:      srv,
 		serverClock: clock,
 	}
@@ -73,18 +72,20 @@ type StubbedS3Suite struct {
 
 func (s *StubbedS3Suite) TestGeneric(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
-		return NewTestableS3Volume(c, false, 2)
+		// Use a negative raceWindow so s3test's 1-second
+		// timestamp precision doesn't confuse fixRace.
+		return NewTestableS3Volume(c, -time.Second, false, 2)
 	})
 }
 
 func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
 	DoGenericVolumeTests(c, func(t TB) TestableVolume {
-		return NewTestableS3Volume(c, true, 2)
+		return NewTestableS3Volume(c, -time.Second, true, 2)
 	})
 }
 
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
-	v := NewTestableS3Volume(c, false, 2)
+	v := NewTestableS3Volume(c, 0, false, 2)
 	v.indexPageSize = 3
 	for i := 0; i < 256; i++ {
 		v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
@@ -121,9 +122,10 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 	v.serverClock.now = &lastPut
-	err := v.Touch(locator)
-	if err != nil && !strings.Contains(err.Error(), "PutCopy returned old LastModified") {
-		log.Printf("Touch: %+v", err)
+	log.Printf("TouchWithDate(%q, %q)", locator, v.serverClock.now)
+	err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+	if err != nil {
+		panic(err)
 	}
 	v.serverClock.now = nil
 }
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4291c6c..c17c837 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -741,7 +741,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 	defer func() {
-		trashLifetime = 0 * time.Second
+		trashLifetime = 0
 	}()
 
 	trashLifetime = 3600 * time.Second
@@ -830,7 +830,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	// First set: EmptyTrash before reaching the trash deadline.
 
-	trashLifetime = 1 * time.Hour
+	trashLifetime = time.Hour
 
 	v.PutRaw(TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
@@ -880,14 +880,15 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	// Because we Touch'ed, need to backdate again for next set of tests
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-	// Untrash should fail if the only block in the trash has
-	// already been untrashed.
+	// If the only block in the trash has already been untrashed,
+	// most volumes will fail a subsequent Untrash with a 404, but
+	// it's also acceptable for Untrash to succeed.
 	err = v.Untrash(TestHash)
-	if err == nil || !os.IsNotExist(err) {
-		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	if err != nil && !os.IsNotExist(err) {
+		t.Fatalf("Expected success or os.IsNotExist(), but got: %v", err)
 	}
 
-	// The failed Untrash should not interfere with our
+	// The additional Untrash should not interfere with our
 	// already-untrashed copy.
 	err = checkGet()
 	if err != nil {
@@ -896,7 +897,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	// Second set: EmptyTrash after the trash deadline has passed.
 
-	trashLifetime = 1 * time.Nanosecond
+	trashLifetime = time.Nanosecond
 
 	err = v.Trash(TestHash)
 	if err != nil {
@@ -927,7 +928,6 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	if err == nil || !os.IsNotExist(err) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
-	// EmptryTrash
 	v.EmptyTrash()
 
 	// Untrash won't find it

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list