[ARVADOS] updated: 90c663a9f4272db22532a07a6488a2ec4061284c
Git user
git at public.curoverse.com
Wed Jul 20 11:57:08 EDT 2016
Summary of changes:
 services/keep-balance/balance_run_test.go |  2 +-
 services/keepstore/s3_volume.go           | 50 +++++++++++++---
 services/keepstore/s3_volume_test.go      | 94 +++++++++++++++++++++++--------
 services/keepstore/volume_generic_test.go |  3 +
 services/keepstore/volume_unix.go         | 13 +++--
 5 files changed, 123 insertions(+), 39 deletions(-)
  discards  bde7e79b31d4265793e8617882bf411056a05c58 (commit)
       via  90c663a9f4272db22532a07a6488a2ec4061284c (commit)
       via  75ebd82ac561e65fa7b9d84d1d85d9ac92facdc5 (commit)
       via  9738140a6c95ac94d82fb2715cbd33da332406e8 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (bde7e79b31d4265793e8617882bf411056a05c58)
            \
             N -- N -- N (90c663a9f4272db22532a07a6488a2ec4061284c)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 90c663a9f4272db22532a07a6488a2ec4061284c
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:56:59 2016 -0400
8555: golint
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 9b7b51a..5b00d4a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,6 +18,8 @@ import (
)
var (
+ // Returned by Trash if that operation is impossible with the
+ // current config.
ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
s3AccessKeyFile string
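The new comment and the wording of ErrS3TrashDisabled describe a configuration guard: with -trash-lifetime=0 there is no trash/recovery window, so trashing is refused unless -s3-unsafe-delete opts in to immediate deletion. A minimal sketch of such a guard, reusing the declarations above and the file's existing time import (the flag variable names here are assumptions, and this is not the committed Trash body):

    // trashAllowed refuses to trash when the configuration makes a
    // trash operation equivalent to an unrecoverable delete, unless
    // unsafe deletes are explicitly enabled.
    func trashAllowed(trashLifetime time.Duration, s3UnsafeDelete bool) error {
            if trashLifetime == 0 && !s3UnsafeDelete {
                    return ErrS3TrashDisabled
            }
            return nil
    }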
@@ -134,6 +136,7 @@ func init() {
"EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
}
+// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
*s3.Bucket
raceWindow time.Duration
@@ -158,6 +161,8 @@ func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow tim
}
}
+// Check returns an error if the volume is inaccessible (e.g., config
+// error).
func (v *S3Volume) Check() error {
return nil
}
@@ -192,6 +197,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
return
}
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
rdr, err := v.getReader(loc)
if err != nil {
@@ -207,6 +214,7 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
}
}
+// Compare the given data with the stored data.
func (v *S3Volume) Compare(loc string, expect []byte) error {
rdr, err := v.getReader(loc)
if err != nil {
@@ -216,6 +224,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
}
+// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
if v.readonly {
return MethodDisabledError
@@ -236,6 +245,7 @@ func (v *S3Volume) Put(loc string, block []byte) error {
return v.translateError(err)
}
+// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
@@ -252,6 +262,7 @@ func (v *S3Volume) Touch(loc string) error {
return v.translateError(err)
}
+// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
_, err := v.Bucket.Head(loc, nil)
if err != nil {
@@ -279,6 +290,8 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
return v.lastModified(resp)
}
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
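The "merge sort" comment refers to walking two lexicographically sorted listings -- the data keys X and their recent/X companions -- in lockstep, the way a merge join pairs two sorted inputs. A standalone sketch of that pairing over plain sorted string slices (illustration only; the committed code streams listing pages through s3Lister rather than holding slices in memory):

    // mergePairs walks two sorted key lists in lockstep and reports,
    // for each data key, whether a matching "recent/" key exists.
    // Both inputs are assumed sorted ascending, as S3 listings are.
    func mergePairs(dataKeys, recentKeys []string) map[string]bool {
            out := make(map[string]bool)
            i := 0
            for _, data := range dataKeys {
                    // Skip recent/ entries that sort before "recent/"+data.
                    for i < len(recentKeys) && recentKeys[i] < "recent/"+data {
                            i++
                    }
                    out[data] = i < len(recentKeys) && recentKeys[i] == "recent/"+data
            }
            return out
    }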
@@ -393,6 +406,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
return nil
}
+// safeCopy calls PutCopy, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+// (PutCopy returns 200 OK if the request was received, even if the
+// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
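The pitfall the new comment documents is that an S3 PUT-copy request can return 200 OK even when the copy itself fails or leaves the destination's timestamp unchanged, so the response body has to be inspected. A standalone sketch of that verification step, reusing the file's fmt and time imports (the copyResult type and its LastModified field are hypothetical stand-ins, not the goamz API the committed code uses):

    // copyResult is a hypothetical stand-in for a PUT-copy response body.
    type copyResult struct {
            LastModified string // e.g. "2016-07-20T15:57:08Z"
    }

    // verifyCopy treats a nil error as "request accepted", not "copy
    // succeeded": it also requires a parseable, recent LastModified
    // timestamp on the destination before reporting success.
    func verifyCopy(res *copyResult, err error, dst string) error {
            if err != nil {
                    return err
            }
            t, err := time.Parse(time.RFC3339, res.LastModified)
            if err != nil {
                    return fmt.Errorf("PutCopy to %q: bad LastModified %q: %s", dst, res.LastModified, err)
            }
            // Allow a minute of clock skew (an arbitrary allowance for
            // this sketch).
            if time.Since(t) > time.Minute {
                    return fmt.Errorf("PutCopy to %q: destination timestamp %s was not updated", dst, t)
            }
            return nil
    }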
@@ -426,6 +443,7 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
return
}
+// Untrash moves block from trash back into store
func (v *S3Volume) Untrash(loc string) error {
err := v.safeCopy(loc, "trash/"+loc)
if err != nil {
@@ -435,6 +453,9 @@ func (v *S3Volume) Untrash(loc string) error {
return v.translateError(err)
}
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
func (v *S3Volume) Status() *VolumeStatus {
return &VolumeStatus{
DeviceNum: 1,
@@ -443,13 +464,19 @@ func (v *S3Volume) Status() *VolumeStatus {
}
}
+// String implements fmt.Stringer.
func (v *S3Volume) String() string {
return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
}
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
func (v *S3Volume) Writable() bool {
return !v.readonly
}
+
+// Replication returns the storage redundancy of the underlying
+// device. Configured via command line flag.
func (v *S3Volume) Replication() int {
return v.replication
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 90189dc..5982fb0 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -306,7 +306,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
- var lastErr error = nil
+ var lastErr error
rootdir, err := os.Open(v.root)
if err != nil {
return err
@@ -397,10 +397,8 @@ func (v *UnixVolume) Trash(loc string) error {
// anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
- } else {
- if time.Since(fi.ModTime()) < blobSignatureTTL {
- return nil
- }
+ } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+ return nil
}
if trashLifetime == 0 {
@@ -506,11 +504,14 @@ func (v *UnixVolume) String() string {
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
func (v *UnixVolume) Writable() bool {
return !v.readonly
}
+// Replication returns the number of replicas promised by the
+// underlying device (currently assumed to be 1).
func (v *UnixVolume) Replication() int {
return 1
}
commit 75ebd82ac561e65fa7b9d84d1d85d9ac92facdc5
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:42:14 2016 -0400
8555: gofmt
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index edc88aa..23d74fe 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -371,5 +371,5 @@ func (s *runSuite) TestRunForever(c *check.C) {
}
stop <- true
c.Check(pullReqs.Count() >= 16, check.Equals, true)
- c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
+ c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 5eb9941..9b7b51a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -288,7 +288,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
}
recentL := s3Lister{
Bucket: v.Bucket,
- Prefix: "recent/"+prefix,
+ Prefix: "recent/" + prefix,
PageSize: v.indexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
@@ -566,7 +566,7 @@ func (v *S3Volume) EmptyTrash() {
}
_, err = v.Bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/"+loc)
+ err = v.Bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
@@ -580,12 +580,12 @@ func (v *S3Volume) EmptyTrash() {
}
type s3Lister struct {
- Bucket *s3.Bucket
- Prefix string
- PageSize int
- nextMarker string
- buf []s3.Key
- err error
+ Bucket *s3.Bucket
+ Prefix string
+ PageSize int
+ nextMarker string
+ buf []s3.Key
+ err error
}
// First fetches the first page and returns the first item. It returns
commit 9738140a6c95ac94d82fb2715cbd33da332406e8
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jul 20 11:40:41 2016 -0400
8555: Test various backend states. Update recent/X timestamp during Untrash.
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index a0a6860..5eb9941 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -427,7 +427,12 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
}
func (v *S3Volume) Untrash(loc string) error {
- return v.safeCopy(loc, "trash/"+loc)
+ err := v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ return err
+ }
+ err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ return v.translateError(err)
}
func (v *S3Volume) Status() *VolumeStatus {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 73c6b76..a40b84f 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,8 +2,10 @@ package main
import (
"bytes"
+ "crypto/md5"
"fmt"
"log"
+ "os"
"time"
"github.com/AdRoll/goamz/aws"
@@ -74,13 +76,13 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return NewTestableS3Volume(c, -time.Second, false, 2)
+ return NewTestableS3Volume(c, -2*time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, -time.Second, true, 2)
+ return NewTestableS3Volume(c, -2*time.Second, true, 2)
})
}
@@ -109,6 +111,143 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
}
}
+func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
+ defer func(tl, bs time.Duration) {
+ trashLifetime = tl
+ blobSignatureTTL = bs
+ }(trashLifetime, blobSignatureTTL)
+ trashLifetime = time.Hour
+ blobSignatureTTL = time.Hour
+
+ v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+ var none time.Time
+
+ stubKey := func(t time.Time, key string, data []byte) {
+ if t == none {
+ return
+ }
+ v.serverClock.now = &t
+ v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ }
+
+ t0 := time.Now()
+ nextKey := 0
+ for _, test := range []struct {
+ label string
+ data time.Time
+ recent time.Time
+ trash time.Time
+ canGet bool
+ canTrash bool
+ canGetAfterTrash bool
+ canUntrash bool
+ haveTrashAfterEmpty bool
+ }{
+ {
+ "No related objects",
+ none, none, none,
+ false, false, false, false, false},
+ {
+ // Stored by older version, or there was a
+ // race between EmptyTrash and Put: Trash is a
+ // no-op even though the data object is very
+ // old
+ "No recent/X",
+ t0.Add(-48 * time.Hour), none, none,
+ true, true, true, false, false},
+ {
+ "Not trash; old enough to trash",
+ t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+ true, true, false, false, false},
+ {
+ "Not trash; not old enough to trash",
+ t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+ true, true, true, false, false},
+ {
+ "Trash + not-trash: recent race between Trash and Put",
+ t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+ true, true, true, true, true},
+ {
+ "Trash + not-trash, nearly eligible for deletion, prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ true, false, true, true, true},
+ {
+ "Trash + not-trash, eligible for deletion, prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+ true, false, true, true, false},
+ // FIXME: old trash never gets deleted!
+ // {
+ // "Not trash; old race between Trash and Put, or incomplete Trash",
+ // t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+ // true, false, true, true, false},
+ {
+ "Trash operation was interrupted",
+ t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, false},
+ {
+ "Trash, not yet eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+ false, false, false, true, true},
+ {
+ "Trash, not yet eligible for deletion, prone to races",
+ none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ false, false, false, true, true},
+ {
+ "Trash, eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+ false, false, false, true, false},
+ {
+ "Erroneously trashed during a race, detected before trashLifetime",
+ none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+ true, false, true, true, true},
+ {
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+ none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+ true, false, true, true, true},
+ } {
+ c.Log("Scenario: ", test.label)
+ var loc string
+ var blk []byte
+
+ setup := func() {
+ nextKey++
+ blk = []byte(fmt.Sprintf("%d", nextKey))
+ loc = fmt.Sprintf("%x", md5.Sum(blk))
+ c.Log("\t", loc)
+ stubKey(test.data, loc, blk)
+ stubKey(test.recent, "recent/"+loc, nil)
+ stubKey(test.trash, "trash/"+loc, blk)
+ v.serverClock.now = &t0
+ }
+
+ setup()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(loc, buf)
+ c.Check(err == nil, check.Equals, test.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ setup()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, test.canTrash)
+ _, err = v.Get(loc, buf)
+ c.Check(err == nil, check.Equals, test.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ setup()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, test.canUntrash)
+
+ setup()
+ v.EmptyTrash()
+ _, err = v.Bucket.Head("trash/"+loc, nil)
+ c.Check(err == nil, check.Equals, test.haveTrashAfterEmpty)
+ }
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index c17c837..bc3e537 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -895,6 +895,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
t.Fatal(err)
}
+ // Untrash might have updated the timestamp, so backdate again
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
// Second set: EmptyTrash after the trash deadline has passed.
trashLifetime = time.Nanosecond
-----------------------------------------------------------------------
hooks/post-receive
--