[ARVADOS] created: 2.1.0-1381-g9bbd276ee
From: Git user <git at public.arvados.org>
Date: Mon Sep 20 21:47:21 UTC 2021
at 9bbd276ee93883669b5872e9afed425dab2e79f1 (commit)
commit 9bbd276ee93883669b5872e9afed425dab2e79f1
Author: Tom Clegg <tom at curii.com>
Date: Mon Sep 20 17:47:07 2021 -0400
17749: Document PrefixLength config.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index 6a3c396d6..e6b1e095e 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -67,6 +67,23 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
# Use the AWS S3 v2 Go driver instead of the goamz driver.
UseAWSS3v2Driver: false
+ # By default keepstore stores data using the MD5 checksum
+ # (32 hexadecimal characters) as the object name, e.g.,
+ # "0123456abc...". Setting PrefixLength to 3 changes this
+ # naming scheme to "012/0123456abc...". This can improve
+ # performance, depending on the S3 service being used. For
+ # example, PrefixLength 3 is recommended to avoid AWS
+ # limitations on the number of read/write operations per
+ # second per prefix (see
+ # https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/).
+ #
+ # Note that changing PrefixLength on an existing volume is
+ # not currently supported. Once you have started using a
+ # bucket as an Arvados volume, you should not change its
+ # configured PrefixLength, or configure another volume using
+ # the same bucket and a different PrefixLength.
+ PrefixLength: 0
+
# Requested page size for "list bucket contents" requests.
IndexPageSize: 1000
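For context, here is a minimal sketch of where this option sits in a keepstore volume definition (the cluster ID, volume name, and bucket below are hypothetical; only PrefixLength is the new setting). With 3 hexadecimal characters, blocks spread across 16^3 = 4096 distinct key prefixes:

    Volumes:
      zzzzz-nyw5e-000000000000000:
        Driver: S3
        DriverParameters:
          Bucket: example-bucket
          PrefixLength: 3
        Replication: 2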
commit 587977ae5cf96da6eae6cdcb47b7af63ec73c57e
Author: Tom Clegg <tom at curii.com>
Date: Mon Sep 20 17:46:46 2021 -0400
17749: Support PrefixLength in old S3 driver.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index fdc2ed56a..4c43b3f46 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -281,17 +281,17 @@ func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
return ttl, nil
}
-func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
ready := make(chan bool)
go func() {
- rdr, err = v.getReader(loc)
+ rdr, err = v.getReader(key)
close(ready)
}()
select {
case <-ready:
return
case <-ctx.Done():
- v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
+ v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err())
go func() {
<-ready
if err == nil {
@@ -307,28 +307,28 @@ func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io
// In situations where (Bucket)GetReader would fail because the block
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
-func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.bucket.GetReader(loc)
+func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) {
+ rdr, err = v.bucket.GetReader(key)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
return
}
- if !v.fixRace(loc) {
+ if !v.fixRace(key) {
err = os.ErrNotExist
return
}
- rdr, err = v.bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(key)
if err != nil {
- v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", key, err)
err = v.translateError(err)
}
return
@@ -337,7 +337,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- rdr, err := v.getReaderWithContext(ctx, loc)
+ key := v.key(loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return 0, err
}
@@ -373,9 +374,10 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
// Compare the given data with the stored data.
func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ key := v.key(loc)
errChan := make(chan error, 1)
go func() {
- _, err := v.bucket.Head("recent/"+loc, nil)
+ _, err := v.bucket.Head("recent/"+key, nil)
errChan <- err
}()
var err error
@@ -407,7 +409,7 @@ func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error
// problem on to our clients.
return v.translateError(err)
}
- rdr, err := v.getReaderWithContext(ctx, loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return err
}
@@ -438,6 +440,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
}
+ key := v.key(loc)
+
// Send the block data through a pipe, so that (if we need to)
// we can close the pipe early and abandon our PutReader()
// goroutine, without worrying about PutReader() accessing our
@@ -457,11 +461,11 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
}
}()
defer close(ready)
- err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts)
if err != nil {
return
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
}()
select {
case <-ctx.Done():
@@ -487,37 +491,39 @@ func (v *S3Volume) Touch(loc string) error {
if v.volume.ReadOnly {
return MethodDisabledError
}
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
err = v.translateError(err)
- if os.IsNotExist(err) && v.fixRace(loc) {
+ if os.IsNotExist(err) && v.fixRace(key) {
// The data object got trashed in a race, but fixRace
// rescued it.
} else if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
- v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.bucket.Head("recent/"+loc, nil)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key)
+ resp, err = v.bucket.Head("recent/"+key, nil)
if err != nil {
- v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
} else if err != nil {
@@ -534,14 +540,14 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
dataL := s3Lister{
Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: prefix,
+ Prefix: v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: "recent/" + prefix,
+ Prefix: "recent/" + v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
@@ -553,7 +559,8 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// over all of them needlessly with dataL.
break
}
- if !v.isKeepBlock(data.Key) {
+ loc, isBlk := v.isKeepBlock(data.Key)
+ if !isBlk {
continue
}
@@ -589,7 +596,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// We truncate sub-second precision here. Otherwise
// timestamps will never match the RFC1123-formatted
// Last-Modified values parsed by Mtime().
- fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.Unix()*1000000000)
+ fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000)
}
return dataL.Error()
}
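Net effect of the IndexTo change above: the lister walks prefixed keys such as "012/0123456abc...", but each emitted line reports the decoded locator, so index output keeps the bare locator form regardless of PrefixLength. A hypothetical index line (locator, size, and timestamp are made up) would read:

    0123456789abcdef0123456789abcdef+3 1632174000000000000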
@@ -604,27 +611,28 @@ func (v *S3Volume) Trash(loc string) error {
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
return nil
}
+ key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
- err := v.checkRaceWindow(loc)
+ err := v.checkRaceWindow(key)
if err != nil {
return err
}
- err = v.safeCopy("trash/"+loc, loc)
+ err = v.safeCopy("trash/"+key, key)
if err != nil {
return err
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.bucket.Head("trash/"+loc, nil)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3Volume) checkRaceWindow(key string) error {
+ resp, err := v.bucket.Head("trash/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
@@ -646,7 +654,7 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
// trash/X's lifetime. The new timestamp might not
// become visible until now+raceWindow, and EmptyTrash
// is allowed to delete trash/X before then.
- return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
}
// trash/X exists, but it won't be eligible for deletion until
// after now+raceWindow, so it's safe to overwrite it.
@@ -694,11 +702,12 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
// Untrash moves block from trash back into store
func (v *S3Volume) Untrash(loc string) error {
- err := v.safeCopy(loc, "trash/"+loc)
+ key := v.key(loc)
+ err := v.safeCopy(key, "trash/"+key)
if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
@@ -725,19 +734,33 @@ func (v *S3Volume) String() string {
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *S3Volume) isKeepBlock(s string) bool {
- return s3KeepBlockRegexp.MatchString(s)
+func (v *S3Volume) isKeepBlock(s string) (string, bool) {
+ if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+ s = s[v.PrefixLength+1:]
+ }
+ return s, s3KeepBlockRegexp.MatchString(s)
+}
+
+// Return the key used for a given loc. If PrefixLength==0 then
+// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
+// "abc/abcdef0123", etc.
+func (v *S3Volume) key(loc string) string {
+ if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+ return loc[:v.PrefixLength] + "/" + loc
+ } else {
+ return loc
+ }
}
// fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.bucket.Head("trash/"+loc, nil)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3Volume) fixRace(key string) bool {
+ trash, err := v.bucket.Head("trash/"+key, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
}
return false
}
@@ -747,9 +770,9 @@ func (v *S3Volume) fixRace(loc string) bool {
return false
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
return false
}
recentTime, err := v.lastModified(recent)
@@ -765,9 +788,9 @@ func (v *S3Volume) fixRace(loc string) bool {
return false
}
- v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
- err = v.safeCopy(loc, "trash/"+loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+ err = v.safeCopy(key, "trash/"+key)
if err != nil {
v.logger.WithError(err).Error("fixRace: copy failed")
return false
@@ -803,8 +826,9 @@ func (v *S3Volume) EmptyTrash() {
startT := time.Now()
emptyOneKey := func(trash *s3.Key) {
- loc := trash.Key[6:]
- if !v.isKeepBlock(loc) {
+ key := trash.Key[6:]
+ loc, isBlk := v.isKeepBlock(key)
+ if !isBlk {
return
}
atomic.AddInt64(&bytesInTrash, trash.Size)
@@ -815,7 +839,7 @@ func (v *S3Volume) EmptyTrash() {
v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
@@ -824,12 +848,12 @@ func (v *S3Volume) EmptyTrash() {
}
return
} else if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -843,14 +867,14 @@ func (v *S3Volume) EmptyTrash() {
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
- v.fixRace(loc)
+ v.fixRace(key)
v.Touch(loc)
return
}
- _, err := v.bucket.Head(loc, nil)
+ _, err := v.bucket.Head(key, nil)
if os.IsNotExist(err) {
v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
- v.fixRace(loc)
+ v.fixRace(key)
return
} else if err != nil {
v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
@@ -868,18 +892,18 @@ func (v *S3Volume) EmptyTrash() {
atomic.AddInt64(&bytesDeleted, trash.Size)
atomic.AddInt64(&blocksDeleted, 1)
- _, err = v.bucket.Head(loc, nil)
+ _, err = v.bucket.Head(key, nil)
if err == nil {
- v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key)
return
}
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
return
}
- err = v.bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + key)
if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
}
}
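Taken together, the key() and isKeepBlock() functions added above (mirrored in the new-driver commit below) form an encode/decode round trip between locators and object keys. A standalone sketch of the same logic, using illustrative function names that are not part of the patch:

    package main

    import (
    	"fmt"
    	"regexp"
    )

    var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)

    // keyFor mirrors (*S3Volume)key: with a positive prefix length,
    // prepend the first prefixLen characters of the locator plus "/".
    func keyFor(loc string, prefixLen int) string {
    	if prefixLen > 0 && prefixLen < len(loc)-1 {
    		return loc[:prefixLen] + "/" + loc
    	}
    	return loc
    }

    // locFor mirrors (*S3Volume)isKeepBlock: strip a self-consistent
    // prefix, then check the remainder is a 32-digit hex locator.
    func locFor(key string, prefixLen int) (string, bool) {
    	if prefixLen > 0 && len(key) == prefixLen+33 &&
    		key[:prefixLen] == key[prefixLen+1:prefixLen*2+1] {
    		key = key[prefixLen+1:]
    	}
    	return key, keepBlockRegexp.MatchString(key)
    }

    func main() {
    	loc := "0123456789abcdef0123456789abcdef"
    	key := keyFor(loc, 3) // "012/0123456789abcdef0123456789abcdef"
    	back, ok := locFor(key, 3)
    	fmt.Println(key, back == loc, ok) // the round trip recovers the locator
    }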
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index cb08e44ac..caf460571 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -76,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
})
}
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ v.PrefixLength = 3
+ return v
+ })
+}
+
func (s *StubbedS3Suite) TestIndex(c *check.C) {
v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
v.IndexPageSize = 3
@@ -547,13 +555,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+ key := v.key(loc)
+ err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
v.logger.Printf("PutRaw: %s: %+v", loc, err)
}
- err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
}
}
@@ -562,7 +571,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
panic(err)
}
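With PrefixLength 3, the auxiliary marker objects follow the data object's prefixed key, so a single stored block is represented by up to three object names (the locator below is illustrative):

    012/0123456789abcdef0123456789abcdef          (block data)
    recent/012/0123456789abcdef0123456789abcdef   (last-write timestamp marker)
    trash/012/0123456789abcdef0123456789abcdef    (present only while trashed)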
commit 669e81ca39e6bccf915b93cfbee66772a7a9e1ec
Author: Tom Clegg <tom at curii.com>
Date: Mon Sep 20 17:46:43 2021 -0400
17749: Support PrefixLength in new S3 driver.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 4bcffd909..5bd575b12 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1297,6 +1297,7 @@ Clusters:
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ PrefixLength: 0
# Use aws-s3-go (v2) instead of goamz
UseAWSS3v2Driver: false
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index fd07592e9..4e24a5c7f 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1303,6 +1303,7 @@ Clusters:
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ PrefixLength: 0
# Use aws-s3-go (v2) instead of goamz
UseAWSS3v2Driver: false
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index ad9a4da03..f1d27b8dc 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -311,6 +311,7 @@ type S3VolumeDriverParameters struct {
ReadTimeout Duration
RaceWindow Duration
UnsafeDelete bool
+ PrefixLength int
}
type AzureVolumeDriverParameters struct {
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index baded4116..ba0c05749 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -83,8 +83,22 @@ const (
var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
var s3AWSZeroTime time.Time
-func (v *S3AWSVolume) isKeepBlock(s string) bool {
- return s3AWSKeepBlockRegexp.MatchString(s)
+func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+ if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+ s = s[v.PrefixLength+1:]
+ }
+ return s, s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+// Return the key used for a given loc. If PrefixLength==0 then
+// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
+// "abc/abcdef0123", etc.
+func (v *S3AWSVolume) key(loc string) string {
+ if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+ return loc[:v.PrefixLength] + "/" + loc
+ } else {
+ return loc
+ }
}
func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
@@ -109,11 +123,12 @@ func (v *S3AWSVolume) translateError(err error) error {
return err
}
-// safeCopy calls CopyObjectRequest, and checks the response to make sure the
-// copy succeeded and updated the timestamp on the destination object
+// safeCopy calls CopyObjectRequest, and checks the response to make
+// sure the copy succeeded and updated the timestamp on the
+// destination object
//
-// (If something goes wrong during the copy, the error will be embedded in the
-// 200 OK response)
+// (If something goes wrong during the copy, the error will be
+// embedded in the 200 OK response)
func (v *S3AWSVolume) safeCopy(dst, src string) error {
input := &s3.CopyObjectInput{
Bucket: aws.String(v.bucket.bucket),
@@ -222,9 +237,10 @@ func (v *S3AWSVolume) GetDeviceID() string {
// Compare the given data with the stored data.
func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+ key := v.key(loc)
errChan := make(chan error, 1)
go func() {
- _, err := v.Head("recent/" + loc)
+ _, err := v.head("recent/" + key)
errChan <- err
}()
var err error
@@ -234,8 +250,8 @@ func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) er
case err = <-errChan:
}
if err != nil {
- // Checking for "loc" itself here would interfere with
- // future GET requests.
+ // Checking for the key itself here would interfere
+ // with future GET requests.
//
// On AWS, if X doesn't exist, a HEAD or GET request
// for X causes X's non-existence to be cached. Thus,
@@ -259,7 +275,7 @@ func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) er
input := &s3.GetObjectInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key),
}
req := v.bucket.svc.GetObjectRequest(input)
@@ -283,29 +299,30 @@ func (v *S3AWSVolume) EmptyTrash() {
startT := time.Now()
emptyOneKey := func(trash *s3.Object) {
- loc := strings.TrimPrefix(*trash.Key, "trash/")
- if !v.isKeepBlock(loc) {
+ key := strings.TrimPrefix(*trash.Key, "trash/")
+ loc, isblk := v.isKeepBlock(key)
+ if !isblk {
return
}
atomic.AddInt64(&bytesInTrash, *trash.Size)
atomic.AddInt64(&blocksInTrash, 1)
trashT := *(trash.LastModified)
- recent, err := v.Head("recent/" + loc)
+ recent, err := v.head("recent/" + key)
if err != nil && os.IsNotExist(v.translateError(err)) {
- v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+key, err)
err = v.Untrash(loc)
if err != nil {
v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
}
return
} else if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
return
}
if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
- // recent/loc is too old to protect
+ // recent/key is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
// delete trash/X now.
@@ -314,14 +331,14 @@ func (v *S3AWSVolume) EmptyTrash() {
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
- v.fixRace(loc)
+ v.fixRace(key)
v.Touch(loc)
return
}
- _, err := v.Head(loc)
+ _, err := v.head(key)
if os.IsNotExist(err) {
v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
- v.fixRace(loc)
+ v.fixRace(key)
return
} else if err != nil {
v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
@@ -339,18 +356,18 @@ func (v *S3AWSVolume) EmptyTrash() {
atomic.AddInt64(&bytesDeleted, *trash.Size)
atomic.AddInt64(&blocksDeleted, 1)
- _, err = v.Head(loc)
+ _, err = v.head(key)
if err == nil {
v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
return
}
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
return
}
- err = v.bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + key)
if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
}
}
@@ -386,21 +403,21 @@ func (v *S3AWSVolume) EmptyTrash() {
}
// fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3AWSVolume) fixRace(loc string) bool {
- trash, err := v.Head("trash/" + loc)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3AWSVolume) fixRace(key string) bool {
+ trash, err := v.head("trash/" + key)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
}
return false
}
- recent, err := v.Head("recent/" + loc)
+ recent, err := v.head("recent/" + key)
if err != nil {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
return false
}
@@ -413,9 +430,9 @@ func (v *S3AWSVolume) fixRace(loc string) bool {
return false
}
- v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
- err = v.safeCopy(loc, "trash/"+loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+ err = v.safeCopy(key, "trash/"+key)
if err != nil {
v.logger.WithError(err).Error("fixRace: copy failed")
return false
@@ -424,9 +441,13 @@ func (v *S3AWSVolume) fixRace(loc string) bool {
}
func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+ return v.head(v.key(loc))
+}
+
+func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
input := &s3.HeadObjectInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key),
}
req := v.bucket.svc.HeadObjectRequest(input)
@@ -449,7 +470,7 @@ func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, err
return getWithPipe(ctx, loc, buf, v)
}
-func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3AWSVolume) readWorker(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
buf := make([]byte, 0, 67108864)
awsBuf := aws.NewWriteAtBuffer(buf)
@@ -462,7 +483,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCl
_, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key),
})
v.bucket.stats.TickOps("get")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
@@ -478,7 +499,8 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCl
// ReadBlock implements BlockReader.
func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
- rdr, err := v.readWorker(ctx, loc)
+ key := v.key(loc)
+ rdr, err := v.readWorker(ctx, key)
if err == nil {
_, err2 := io.Copy(w, rdr)
@@ -493,19 +515,19 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
return err
}
- _, err = v.Head("recent/" + loc)
+ _, err = v.head("recent/" + key)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
return err
}
- if !v.fixRace(loc) {
+ if !v.fixRace(key) {
err = os.ErrNotExist
return err
}
- rdr, err = v.readWorker(ctx, loc)
+ rdr, err = v.readWorker(ctx, key)
if err != nil {
v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
@@ -517,7 +539,7 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
return err
}
-func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader) error {
+func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
if r == nil {
// r == nil leads to a memory violation in func readFillBuf in
// aws-sdk-go-v2 at v0.23.0/service/s3/s3manager/upload.go
@@ -526,13 +548,13 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader)
uploadInput := s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(name),
+ Key: aws.String(key),
Body: r,
}
- if len(name) == 32 {
+ if loc, ok := v.isKeepBlock(key); ok {
var contentMD5 string
- md5, err := hex.DecodeString(name)
+ md5, err := hex.DecodeString(loc)
if err != nil {
return err
}
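The writeObject change above switches the Content-MD5 decision from a bare length check to isKeepBlock(), and derives the digest from the decoded locator rather than the prefixed key. A minimal sketch of that derivation, assuming the standard S3 Content-MD5 convention (base64 of the raw 16-byte digest); contentMD5 here is an illustrative helper, not a function from the patch:

    package main

    import (
    	"encoding/base64"
    	"encoding/hex"
    	"fmt"
    )

    // contentMD5 converts a 32-character hex locator into the base64
    // Content-MD5 header value S3 expects.
    func contentMD5(loc string) (string, error) {
    	raw, err := hex.DecodeString(loc) // 32 hex chars -> 16 raw bytes
    	if err != nil {
    		return "", err
    	}
    	return base64.StdEncoding.EncodeToString(raw), nil
    }

    func main() {
    	// MD5 of the empty block, a well-known test value.
    	h, _ := contentMD5("d41d8cd98f00b204e9800998ecf8427e")
    	fmt.Println(h) // 1B2M2Y8AsgTpgAmY7PhCfg==
    }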
@@ -578,11 +600,12 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
}
r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
- err := v.writeObject(ctx, loc, r)
+ key := v.key(loc)
+ err := v.writeObject(ctx, key, r)
if err != nil {
return err
}
- return v.writeObject(ctx, "recent/"+loc, nil)
+ return v.writeObject(ctx, "recent/"+key, nil)
}
type s3awsLister struct {
@@ -675,6 +698,7 @@ func (lister *s3awsLister) pop() (k *s3.Object) {
// IndexTo writes a complete list of locators with the given prefix
// for which Get() can retrieve data.
func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+ prefix = v.key(prefix)
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3awsLister{
Logger: v.logger,
@@ -698,7 +722,8 @@ func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
// over all of them needlessly with dataL.
break
}
- if !v.isKeepBlock(*data.Key) {
+ loc, isblk := v.isKeepBlock(*data.Key)
+ if !isblk {
continue
}
@@ -730,30 +755,31 @@ func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
// We truncate sub-second precision here. Otherwise
// timestamps will never match the RFC1123-formatted
// Last-Modified values parsed by Mtime().
- fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.Unix()*1000000000)
+ fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
}
return dataL.Error()
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
- _, err := v.Head(loc)
+ key := v.key(loc)
+ _, err := v.head(key)
if err != nil {
return s3AWSZeroTime, v.translateError(err)
}
- resp, err := v.Head("recent/" + loc)
+ resp, err := v.head("recent/" + key)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ err = v.writeObject(context.Background(), "recent/"+key, nil)
if err != nil {
- v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
return s3AWSZeroTime, v.translateError(err)
}
- v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.Head("recent/" + loc)
+ v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
+ resp, err = v.head("recent/" + key)
if err != nil {
- v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
return s3AWSZeroTime, v.translateError(err)
}
} else if err != nil {
@@ -784,22 +810,23 @@ func (v *S3AWSVolume) Touch(loc string) error {
if v.volume.ReadOnly {
return MethodDisabledError
}
- _, err := v.Head(loc)
+ key := v.key(loc)
+ _, err := v.head(key)
err = v.translateError(err)
- if os.IsNotExist(err) && v.fixRace(loc) {
+ if os.IsNotExist(err) && v.fixRace(key) {
// The data object got trashed in a race, but fixRace
// rescued it.
} else if err != nil {
return err
}
- err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ err = v.writeObject(context.Background(), "recent/"+key, nil)
return v.translateError(err)
}
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3AWSVolume) checkRaceWindow(loc string) error {
- resp, err := v.Head("trash/" + loc)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3AWSVolume) checkRaceWindow(key string) error {
+ resp, err := v.head("trash/" + key)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
@@ -817,7 +844,7 @@ func (v *S3AWSVolume) checkRaceWindow(loc string) error {
// trash/X's lifetime. The new timestamp might not
// become visible until now+raceWindow, and EmptyTrash
// is allowed to delete trash/X before then.
- return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
}
// trash/X exists, but it won't be eligible for deletion until
// after now+raceWindow, so it's safe to overwrite it.
@@ -831,7 +858,6 @@ func (b *s3AWSbucket) Del(path string) error {
}
req := b.svc.DeleteObjectRequest(input)
_, err := req.Send(context.Background())
- //err := b.Bucket().Del(path)
b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
@@ -848,30 +874,32 @@ func (v *S3AWSVolume) Trash(loc string) error {
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
return nil
}
+ key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
- err := v.checkRaceWindow(loc)
+ err := v.checkRaceWindow(key)
if err != nil {
return err
}
- err = v.safeCopy("trash/"+loc, loc)
+ err = v.safeCopy("trash/"+key, key)
if err != nil {
return err
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
// Untrash moves block from trash back into store
func (v *S3AWSVolume) Untrash(loc string) error {
- err := v.safeCopy(loc, "trash/"+loc)
+ key := v.key(loc)
+ err := v.safeCopy(key, "trash/"+key)
if err != nil {
return err
}
- err = v.writeObject(context.Background(), "recent/"+loc, nil)
+ err = v.writeObject(context.Background(), "recent/"+key, nil)
return v.translateError(err)
}
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
index 0387d52f1..23c607ce3 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3aws_volume_test.go
@@ -87,6 +87,14 @@ func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
})
}
+func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ v.PrefixLength = 3
+ return v
+ })
+}
+
func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
v.IndexPageSize = 3
@@ -603,7 +611,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust
// PutRaw skips the ContentMD5 test
func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
-
+ key := v.key(loc)
r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
@@ -613,35 +621,35 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(key),
Body: r,
})
if err != nil {
- v.logger.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", key, err)
}
empty := bytes.NewReader([]byte{})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + loc),
+ Key: aws.String("recent/" + key),
Body: empty,
})
if err != nil {
- v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
}
}
// TouchWithDate turns back the clock while doing a Touch(). We assume
// there are no other operations happening on the same s3test server
// while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
v.serverClock.now = &lastPut
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
empty := bytes.NewReader([]byte{})
_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + locator),
+ Key: aws.String("recent/" + v.key(loc)),
Body: empty,
})
if err != nil {
-----------------------------------------------------------------------
hooks/post-receive