[ARVADOS] updated: 1.3.0-2815-g5771cf273
Git user
git at public.arvados.org
Mon Aug 3 16:22:56 UTC 2020
Summary of changes:
build/run-library.sh | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
discards c71daa26278eaf3d09ce20f563f985d43c9799fb (commit)
discards 95babd9e21eb871eed9535fad3d2af8ecdeb471d (commit)
discards 8f3b2dedef2677654197e9838939d9abe7cc3791 (commit)
discards ea57684c255434bcd25ec150a3979ce783a2183c (commit)
via 5771cf273a4e09a5666122a7f67b4f088927e29d (commit)
via 63be17fdad6232fc9ff59738b6b358410953274c (commit)
via a79a27e215ce709455d1f5354b4ae5d045ed32dd (commit)
via ffeb31033857f36d26d06f8b7c2550a7950f941f (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (c71daa26278eaf3d09ce20f563f985d43c9799fb)
            \
             N -- N -- N   (5771cf273a4e09a5666122a7f67b4f088927e29d)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 5771cf273a4e09a5666122a7f67b4f088927e29d
Author: Ward Vandewege <ward at curii.com>
Date: Mon Aug 3 11:14:29 2020 -0400
10477: Fix timezone handling in s3aws tests, to avoid issues when the tests are
run in non-UTC environments.
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
index 57d81dbe0..97045a660 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3aws_volume_test.go
@@ -41,9 +41,9 @@ type s3AWSFakeClock struct {
func (c *s3AWSFakeClock) Now() time.Time {
if c.now == nil {
- return time.Now()
+ return time.Now().UTC()
}
- return *c.now
+ return c.now.UTC()
}
func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
commit 63be17fdad6232fc9ff59738b6b358410953274c
Author: Ward Vandewege <ward at curii.com>
Date: Wed Jul 29 11:58:16 2020 -0400
10477: implement review comments.
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index 40cbbb533..76a2f3ab5 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -64,8 +64,8 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
# might be needed for other S3-compatible services.
V2Signature: false
- # Use the AWS S3 Go driver instead of the goamz driver.
- AlternateDriver: false
+ # Use the AWS S3 v2 Go driver instead of the goamz driver.
+ UseAWSS3v2Driver: false
# Requested page size for "list bucket contents" requests.
IndexPageSize: 1000
@@ -98,20 +98,8 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
StorageClasses: null
</code></pre></notextile>
-Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @AlternateDriver@ flag to @true@.
+Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @UseAWSS3v2Driver@ flag to @true@.
The @aws-sdk-go-v2@ does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third party S3-compatible service.
-The @aws-sdk-go-v2@ driver has faster _single thread_ read and write performance than the @goamz@ driver. Here are some benchmark numbers against AWS S3, as measured in July 2020. They were generated with the @keep-exercise@ tool in an Arvados installation with one dedicated Keepstore node (c5n.2xlarge) and one dedicated node for running @keep-exercise@ (c5n.2xlarge). The Keepstore node was backed by one S3 bucket, in a VPC with an S3 endpoint installed. Versioning, Server Access logging, Static website hosting, Object-level logging and Default encryption were disabled. Object lock, Transfer acceleration and Requester pays were also disabled. There were no Active notifications. Each test consisted of 4 one minute runs, which were averaged into one number. The tests were repeated 3 times, and of those 3 runs, the highest average speed was selected and included in the table below.
-
-table(table table-bordered table-condensed).
-||_. goamz |_. aws-sdk-go-v2 |_. command line|
-|single thread read performance (average)|32.53 MiB/s|79.48 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
-|single thread write performance (average)|39.75 MiB/s|41.05 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
-
-Because both S3 and Keep are optimized for _aggregate_ througput, the single thread performance is not as important as it may seem at first glance. When using 20 concurrent read or write threads, the numbers from both drivers are more closely aligned:
-
-table(table table-bordered table-condensed).
-||_. goamz |_. aws-sdk-go-v2 |_. command line|
-|20 thread read performance (average)|585.60 MiB/s|898.93 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 20 -wthreads 0|
-|20 thread write performance (average)|610.40 MiB/s|688.25 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 0 -wthreads 20|
+The @aws-sdk-go-v2@ driver can improve read performance by 50-100% over the @goamz@ driver, but it has not had as much production use. See the "wiki":https://dev.arvados.org/projects/arvados/wiki/Keep_real_world_performance_numbers for details.
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index d2ccefe8b..01d399943 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1073,7 +1073,7 @@ Clusters:
ReadTimeout: 10m
RaceWindow: 24h
# Use aws-s3-go (v2) instead of goamz
- AlternateDriver: false
+ UseAWSS3v2Driver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index d6d198429..508652a8a 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1079,7 +1079,7 @@ Clusters:
ReadTimeout: 10m
RaceWindow: 24h
# Use aws-s3-go (v2) instead of goamz
- AlternateDriver: false
+ UseAWSS3v2Driver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 515dc7973..9cf1ed3cd 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -277,7 +277,7 @@ type S3VolumeDriverParameters struct {
Bucket string
LocationConstraint bool
V2Signature bool
- AlternateDriver bool
+ UseAWSS3v2Driver bool
IndexPageSize int
ConnectTimeout Duration
ReadTimeout Duration
@@ -553,7 +553,7 @@ func (ss *StringSet) UnmarshalJSON(data []byte) error {
return err
}
*ss = make(map[string]struct{}, len(hash))
- for t, _ := range hash {
+ for t := range hash {
(*ss)[t] = struct{}{}
}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 8e32e592b..235d369b5 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -37,7 +37,7 @@ func init() {
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+ err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
}
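The json.Unmarshal change above (passing v instead of &v when v is already a pointer) looks like a simplification rather than a behavior change. The sketch below, using a hypothetical driverParams struct rather than the real S3Volume type, suggests that both forms fill in the same struct, because encoding/json follows non-nil pointers to the underlying value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// driverParams is a hypothetical, trimmed-down stand-in for the driver
// parameters decoded in newS3Volume; it is not the real Arvados type.
type driverParams struct {
	Bucket           string
	UseAWSS3v2Driver bool
}

// decode unmarshals raw into a freshly allocated struct. With
// extraIndirection set it passes a pointer-to-pointer, as the old code did.
func decode(raw []byte, extraIndirection bool) (*driverParams, error) {
	v := &driverParams{}
	var err error
	if extraIndirection {
		err = json.Unmarshal(raw, &v) // **driverParams
	} else {
		err = json.Unmarshal(raw, v) // *driverParams
	}
	return v, err
}

func main() {
	raw := []byte(`{"Bucket":"example-bucket","UseAWSS3v2Driver":true}`)
	for _, extra := range []bool{true, false} {
		v, err := decode(raw, extra)
		fmt.Printf("extraIndirection=%v params=%+v err=%v\n", extra, *v, err)
	}
}
```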
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index 9bbb3c5b1..c9fa7fce5 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -59,22 +59,28 @@ type s3AWSbucket struct {
}
// chooseS3VolumeDriver distinguishes between the old goamz driver and
-// aws-sdk-go based on the AlternateDriver feature flag
+// aws-sdk-go based on the UseAWSS3v2Driver feature flag
func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+ err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
}
- if v.AlternateDriver {
- logger.Debugln("Using alternate S3 driver (aws-go)")
+ if v.UseAWSS3v2Driver {
+ logger.Debugln("Using AWS S3 v2 driver")
return newS3AWSVolume(cluster, volume, logger, metrics)
} else {
- logger.Debugln("Using standard S3 driver (goamz)")
+ logger.Debugln("Using goamz S3 driver")
return newS3Volume(cluster, volume, logger, metrics)
}
}
+const (
+ PartSize = 5 * 1024 * 1024
+ ReadConcurrency = 13
+ WriteConcurrency = 5
+)
+
var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
var s3AWSZeroTime time.Time
@@ -83,14 +89,12 @@ func (v *S3AWSVolume) isKeepBlock(s string) bool {
}
func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- logger.Debugln("in newS3AWSVolume")
v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+ err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
}
v.logger = logger.WithField("Volume", v.String())
- v.logger.Debugln("in newS3AWSVolume after volume set")
return v, v.check("")
}
@@ -148,6 +152,10 @@ func (v *S3AWSVolume) check(ec2metadataHostname string) error {
return errors.New("DriverParameters: RaceWindow must not be negative")
}
+ if v.V2Signature {
+ return errors.New("DriverParameters: V2Signature is not supported")
+ }
+
defaultResolver := endpoints.NewDefaultResolver()
cfg := defaults.Config()
@@ -276,7 +284,6 @@ func (v *S3AWSVolume) EmptyTrash() {
startT := time.Now()
emptyOneKey := func(trash *s3.Object) {
- v.logger.Warnf("EmptyTrash: looking for trash marker %s with last modified date %s", *trash.Key, *trash.LastModified)
loc := strings.TrimPrefix(*trash.Key, "trash/")
if !v.isKeepBlock(loc) {
return
@@ -285,7 +292,6 @@ func (v *S3AWSVolume) EmptyTrash() {
atomic.AddInt64(&blocksInTrash, 1)
trashT := *(trash.LastModified)
- v.logger.Infof("HEEEEEEE trashT key: %s, type: %T val: %s, startT is %s", *trash.Key, trashT, trashT, startT)
recent, err := v.Head("recent/" + loc)
if err != nil && os.IsNotExist(v.translateError(err)) {
v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
@@ -298,9 +304,7 @@ func (v *S3AWSVolume) EmptyTrash() {
v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
return
}
- v.logger.Infof("recent.LastModified type: %T val: %s", recent.LastModified, recent.LastModified)
if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
- v.logger.Infof("HERE! recent.lastmodified is smaller than blobsigningttl")
if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
// recent/loc is too old to protect
// loc from being Trashed again during
@@ -326,7 +330,6 @@ func (v *S3AWSVolume) EmptyTrash() {
}
}
if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
- v.logger.Infof("HERE! trashT for %s is smaller than blobtrashlifetime: %s < %s", *trash.Key, startT.Sub(trashT), v.cluster.Collections.BlobTrashLifetime.Duration())
return
}
err = v.bucket.Del(*trash.Key)
@@ -337,7 +340,6 @@ func (v *S3AWSVolume) EmptyTrash() {
atomic.AddInt64(&bytesDeleted, *trash.Size)
atomic.AddInt64(&blocksDeleted, 1)
- v.logger.Infof("HERE! trash.Key %s should have been deleted", *trash.Key)
_, err = v.Head(loc)
if err == nil {
v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
@@ -351,7 +353,6 @@ func (v *S3AWSVolume) EmptyTrash() {
if err != nil {
v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
}
- v.logger.Infof("HERE! recent/%s should have been deleted", loc)
}
var wg sync.WaitGroup
@@ -382,7 +383,7 @@ func (v *S3AWSVolume) EmptyTrash() {
if err := trashL.Error(); err != nil {
v.logger.WithError(err).Error("EmptyTrash: lister failed")
}
- v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
// fixRace(X) is called when "recent/X" exists but "X" doesn't
@@ -454,8 +455,8 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCl
awsBuf := aws.NewWriteAtBuffer(buf)
downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
- u.PartSize = 5 * 1024 * 1024
- u.Concurrency = 13
+ u.PartSize = PartSize
+ u.Concurrency = ReadConcurrency
})
v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
@@ -517,70 +518,36 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
return err
}
-func (b *s3AWSbucket) PutReader(path string, r io.Reader, length int64, contType string, contentMD5 string, contentSHA256 string) error {
- if length == 0 {
- // aws-sdk-go will only send Content-Length: 0 when reader
- // is nil due to net.http.Request.ContentLength
- // behavior. Otherwise, Content-Length header is
- // omitted which will cause some S3 services
- // (including AWS and Ceph RadosGW) to fail to create
- // empty objects.
- r = bytes.NewReader([]byte{})
- } else {
- r = NewCountingReader(r, b.stats.TickOutBytes)
+func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader) error {
+ if r == nil {
+ // r == nil leads to a memory violation in func readFillBuf in
+ // aws-sdk-go-v2 at v0.23.0/service/s3/s3manager/upload.go
+ r = bytes.NewReader(nil)
}
- uploader := s3manager.NewUploaderWithClient(b.svc)
- _, err := uploader.Upload(&s3manager.UploadInput{
- Bucket: aws.String(b.bucket),
- Key: aws.String(path),
- Body: r,
- }, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
- r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
- }))
- b.stats.TickOps("put")
- b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
- b.stats.TickErr(err)
- return err
-}
-
-// Put writes a block.
-func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
- return putWithPipe(ctx, loc, block, v)
-}
-
-// WriteBlock implements BlockWriter.
-func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
- if v.volume.ReadOnly {
- return MethodDisabledError
- }
-
- r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
uploadInput := s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
- Key: aws.String(loc),
+ Key: aws.String(name),
Body: r,
}
- //var contentMD5, contentSHA256 string
- var contentMD5 string
- md5, err := hex.DecodeString(loc)
- if err != nil {
- return err
- }
- contentMD5 = base64.StdEncoding.EncodeToString(md5)
- // See if this is the empty block
- if contentMD5 != "d41d8cd98f00b204e9800998ecf8427e" {
+ if len(name) == 32 {
+ var contentMD5 string
+ md5, err := hex.DecodeString(name)
+ if err != nil {
+ return err
+ }
+ contentMD5 = base64.StdEncoding.EncodeToString(md5)
uploadInput.ContentMD5 = &contentMD5
}
- // Some experimentation indicated that using concurrency 5 yields the best
+ // Experimentation indicated that using concurrency 5 yields the best
// throughput, better than higher concurrency (10 or 13) by ~5%.
// Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
// is detrimental to througput (minus ~15%).
uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
- u.PartSize = 5 * 1024 * 1024
- u.Concurrency = 5
+ u.PartSize = PartSize
+ u.Concurrency = WriteConcurrency
})
// Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
@@ -589,35 +556,39 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
// makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
// calculating the Sha-256 because we don't need it; we already use md5sum
// hashes that match the name of the block.
- _, err = uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+ _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
}))
v.bucket.stats.TickOps("put")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
v.bucket.stats.TickErr(err)
+
+ return err
+}
+
+// Put writes a block.
+func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+ return putWithPipe(ctx, loc, block, v)
+}
+
+// WriteBlock implements BlockWriter.
+func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+
+ r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+ err := v.writeObject(ctx, loc, r)
if err != nil {
return err
}
-
- empty := bytes.NewReader([]byte{})
- _, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
- Bucket: aws.String(v.bucket.bucket),
- Key: aws.String("recent/" + loc),
- Body: empty,
- }, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
- r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
- }))
- v.bucket.stats.TickOps("put")
- v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
- v.bucket.stats.TickErr(err)
-
- return err
+ return v.writeObject(ctx, "recent/"+loc, nil)
}
type s3awsLister struct {
Logger logrus.FieldLogger
- Bucket *s3AWSbucket //*s3.Bucket
+ Bucket *s3AWSbucket
Prefix string
PageSize int
Stats *s3awsbucketStats
@@ -772,12 +743,12 @@ func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
if err != nil {
v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
return s3AWSZeroTime, v.translateError(err)
}
- v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+loc)
resp, err = v.Head("recent/" + loc)
if err != nil {
v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
@@ -819,7 +790,7 @@ func (v *S3AWSVolume) Touch(loc string) error {
} else if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
return v.translateError(err)
}
@@ -898,7 +869,7 @@ func (v *S3AWSVolume) Untrash(loc string) error {
if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ err = v.writeObject(context.Background(), "recent/"+loc, nil)
return v.translateError(err)
}
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
index 46fe07d16..57d81dbe0 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3aws_volume_test.go
@@ -167,7 +167,6 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
}
err := v.check(s.metadata.URL + "/latest")
creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
- fmt.Printf("%+v, %s\n", creds, err)
c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
@@ -321,9 +320,8 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
return
}
v.serverClock.now = &t
- fmt.Printf("USING TIMESTAMP %s to write key %s", t, key)
uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
- resp, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
Bucket: aws.String(v.bucket.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
@@ -331,10 +329,11 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
if err != nil {
panic(err)
}
- fmt.Println(resp)
v.serverClock.now = nil
- resp2, err := v.Head(key)
- fmt.Printf("KEY: %s\n%s", key, resp2)
+ _, err = v.Head(key)
+ if err != nil {
+ panic(err)
+ }
}
t0 := time.Now()
@@ -544,10 +543,11 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust
// fake s3
backend := s3mem.New(s3mem.WithTimeSource(clock))
- logger := new(LogrusLog)
+ // To enable GoFakeS3 debug logging, pass logger to gofakes3.WithLogger()
+ /* logger := new(LogrusLog)
ctxLogger := ctxlog.FromContext(context.Background())
- logger.log = &ctxLogger
- faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(logger), gofakes3.WithTimeSkewLimit(0))
+ logger.log = &ctxLogger */
+ faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(nil), gofakes3.WithTimeSkewLimit(0))
srv := httptest.NewServer(faker.Server())
endpoint := srv.URL
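For readers following the writeObject refactoring in the commit above: the Content-MD5 value is only set when the object name is a 32-character string, which covers block locators but not "recent/..." marker keys. The removed check compared the base64-encoded digest against a hex string, so it would not have matched the empty block. A minimal sketch of the hex-to-base64 step follows; contentMD5ForKey is a hypothetical helper written for this note, not a function in keepstore.

```go
package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// contentMD5ForKey is a hypothetical helper mirroring the logic in the diff:
// a 32-character key is treated as a hex MD5 digest and re-encoded as base64,
// the form expected in a Content-MD5 header. Other keys get no digest.
func contentMD5ForKey(name string) (md5 string, set bool, err error) {
	if len(name) != 32 {
		return "", false, nil
	}
	sum, err := hex.DecodeString(name)
	if err != nil {
		return "", false, err
	}
	return base64.StdEncoding.EncodeToString(sum), true, nil
}

func main() {
	keys := []string{
		"d41d8cd98f00b204e9800998ecf8427e",        // MD5 of the empty block
		"recent/d41d8cd98f00b204e9800998ecf8427e", // marker object, no digest
	}
	for _, key := range keys {
		md5, set, err := contentMD5ForKey(key)
		fmt.Printf("%q -> md5=%q set=%v err=%v\n", key, md5, set, err)
	}
}
```

For the empty block this yields "1B2M2Y8AsgTpgAmY7PhCfg==", which differs from the hex form that the old comparison used.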
commit a79a27e215ce709455d1f5354b4ae5d045ed32dd
Author: Ward Vandewege <ward at curii.com>
Date: Fri Jul 24 13:55:51 2020 -0400
10477: disable sha-256 calculation by the S3 driver; we don't need it
and it slows uploads down because it's CPU bound.
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index cfd436dd6..40cbbb533 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -107,11 +107,11 @@ The @aws-sdk-go-v2@ driver has faster _single thread_ read and write performance
table(table table-bordered table-condensed).
||_. goamz |_. aws-sdk-go-v2 |_. command line|
|single thread read performance (average)|32.53 MiB/s|79.48 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
-|single thread write performance (average)|39.75 MiB/s|49.58 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
+|single thread write performance (average)|39.75 MiB/s|41.05 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
Because both S3 and Keep are optimized for _aggregate_ througput, the single thread performance is not as important as it may seem at first glance. When using 20 concurrent read or write threads, the numbers from both drivers are more closely aligned:
table(table table-bordered table-condensed).
||_. goamz |_. aws-sdk-go-v2 |_. command line|
|20 thread read performance (average)|585.60 MiB/s|898.93 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 20 -wthreads 0|
-|20 thread write performance (average)|610.40 MiB/s|542.40 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 0 -wthreads 20|
+|20 thread write performance (average)|610.40 MiB/s|688.25 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 0 -wthreads 20|
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index d4fe9fd15..9bbb3c5b1 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -534,7 +534,9 @@ func (b *s3AWSbucket) PutReader(path string, r io.Reader, length int64, contType
Bucket: aws.String(b.bucket),
Key: aws.String(path),
Body: r,
- })
+ }, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+ r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+ }))
b.stats.TickOps("put")
b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
@@ -570,10 +572,6 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
// See if this is the empty block
if contentMD5 != "d41d8cd98f00b204e9800998ecf8427e" {
uploadInput.ContentMD5 = &contentMD5
- // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
- // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
- // block, so there is no extra memory use to be concerned about. See
- // makeSha256Reader in aws/signer/v4/v4.go.
}
// Some experimentation indicated that using concurrency 5 yields the best
@@ -585,7 +583,15 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
u.Concurrency = 5
})
- _, err = uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions())
+ // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
+ // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
+ // block, so there is no extra memory use to be concerned about. See
+ // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
+ // calculating the Sha-256 because we don't need it; we already use md5sum
+ // hashes that match the name of the block.
+ _, err = uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+ r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+ }))
v.bucket.stats.TickOps("put")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
@@ -599,7 +605,9 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
Bucket: aws.String(v.bucket.bucket),
Key: aws.String("recent/" + loc),
Body: empty,
- })
+ }, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
+ r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+ }))
v.bucket.stats.TickOps("put")
v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
v.bucket.stats.TickErr(err)
commit ffeb31033857f36d26d06f8b7c2550a7950f941f
Author: Ward Vandewege <ward at curii.com>
Date: Wed Jul 22 07:10:14 2020 -0400
10477: first version of the aws-sdk-go-v2 driver.
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index b960ac1fd..cfd436dd6 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -64,6 +64,9 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
# might be needed for other S3-compatible services.
V2Signature: false
+ # Use the AWS S3 Go driver instead of the goamz driver.
+ AlternateDriver: false
+
# Requested page size for "list bucket contents" requests.
IndexPageSize: 1000
@@ -94,3 +97,21 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
# classes" in the "Admin" section of doc.arvados.org.
StorageClasses: null
</code></pre></notextile>
+
+Two S3 drivers are available. Historically, Arvados has used the @goamz@ driver to talk to S3-compatible services. More recently, support for the @aws-sdk-go-v2@ driver was added. This driver can be activated by setting the @AlternateDriver@ flag to @true@.
+
+The @aws-sdk-go-v2@ does not support the old S3 v2 signing algorithm. This will not affect interacting with AWS S3, but it might be an issue when Keep is backed by a very old version of a third party S3-compatible service.
+
+The @aws-sdk-go-v2@ driver has faster _single thread_ read and write performance than the @goamz@ driver. Here are some benchmark numbers against AWS S3, as measured in July 2020. They were generated with the @keep-exercise@ tool in an Arvados installation with one dedicated Keepstore node (c5n.2xlarge) and one dedicated node for running @keep-exercise@ (c5n.2xlarge). The Keepstore node was backed by one S3 bucket, in a VPC with an S3 endpoint installed. Versioning, Server Access logging, Static website hosting, Object-level logging and Default encryption were disabled. Object lock, Transfer acceleration and Requester pays were also disabled. There were no Active notifications. Each test consisted of 4 one minute runs, which were averaged into one number. The tests were repeated 3 times, and of those 3 runs, the highest average speed was selected and included in the table below.
+
+table(table table-bordered table-condensed).
+||_. goamz |_. aws-sdk-go-v2 |_. command line|
+|single thread read performance (average)|32.53 MiB/s|79.48 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
+|single thread write performance (average)|39.75 MiB/s|49.58 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 1 -wthreads 1|
+
+Because both S3 and Keep are optimized for _aggregate_ througput, the single thread performance is not as important as it may seem at first glance. When using 20 concurrent read or write threads, the numbers from both drivers are more closely aligned:
+
+table(table table-bordered table-condensed).
+||_. goamz |_. aws-sdk-go-v2 |_. command line|
+|20 thread read performance (average)|585.60 MiB/s|898.93 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 20 -wthreads 0|
+|20 thread write performance (average)|610.40 MiB/s|542.40 MiB/s|keep-exercise -repeat 4 -run-time 60s -vary-request -use-index -rthreads 0 -wthreads 20|
diff --git a/go.mod b/go.mod
index 884d1fcda..71052882a 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/arvados/cgofuse v1.2.0-arvados1
github.com/aws/aws-sdk-go v1.25.30
+ github.com/aws/aws-sdk-go-v2 v0.23.0
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
github.com/coreos/go-oidc v2.1.0+incompatible
@@ -37,6 +38,7 @@ require (
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmcvetta/randutil v0.0.0-20150817122601-2bb1b664bcff
github.com/jmoiron/sqlx v1.2.0
+ github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc
github.com/julienschmidt/httprouter v1.2.0
github.com/karalabe/xgo v0.0.0-20191115072854-c5ccff8648a7 // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
@@ -58,7 +60,7 @@ require (
github.com/stretchr/testify v1.4.0 // indirect
github.com/xanzy/ssh-agent v0.1.0 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
- golang.org/x/net v0.0.0-20190620200207-3b0461eec859
+ golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd
google.golang.org/api v0.13.0
diff --git a/go.sum b/go.sum
index ead655c9b..2565964e7 100644
--- a/go.sum
+++ b/go.sum
@@ -21,14 +21,18 @@ github.com/arvados/cgofuse v1.2.0-arvados1 h1:4Q4vRJ4hbTCcI4gGEaa6hqwj3rqlUuzeFQ
github.com/arvados/cgofuse v1.2.0-arvados1/go.mod h1:79WFV98hrkRHK9XPhh2IGGOwpFSjocsWubgxAs2KhRc=
github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef h1:cl7DIRbiAYNqaVxg3CZY8qfZoBOKrj06H/x9SPGaxas=
github.com/arvados/goamz v0.0.0-20190905141525-1bba09f407ef/go.mod h1:rCtgyMmBGEbjTm37fCuBYbNL0IhztiALzo3OB9HyiOM=
+github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
+github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092 h1:0Di2onNnlN5PAyWPbqlPyN45eOQ+QW/J9eqLynt4IV4=
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092/go.mod h1:8IzBjZCRSnsvM6MJMG8HNNtnzMl48H22rbJL2kRUJ0Y=
github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
@@ -74,7 +78,7 @@ github.com/go-ldap/ldap v3.0.3+incompatible h1:HTeSZO8hWMS1Rgb2Ziku6b8a7qRIZZMHj
github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
-github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -91,6 +95,7 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -114,6 +119,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
+github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc/go.mod h1:fNiSoOiEI5KlkWXn26OwKnNe58ilTIkpBlgOrt7Olu8=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
@@ -171,13 +178,17 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8=
+github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5 h1:Jw7W4WMfQDxsXvfeFSaS2cHlY7bAF4MGrgnbd0+Uo78=
github.com/satori/go.uuid v1.2.1-0.20180103174451-36e9d2ebbde5/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
+github.com/shabbyrobe/gocovmerge v0.0.0-20180507124511-f6ea450bfb63/go.mod h1:n+VKSARF5y/tS9XFSP7vWDfS+GUC5vs/YT7M5XDTUEM=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/src-d/gcfg v1.3.0 h1:2BEDr8r0I0b8h/fOqwtxCEiq2HJu8n2JGZJQFGXWLjg=
github.com/src-d/gcfg v1.3.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -206,6 +217,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190310074541-c10a0554eabf/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw=
@@ -214,6 +226,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowK
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
@@ -227,6 +240,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190310054646-10058d7d4faa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -242,10 +256,12 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190308174544-00c44ba9c14f/go.mod h1:25r3+/G6/xytQM8iWZKq3Hn0kr0rgFKPUNVEL/dr3z4=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c h1:97SnQk1GYRXJgvwZ8fadnxDOWfKvkNQHH3CtZntPSrM=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
@@ -267,6 +283,7 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/square/go-jose.v2 v2.3.1 h1:SK5KegNXmKmqE342YYN2qPHEnUYeoMiXXl1poUlI+o4=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/src-d/go-billy.v4 v4.0.1 h1:iMxwQPj2cuKRyaIZ985zxClkcdTtT5VpXYf4PTJc0Ek=
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 907acdc87..d2ccefe8b 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1072,6 +1072,8 @@ Clusters:
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ # Use aws-sdk-go-v2 instead of goamz
+ AlternateDriver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 96da19dfc..d6d198429 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1078,6 +1078,8 @@ Clusters:
ConnectTimeout: 1m
ReadTimeout: 10m
RaceWindow: 24h
+ # Use aws-sdk-go-v2 instead of goamz
+ AlternateDriver: false
# For S3 driver, potentially unsafe tuning parameter,
# intentionally excluded from main documentation.
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index a54712f33..515dc7973 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -277,6 +277,7 @@ type S3VolumeDriverParameters struct {
Bucket string
LocationConstraint bool
V2Signature bool
+ AlternateDriver bool
IndexPageSize int
ConnectTimeout Duration
ReadTimeout Duration
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 96f2e7db3..8e32e592b 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -32,7 +32,7 @@ import (
)
func init() {
- driver["S3"] = newS3Volume
+ driver["S3"] = chooseS3VolumeDriver
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
new file mode 100644
index 000000000..d4fe9fd15
--- /dev/null
+++ b/services/keepstore/s3aws_volume.go
@@ -0,0 +1,921 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "encoding/hex"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/awserr"
+ "github.com/aws/aws-sdk-go-v2/aws/defaults"
+ "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
+ "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
+ "github.com/aws/aws-sdk-go-v2/aws/endpoints"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+)
+
+// S3AWSVolume implements Volume using an S3 bucket.
+type S3AWSVolume struct {
+ arvados.S3VolumeDriverParameters
+ AuthToken string // populated automatically when IAMRole is used
+ AuthExpiration time.Time // populated automatically when IAMRole is used
+
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bucket *s3AWSbucket
+ region string
+ startOnce sync.Once
+}
+
+// s3AWSbucket wraps s3.Client and counts I/O and API usage stats. The
+// wrapped bucket can be replaced atomically with SetBucket in order
+// to update credentials.
+type s3AWSbucket struct {
+ bucket string
+ svc *s3.Client
+ stats s3awsbucketStats
+ mu sync.Mutex
+}
+
+// chooseS3VolumeDriver chooses between the old goamz driver and the
+// aws-sdk-go-v2 driver, based on the AlternateDriver feature flag.
+func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, &v)
+ if err != nil {
+ return nil, err
+ }
+ if v.AlternateDriver {
+ logger.Debugln("Using alternate S3 driver (aws-sdk-go-v2)")
+ return newS3AWSVolume(cluster, volume, logger, metrics)
+ } else {
+ logger.Debugln("Using standard S3 driver (goamz)")
+ return newS3Volume(cluster, volume, logger, metrics)
+ }
+}
+
+var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+var s3AWSZeroTime time.Time
+
+func (v *S3AWSVolume) isKeepBlock(s string) bool {
+ return s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+ logger.Debugln("in newS3AWSVolume")
+ v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, &v)
+ if err != nil {
+ return nil, err
+ }
+ v.logger = logger.WithField("Volume", v.String())
+ v.logger.Debugln("in newS3AWSVolume after volume set")
+ return v, v.check("")
+}
+
+func (v *S3AWSVolume) translateError(err error) error {
+ if aerr, ok := err.(awserr.Error); ok {
+ switch aerr.Code() {
+ case "NotFound":
+ return os.ErrNotExist
+ case "NoSuchKey":
+ return os.ErrNotExist
+ }
+ }
+ return err
+}
+
+// safeCopy calls CopyObjectRequest, and checks the response to make sure the
+// copy succeeded and updated the timestamp on the destination object
+//
+// (If something goes wrong during the copy, the error will be embedded in the
+// 200 OK response)
+func (v *S3AWSVolume) safeCopy(dst, src string) error {
+ input := &s3.CopyObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ ContentType: aws.String("application/octet-stream"),
+ CopySource: aws.String(v.bucket.bucket + "/" + src),
+ Key: aws.String(dst),
+ }
+
+ req := v.bucket.svc.CopyObjectRequest(input)
+ resp, err := req.Send(context.Background())
+
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ return err
+ } else if err != nil {
+ return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
+ }
+
+ if resp.CopyObjectResult.LastModified == nil {
+ return fmt.Errorf("PutCopy(%q ← %q) succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
+ } else if skew := time.Now().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
+ return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %s (%s old)", *resp.CopyObjectResult.LastModified, skew)
+ }
+ return nil
+}
+
+func (v *S3AWSVolume) check(ec2metadataHostname string) error {
+ if v.Bucket == "" {
+ return errors.New("DriverParameters: Bucket must be provided")
+ }
+ if v.IndexPageSize == 0 {
+ v.IndexPageSize = 1000
+ }
+ if v.RaceWindow < 0 {
+ return errors.New("DriverParameters: RaceWindow must not be negative")
+ }
+
+ defaultResolver := endpoints.NewDefaultResolver()
+
+ cfg := defaults.Config()
+
+ if v.Endpoint == "" && v.Region == "" {
+ return fmt.Errorf("AWS region or endpoint must be specified")
+ } else if v.Endpoint != "" || ec2metadataHostname != "" {
+ myCustomResolver := func(service, region string) (aws.Endpoint, error) {
+ if v.Endpoint != "" && service == "s3" {
+ return aws.Endpoint{
+ URL: v.Endpoint,
+ SigningRegion: v.Region,
+ }, nil
+ } else if service == "ec2metadata" && ec2metadataHostname != "" {
+ return aws.Endpoint{
+ URL: ec2metadataHostname,
+ }, nil
+ }
+
+ return defaultResolver.ResolveEndpoint(service, region)
+ }
+ cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
+ }
+
+ cfg.Region = v.Region
+
+ // Zero timeouts mean "wait forever", which is a bad
+ // default. Default to long timeouts instead.
+ if v.ConnectTimeout == 0 {
+ v.ConnectTimeout = s3DefaultConnectTimeout
+ }
+ if v.ReadTimeout == 0 {
+ v.ReadTimeout = s3DefaultReadTimeout
+ }
+
+ creds := aws.NewChainProvider(
+ []aws.CredentialsProvider{
+ aws.NewStaticCredentialsProvider(v.AccessKey, v.SecretKey, v.AuthToken),
+ ec2rolecreds.New(ec2metadata.New(cfg)),
+ })
+
+ cfg.Credentials = creds
+
+ v.bucket = &s3AWSbucket{
+ bucket: v.Bucket,
+ svc: s3.New(cfg),
+ }
+
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+ v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
+ return nil
+}
+
+// String implements fmt.Stringer.
+func (v *S3AWSVolume) String() string {
+ return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
+}
+
+// GetDeviceID returns a globally unique ID for the storage bucket.
+func (v *S3AWSVolume) GetDeviceID() string {
+ return "s3://" + v.Endpoint + "/" + v.Bucket
+}
+
+// Compare the given data with the stored data.
+func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+ errChan := make(chan error, 1)
+ go func() {
+ _, err := v.Head("recent/" + loc)
+ errChan <- err
+ }()
+ var err error
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err = <-errChan:
+ }
+ if err != nil {
+ // Checking for "loc" itself here would interfere with
+ // future GET requests.
+ //
+ // On AWS, if X doesn't exist, a HEAD or GET request
+ // for X causes X's non-existence to be cached. Thus,
+ // if we test for X, then create X and return a
+ // signature to our client, the client might still get
+ // 404 from all keepstores when trying to read it.
+ //
+ // To avoid this, we avoid doing HEAD X or GET X until
+ // we know X has been written.
+ //
+ // Note that X might exist even though recent/X
+ // doesn't: for example, the response to HEAD recent/X
+ // might itself come from a stale cache. In such
+ // cases, we will return a false negative and
+ // PutHandler might needlessly create another replica
+ // on a different volume. That's not ideal, but it's
+ // better than passing the eventually-consistent
+ // problem on to our clients.
+ return v.translateError(err)
+ }
+
+ input := &s3.GetObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ }
+
+ req := v.bucket.svc.GetObjectRequest(input)
+ result, err := req.Send(ctx)
+ if err != nil {
+ return v.translateError(err)
+ }
+ return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
+}
+
+// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
+// and deletes them from the volume.
+func (v *S3AWSVolume) EmptyTrash() {
+ if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+ return
+ }
+
+ var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
+
+ // Define "ready to delete" as "...when EmptyTrash started".
+ startT := time.Now()
+
+ emptyOneKey := func(trash *s3.Object) {
+ v.logger.Debugf("EmptyTrash: found trash marker %s, last modified %s", *trash.Key, *trash.LastModified)
+ loc := strings.TrimPrefix(*trash.Key, "trash/")
+ if !v.isKeepBlock(loc) {
+ return
+ }
+ atomic.AddInt64(&bytesInTrash, *trash.Size)
+ atomic.AddInt64(&blocksInTrash, 1)
+
+ trashT := *(trash.LastModified)
+ v.logger.Debugf("EmptyTrash: trash marker %s was last modified at %s; sweep started at %s", *trash.Key, trashT, startT)
+ recent, err := v.Head("recent/" + loc)
+ if err != nil && os.IsNotExist(v.translateError(err)) {
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+loc, err)
+ err = v.Untrash(loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
+ }
+ return
+ } else if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+ return
+ }
+ v.logger.Debugf("EmptyTrash: %q was last modified at %s", "recent/"+loc, *recent.LastModified)
+ if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
+ v.logger.Debugf("EmptyTrash: %q was touched less than BlobSigningTTL before it was trashed; checking for a race", loc)
+ if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
+ // recent/loc is too old to protect
+ // loc from being Trashed again during
+ // the raceWindow that starts if we
+ // delete trash/X now.
+ //
+ // Note this means (TrashSweepInterval
+ // < BlobSigningTTL - raceWindow) is
+ // necessary to avoid starvation.
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
+ v.fixRace(loc)
+ v.Touch(loc)
+ return
+ }
+ _, err := v.Head(loc)
+ if os.IsNotExist(err) {
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
+ v.fixRace(loc)
+ return
+ } else if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ return
+ }
+ }
+ if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
+ v.logger.Debugf("EmptyTrash: %s has been in trash for %s, less than BlobTrashLifetime (%s); skipping", *trash.Key, startT.Sub(trashT), v.cluster.Collections.BlobTrashLifetime.Duration())
+ return
+ }
+ err = v.bucket.Del(*trash.Key)
+ if err != nil {
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
+ return
+ }
+ atomic.AddInt64(&bytesDeleted, *trash.Size)
+ atomic.AddInt64(&blocksDeleted, 1)
+
+ v.logger.Debugf("EmptyTrash: deleted %q", *trash.Key)
+ _, err = v.Head(loc)
+ if err == nil {
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, *trash.Key)
+ return
+ }
+ if !os.IsNotExist(v.translateError(err)) {
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ return
+ }
+ err = v.bucket.Del("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+ }
+ v.logger.Debugf("EmptyTrash: delete of %q attempted", "recent/"+loc)
+ }
+
+ var wg sync.WaitGroup
+ todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
+ for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for key := range todo {
+ emptyOneKey(key)
+ }
+ }()
+ }
+
+ trashL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: "trash/",
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+ todo <- trash
+ }
+ close(todo)
+ wg.Wait()
+
+ if err := trashL.Error(); err != nil {
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
+ }
+ v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}
+
+// fixRace(X) is called when "recent/X" exists but "X" doesn't
+// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
+// there was a race between Put and Trash, fixRace recovers from the
+// race by Untrashing the block.
+func (v *S3AWSVolume) fixRace(loc string) bool {
+ trash, err := v.Head("trash/" + loc)
+ if err != nil {
+ if !os.IsNotExist(v.translateError(err)) {
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+ }
+ return false
+ }
+
+ recent, err := v.Head("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+ return false
+ }
+
+ recentTime := *recent.LastModified
+ trashTime := *trash.LastModified
+ ageWhenTrashed := trashTime.Sub(recentTime)
+ if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
+ // No evidence of a race: block hasn't been written
+ // since it became eligible for Trash. No fix needed.
+ return false
+ }
+
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "trash/"+loc, loc)
+ err = v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ v.logger.WithError(err).Error("fixRace: copy failed")
+ return false
+ }
+ return true
+}
+
+func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+ input := &s3.HeadObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ }
+
+ req := v.bucket.svc.HeadObjectRequest(input)
+ res, err := req.Send(context.TODO())
+
+ v.bucket.stats.TickOps("head")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
+ v.bucket.stats.TickErr(err)
+
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ result = res.HeadObjectOutput
+ return
+}
+
+// Get a block: copy the block data into buf, and return the number of
+// bytes copied.
+func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ return getWithPipe(ctx, loc, buf, v)
+}
+
+func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+ buf := make([]byte, 0, 67108864)
+ awsBuf := aws.NewWriteAtBuffer(buf)
+
+ downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+ u.PartSize = 5 * 1024 * 1024
+ u.Concurrency = 13
+ })
+
+ v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
+
+ _, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ })
+ v.bucket.stats.TickOps("get")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
+ v.bucket.stats.TickErr(err)
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ buf = awsBuf.Bytes()
+
+ rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes)
+ return
+}
+
+// ReadBlock implements BlockReader.
+func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
+ rdr, err := v.readWorker(ctx, loc)
+
+ if err == nil {
+ _, err2 := io.Copy(w, rdr)
+ if err2 != nil {
+ return err2
+ }
+ return err
+ }
+
+ err = v.translateError(err)
+ if !os.IsNotExist(err) {
+ return err
+ }
+
+ _, err = v.Head("recent/" + loc)
+ err = v.translateError(err)
+ if err != nil {
+ // If we can't read recent/X, there's no point in
+ // trying fixRace. Give up.
+ return err
+ }
+ if !v.fixRace(loc) {
+ err = os.ErrNotExist
+ return err
+ }
+
+ rdr, err = v.readWorker(ctx, loc)
+ if err != nil {
+ v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+ err = v.translateError(err)
+ return err
+ }
+
+ _, err = io.Copy(w, rdr)
+
+ return err
+}
+
+func (b *s3AWSbucket) PutReader(path string, r io.Reader, length int64, contType string, contentMD5 string, contentSHA256 string) error {
+ if length == 0 {
+ // aws-sdk-go will only send Content-Length: 0 when reader
+ // is nil due to net/http.Request.ContentLength
+ // behavior. Otherwise, Content-Length header is
+ // omitted which will cause some S3 services
+ // (including AWS and Ceph RadosGW) to fail to create
+ // empty objects.
+ r = bytes.NewReader([]byte{})
+ } else {
+ r = NewCountingReader(r, b.stats.TickOutBytes)
+ }
+ uploader := s3manager.NewUploaderWithClient(b.svc)
+ _, err := uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(b.bucket),
+ Key: aws.String(path),
+ Body: r,
+ })
+
+ b.stats.TickOps("put")
+ b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
+ b.stats.TickErr(err)
+ return err
+}
+
+// Put writes a block.
+func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
+ return putWithPipe(ctx, loc, block, v)
+}
+
+// WriteBlock implements BlockWriter.
+func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+
+ r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+ uploadInput := s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ Body: r,
+ }
+
+ //var contentMD5, contentSHA256 string
+ var contentMD5 string
+ md5, err := hex.DecodeString(loc)
+ if err != nil {
+ return err
+ }
+ contentMD5 = base64.StdEncoding.EncodeToString(md5)
+ // Skip Content-MD5 for the empty block (loc is the hex MD5 of the data)
+ if loc != "d41d8cd98f00b204e9800998ecf8427e" {
+ uploadInput.ContentMD5 = &contentMD5
+ // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
+ // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
+ // block, so there is no extra memory use to be concerned about. See
+ // makeSha256Reader in aws/signer/v4/v4.go.
+ }
+
+ // Some experimentation indicated that using concurrency 5 yields the best
+ // throughput, better than higher concurrency (10 or 13) by ~5%.
+ // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
+ // is detrimental to throughput (minus ~15%).
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ u.PartSize = 5 * 1024 * 1024
+ u.Concurrency = 5
+ })
+
+ _, err = uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions())
+
+ v.bucket.stats.TickOps("put")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+ v.bucket.stats.TickErr(err)
+ if err != nil {
+ return err
+ }
+
+ empty := bytes.NewReader([]byte{})
+ _, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + loc),
+ Body: empty,
+ })
+ v.bucket.stats.TickOps("put")
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
+ v.bucket.stats.TickErr(err)
+
+ return err
+}
+
+type s3awsLister struct {
+ Logger logrus.FieldLogger
+ Bucket *s3AWSbucket //*s3.Bucket
+ Prefix string
+ PageSize int
+ Stats *s3awsbucketStats
+ ContinuationToken string
+ buf []s3.Object
+ err error
+}
+
+// First fetches the first page and returns the first item. It returns
+// nil if the response is the empty set or an error occurs.
+func (lister *s3awsLister) First() *s3.Object {
+ lister.getPage()
+ return lister.pop()
+}
+
+// Next returns the next item, fetching the next page if necessary. It
+// returns nil if the last available item has already been fetched, or
+// an error occurs.
+func (lister *s3awsLister) Next() *s3.Object {
+ if len(lister.buf) == 0 && lister.ContinuationToken != "" {
+ lister.getPage()
+ }
+ return lister.pop()
+}
+
+// Error returns the most recent error encountered by First or Next.
+func (lister *s3awsLister) Error() error {
+ return lister.err
+}
+
+func (lister *s3awsLister) getPage() {
+ lister.Stats.TickOps("list")
+ lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
+
+ var input *s3.ListObjectsV2Input
+ if lister.ContinuationToken == "" {
+ input = &s3.ListObjectsV2Input{
+ Bucket: aws.String(lister.Bucket.bucket),
+ MaxKeys: aws.Int64(int64(lister.PageSize)),
+ Prefix: aws.String(lister.Prefix),
+ }
+ } else {
+ input = &s3.ListObjectsV2Input{
+ Bucket: aws.String(lister.Bucket.bucket),
+ MaxKeys: aws.Int64(int64(lister.PageSize)),
+ Prefix: aws.String(lister.Prefix),
+ ContinuationToken: &lister.ContinuationToken,
+ }
+ }
+
+ req := lister.Bucket.svc.ListObjectsV2Request(input)
+ resp, err := req.Send(context.Background())
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok {
+ lister.err = aerr
+ } else {
+ lister.err = err
+ }
+ return
+ }
+
+ if *resp.IsTruncated {
+ lister.ContinuationToken = *resp.NextContinuationToken
+ } else {
+ lister.ContinuationToken = ""
+ }
+ lister.buf = make([]s3.Object, 0, len(resp.Contents))
+ for _, key := range resp.Contents {
+ if !strings.HasPrefix(*key.Key, lister.Prefix) {
+ lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
+ continue
+ }
+ lister.buf = append(lister.buf, key)
+ }
+}
+
+func (lister *s3awsLister) pop() (k *s3.Object) {
+ if len(lister.buf) > 0 {
+ k = &lister.buf[0]
+ lister.buf = lister.buf[1:]
+ }
+ return
+}
+
+// IndexTo writes a complete list of locators with the given prefix
+// for which Get() can retrieve data.
+func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+ // Use a merge sort to find matching sets of X and recent/X.
+ dataL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: prefix,
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ recentL := s3awsLister{
+ Logger: v.logger,
+ Bucket: v.bucket,
+ Prefix: "recent/" + prefix,
+ PageSize: v.IndexPageSize,
+ Stats: &v.bucket.stats,
+ }
+ for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
+ if *data.Key >= "g" {
+ // Conveniently, "recent/*" and "trash/*" are
+ // lexically greater than all hex-encoded data
+ // hashes, so stopping here avoids iterating
+ // over all of them needlessly with dataL.
+ break
+ }
+ if !v.isKeepBlock(*data.Key) {
+ continue
+ }
+
+ // stamp is the list entry we should use to report the
+ // last-modified time for this data block: it will be
+ // the recent/X entry if one exists, otherwise the
+ // entry for the data block itself.
+ stamp := data
+
+ // Advance to the corresponding recent/X marker, if any
+ for recent != nil && recentL.Error() == nil {
+ if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
+ recent = recentL.Next()
+ continue
+ } else if cmp == 0 {
+ stamp = recent
+ recent = recentL.Next()
+ break
+ } else {
+ // recent/X marker is missing: we'll
+ // use the timestamp on the data
+ // object.
+ break
+ }
+ }
+ if err := recentL.Error(); err != nil {
+ return err
+ }
+ fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.UnixNano())
+ }
+ return dataL.Error()
+}
+
+// Mtime returns the stored timestamp for the given locator.
+func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
+ _, err := v.Head(loc)
+ if err != nil {
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ resp, err := v.Head("recent/" + loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // The data object X exists, but recent/X is missing.
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ if err != nil {
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ resp, err = v.Head("recent/" + loc)
+ if err != nil {
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+ return s3AWSZeroTime, v.translateError(err)
+ }
+ } else if err != nil {
+ // HEAD recent/X failed for some other reason.
+ return s3AWSZeroTime, err
+ }
+ return *resp.LastModified, err
+}
+
+// Status returns a *VolumeStatus representing the current in-use
+// storage capacity and a fake available capacity that doesn't make
+// the volume seem full or nearly-full.
+func (v *S3AWSVolume) Status() *VolumeStatus {
+ return &VolumeStatus{
+ DeviceNum: 1,
+ BytesFree: BlockSize * 1000,
+ BytesUsed: 1,
+ }
+}
+
+// InternalStats returns bucket I/O and API call counters.
+func (v *S3AWSVolume) InternalStats() interface{} {
+ return &v.bucket.stats
+}
+
+// Touch sets the timestamp for the given locator to the current time.
+func (v *S3AWSVolume) Touch(loc string) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+ _, err := v.Head(loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) && v.fixRace(loc) {
+ // The data object got trashed in a race, but fixRace
+ // rescued it.
+ } else if err != nil {
+ return err
+ }
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ return v.translateError(err)
+}
+
+// checkRaceWindow returns a non-nil error if trash/loc is, or might
+// be, in the race window (i.e., it's not safe to trash loc).
+func (v *S3AWSVolume) checkRaceWindow(loc string) error {
+ resp, err := v.Head("trash/" + loc)
+ err = v.translateError(err)
+ if os.IsNotExist(err) {
+ // OK, trash/X doesn't exist so we're not in the race
+ // window
+ return nil
+ } else if err != nil {
+ // Error looking up trash/X. We don't know whether
+ // we're in the race window
+ return err
+ }
+ t := resp.LastModified
+ safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
+ if safeWindow <= 0 {
+ // We can't count on "touch trash/X" to prolong
+ // trash/X's lifetime. The new timestamp might not
+ // become visible until now+raceWindow, and EmptyTrash
+ // is allowed to delete trash/X before then.
+ return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ }
+ // trash/X exists, but it won't be eligible for deletion until
+ // after now+raceWindow, so it's safe to overwrite it.
+ return nil
+}
+
+func (b *s3AWSbucket) Del(path string) error {
+ input := &s3.DeleteObjectInput{
+ Bucket: aws.String(b.bucket),
+ Key: aws.String(path),
+ }
+ req := b.svc.DeleteObjectRequest(input)
+ _, err := req.Send(context.Background())
+ //err := b.Bucket().Del(path)
+ b.stats.TickOps("delete")
+ b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
+ b.stats.TickErr(err)
+ return err
+}
+
+// Trash a Keep block.
+func (v *S3AWSVolume) Trash(loc string) error {
+ if v.volume.ReadOnly {
+ return MethodDisabledError
+ }
+ if t, err := v.Mtime(loc); err != nil {
+ return err
+ } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
+ return nil
+ }
+ if v.cluster.Collections.BlobTrashLifetime == 0 {
+ if !v.UnsafeDelete {
+ return ErrS3TrashDisabled
+ }
+ return v.translateError(v.bucket.Del(loc))
+ }
+ err := v.checkRaceWindow(loc)
+ if err != nil {
+ return err
+ }
+ err = v.safeCopy("trash/"+loc, loc)
+ if err != nil {
+ return err
+ }
+ return v.translateError(v.bucket.Del(loc))
+}
+
+// Untrash moves a block from the trash back into the store.
+func (v *S3AWSVolume) Untrash(loc string) error {
+ err := v.safeCopy(loc, "trash/"+loc)
+ if err != nil {
+ return err
+ }
+ err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", "", "")
+ return v.translateError(err)
+}
+
+type s3awsbucketStats struct {
+ statsTicker
+ Ops uint64
+ GetOps uint64
+ PutOps uint64
+ HeadOps uint64
+ DelOps uint64
+ ListOps uint64
+}
+
+func (s *s3awsbucketStats) TickErr(err error) {
+ if err == nil {
+ return
+ }
+ errType := fmt.Sprintf("%T", err)
+ if aerr, ok := err.(awserr.Error); ok {
+ if reqErr, ok := err.(awserr.RequestFailure); ok {
+ // A service error occurred
+ errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
+ } else {
+ errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
+ }
+ }
+ s.statsTicker.TickErr(err, errType)
+}
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
new file mode 100644
index 000000000..46fe07d16
--- /dev/null
+++ b/services/keepstore/s3aws_volume_test.go
@@ -0,0 +1,657 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "strings"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+const (
+ S3AWSTestBucketName = "testbucket"
+)
+
+type s3AWSFakeClock struct {
+ now *time.Time
+}
+
+func (c *s3AWSFakeClock) Now() time.Time {
+ if c.now == nil {
+ return time.Now()
+ }
+ return *c.now
+}
+
+func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
+ return c.Now().Sub(t)
+}
+
+var _ = check.Suite(&StubbedS3AWSSuite{})
+
+var srv httptest.Server
+
+type StubbedS3AWSSuite struct {
+ s3server *httptest.Server
+ metadata *httptest.Server
+ cluster *arvados.Cluster
+ handler *handler
+ volumes []*TestableS3AWSVolume
+}
+
+func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+ s.s3server = nil
+ s.metadata = nil
+ s.cluster = testCluster(c)
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
+ "zzzzz-nyw5e-111111111111111": {Driver: "S3"},
+ }
+ s.handler = &handler{}
+}
+
+func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ // Use a negative raceWindow so s3test's 1-second
+ // timestamp precision doesn't confuse fixRace.
+ return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+ DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ return s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
+ v.IndexPageSize = 3
+ for i := 0; i < 256; i++ {
+ v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
+ }
+ for _, spec := range []struct {
+ prefix string
+ expectMatch int
+ }{
+ {"", 256},
+ {"c", 16},
+ {"bc", 1},
+ {"abc", 0},
+ } {
+ buf := new(bytes.Buffer)
+ err := v.IndexTo(spec.prefix, buf)
+ c.Check(err, check.IsNil)
+
+ idx := bytes.SplitAfter(buf.Bytes(), []byte{10})
+ c.Check(len(idx), check.Equals, spec.expectMatch+1)
+ c.Check(len(idx[len(idx)-1]), check.Equals, 0)
+ }
+}
+
+func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+ var header http.Header
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ header = r.Header
+ }))
+ defer stub.Close()
+
+ // The aws-sdk-go-v2 driver only supports S3 V4 signatures. S3 V2 signatures are being phased out
+ // as of June 24, 2020. Cf. https://forums.aws.amazon.com/ann.jspa?annID=5816
+ vol := S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ AccessKey: "xxx",
+ SecretKey: "xxx",
+ Endpoint: stub.URL,
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := vol.check("")
+ // Our test S3 server uses the older 'Path Style'
+ vol.bucket.svc.ForcePathStyle = true
+
+ c.Check(err, check.IsNil)
+ err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
+}
+
+func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+ exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+ // Literal example from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // but with updated timestamps
+ io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ }))
+ defer s.metadata.Close()
+
+ v := &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: s.metadata.URL + "/latest/api/token",
+ Endpoint: "http://localhost:12345",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := v.check(s.metadata.URL + "/latest")
+ creds, err := v.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+ fmt.Printf("%+v, %s\n", creds, err)
+ c.Check(creds.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+ c.Check(creds.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusNotFound)
+ }))
+ deadv := &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: s.metadata.URL + "/fake-metadata/test-role",
+ Endpoint: "http://localhost:12345",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ },
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err = deadv.check(s.metadata.URL + "/latest")
+ _, err = deadv.bucket.svc.Client.Config.Credentials.Retrieve(context.Background())
+ c.Check(err, check.ErrorMatches, `(?s).*EC2RoleRequestError: no EC2 instance role found.*`)
+ c.Check(err, check.ErrorMatches, `(?s).*404.*`)
+}
+
+func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ stats := func() string {
+ buf, err := json.Marshal(v.InternalStats())
+ c.Check(err, check.IsNil)
+ return string(buf)
+ }
+
+ c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.NotNil)
+ c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+ err = v.Put(context.Background(), loc, []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+ c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
+
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
+type s3AWSBlockingHandler struct {
+ requested chan *http.Request
+ unblock chan struct{}
+}
+
+func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "PUT" && !strings.Contains(strings.Trim(r.URL.Path, "/"), "/") {
+ // Accept PutBucket ("PUT /bucketname/"), called by
+ // newTestableVolume
+ return
+ }
+ if h.requested != nil {
+ h.requested <- r
+ }
+ if h.unblock != nil {
+ <-h.unblock
+ }
+ http.Error(w, "nothing here", http.StatusNotFound)
+}
+
+func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := make([]byte, 3)
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ _, err := v.Get(ctx, loc, buf)
+ return err
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestCompareContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("bar")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ return v.Compare(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("foo")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3AWSVolume) error {
+ return v.Put(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3AWSVolume) error) {
+ handler := &s3AWSBlockingHandler{}
+ s.s3server = httptest.NewServer(handler)
+ defer s.s3server.Close()
+
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ handler.requested = make(chan *http.Request)
+ handler.unblock = make(chan struct{})
+ defer close(handler.unblock)
+
+ doneFunc := make(chan struct{})
+ go func() {
+ err := testFunc(ctx, v)
+ c.Check(err, check.Equals, context.Canceled)
+ close(doneFunc)
+ }()
+
+ timeout := time.After(10 * time.Second)
+
+ // Wait for the stub server to receive a request, meaning
+ // Get() is waiting for an s3 operation.
+ select {
+ case <-timeout:
+ c.Fatal("timed out waiting for test func to call our handler")
+ case <-doneFunc:
+ c.Fatal("test func finished without even calling our handler!")
+ case <-handler.requested:
+ }
+
+ cancel()
+
+ select {
+ case <-timeout:
+ c.Fatal("timed out")
+ case <-doneFunc:
+ }
+}
+
+func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+ s.cluster.Collections.BlobTrashLifetime.Set("1h")
+ s.cluster.Collections.BlobSigningTTL.Set("1h")
+
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ var none time.Time
+
+ putS3Obj := func(t time.Time, key string, data []byte) {
+ if t == none {
+ return
+ }
+ v.serverClock.now = &t
+ fmt.Printf("USING TIMESTAMP %s to write key %s", t, key)
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+ resp, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(key),
+ Body: bytes.NewReader(data),
+ })
+ if err != nil {
+ panic(err)
+ }
+ fmt.Println(resp)
+ v.serverClock.now = nil
+ resp2, err := v.Head(key)
+ fmt.Printf("KEY: %s\n%s", key, resp2)
+ }
+
+ t0 := time.Now()
+ nextKey := 0
+ for _, scenario := range []struct {
+ label string
+ dataT time.Time
+ recentT time.Time
+ trashT time.Time
+ canGet bool
+ canTrash bool
+ canGetAfterTrash bool
+ canUntrash bool
+ haveTrashAfterEmpty bool
+ freshAfterEmpty bool
+ }{
+ {
+ "No related objects",
+ none, none, none,
+ false, false, false, false, false, false,
+ },
+ {
+ // Stored by older version, or there was a
+ // race between EmptyTrash and Put: Trash is a
+ // no-op even though the data object is very
+ // old
+ "No recent/X",
+ t0.Add(-48 * time.Hour), none, none,
+ true, true, true, false, false, false,
+ },
+ {
+ "Not trash, but old enough to be eligible for trash",
+ t0.Add(-24 * time.Hour), t0.Add(-2 * time.Hour), none,
+ true, true, false, false, false, false,
+ },
+ {
+ "Not trash, and not old enough to be eligible for trash",
+ t0.Add(-24 * time.Hour), t0.Add(-30 * time.Minute), none,
+ true, true, true, false, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist, due to recent race between Trash and Put",
+ t0.Add(-24 * time.Hour), t0.Add(-3 * time.Minute), t0.Add(-2 * time.Minute),
+ true, true, true, true, true, false,
+ },
+ {
+ "Trashed + untrashed copies exist, trash nearly eligible for deletion: prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Trashed + untrashed copies exist, trash is eligible for deletion: prone to Trash race",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-61 * time.Minute),
+ true, false, true, true, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist, due to old race between Put and unfinished Trash: emptying trash is unsafe",
+ t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, true, true,
+ },
+ {
+ "Trashed + untrashed copies exist, used to be unsafe to empty, but since made safe by fixRace+Touch",
+ t0.Add(-time.Second), t0.Add(-time.Second), t0.Add(-12 * time.Hour),
+ true, true, true, true, false, false,
+ },
+ {
+ "Trashed + untrashed copies exist because Trash operation was interrupted (no race)",
+ t0.Add(-24 * time.Hour), t0.Add(-24 * time.Hour), t0.Add(-12 * time.Hour),
+ true, false, true, true, false, false,
+ },
+ {
+ "Trash, not yet eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-time.Minute),
+ false, false, false, true, true, false,
+ },
+ {
+ "Trash, not yet eligible for deletion, prone to races",
+ none, t0.Add(-12 * time.Hour), t0.Add(-59 * time.Minute),
+ false, false, false, true, true, false,
+ },
+ {
+ "Trash, eligible for deletion",
+ none, t0.Add(-12 * time.Hour), t0.Add(-2 * time.Hour),
+ false, false, false, true, false, false,
+ },
+ {
+ "Erroneously trashed during a race, detected before BlobTrashLifetime",
+ none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
+ none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
+ true, false, true, true, true, false,
+ },
+ {
+ "Trashed copy exists with no recent/* marker (cause unknown); repair by untrashing",
+ none, none, t0.Add(-time.Minute),
+ false, false, false, true, true, true,
+ },
+ } {
+ c.Log("Scenario: ", scenario.label)
+
+ // We have a few tests to run for each scenario, and
+ // the tests are expected to change state. By calling
+ // this setup func between tests, we (re)create the
+ // scenario as specified, using a new unique block
+ // locator to prevent interference from previous
+ // tests.
+
+ setupScenario := func() (string, []byte) {
+ nextKey++
+ blk := []byte(fmt.Sprintf("%d", nextKey))
+ loc := fmt.Sprintf("%x", md5.Sum(blk))
+ c.Log("\t", loc)
+ putS3Obj(scenario.dataT, loc, blk)
+ putS3Obj(scenario.recentT, "recent/"+loc, nil)
+ putS3Obj(scenario.trashT, "trash/"+loc, blk)
+ v.serverClock.now = &t0
+ return loc, blk
+ }
+
+ // Check canGet
+ loc, blk := setupScenario()
+ buf := make([]byte, len(blk))
+ _, err := v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGet)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Trash, then check canTrash and canGetAfterTrash
+ loc, _ = setupScenario()
+ err = v.Trash(loc)
+ c.Check(err == nil, check.Equals, scenario.canTrash)
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+ if err != nil {
+ c.Check(os.IsNotExist(err), check.Equals, true)
+ }
+
+ // Call Untrash, then check canUntrash
+ loc, _ = setupScenario()
+ err = v.Untrash(loc)
+ c.Check(err == nil, check.Equals, scenario.canUntrash)
+ if scenario.dataT != none || scenario.trashT != none {
+ // In all scenarios where the data exists, we
+ // should be able to Get after Untrash --
+ // regardless of timestamps, errors, race
+ // conditions, etc.
+ _, err = v.Get(context.Background(), loc, buf)
+ c.Check(err, check.IsNil)
+ }
+
+ // Call EmptyTrash, then check haveTrashAfterEmpty and
+ // freshAfterEmpty
+ loc, _ = setupScenario()
+ v.EmptyTrash()
+ _, err = v.Head("trash/" + loc)
+ c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+ if scenario.freshAfterEmpty {
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ // new mtime must be current (with an
+ // allowance for 1s timestamp precision)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+
+ // Check for current Mtime after Put (applies to all
+ // scenarios)
+ loc, blk = setupScenario()
+ err = v.Put(context.Background(), loc, blk)
+ c.Check(err, check.IsNil)
+ t, err := v.Mtime(loc)
+ c.Check(err, check.IsNil)
+ c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+ }
+}
+
+type TestableS3AWSVolume struct {
+ *S3AWSVolume
+ server *httptest.Server
+ c *check.C
+ serverClock *s3AWSFakeClock
+}
+
+type LogrusLog struct {
+ log *logrus.FieldLogger
+}
+
+func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
+ switch level {
+ case gofakes3.LogErr:
+ (*l.log).Errorln(v...)
+ case gofakes3.LogWarn:
+ (*l.log).Warnln(v...)
+ case gofakes3.LogInfo:
+ (*l.log).Infoln(v...)
+ default:
+ panic("unknown level")
+ }
+}
+
+func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, raceWindow time.Duration) *TestableS3AWSVolume {
+
+ clock := &s3AWSFakeClock{}
+ // fake s3
+ backend := s3mem.New(s3mem.WithTimeSource(clock))
+
+ logger := new(LogrusLog)
+ ctxLogger := ctxlog.FromContext(context.Background())
+ logger.log = &ctxLogger
+ faker := gofakes3.New(backend, gofakes3.WithTimeSource(clock), gofakes3.WithLogger(logger), gofakes3.WithTimeSkewLimit(0))
+ srv := httptest.NewServer(faker.Server())
+
+ endpoint := srv.URL
+ if s.s3server != nil {
+ endpoint = s.s3server.URL
+ }
+
+ iamRole, accessKey, secretKey := "", "xxx", "xxx"
+ if s.metadata != nil {
+ iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+ }
+
+ v := &TestableS3AWSVolume{
+ S3AWSVolume: &S3AWSVolume{
+ S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+ IAMRole: iamRole,
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ Bucket: S3AWSTestBucketName,
+ Endpoint: endpoint,
+ Region: "test-region-1",
+ LocationConstraint: true,
+ UnsafeDelete: true,
+ IndexPageSize: 1000,
+ },
+ cluster: cluster,
+ volume: volume,
+ logger: ctxlog.TestLogger(c),
+ metrics: metrics,
+ },
+ c: c,
+ server: srv,
+ serverClock: clock,
+ }
+ c.Assert(v.S3AWSVolume.check(""), check.IsNil)
+ // Our test S3 server uses the older 'Path Style'
+ v.S3AWSVolume.bucket.svc.ForcePathStyle = true
+ // Create the testbucket
+ input := &s3.CreateBucketInput{
+ Bucket: aws.String(S3AWSTestBucketName),
+ }
+ req := v.S3AWSVolume.bucket.svc.CreateBucketRequest(input)
+ _, err := req.Send(context.Background())
+ c.Assert(err, check.IsNil)
+ // We couldn't set RaceWindow until now because check()
+ // rejects negative values.
+ v.S3AWSVolume.RaceWindow = arvados.Duration(raceWindow)
+ return v
+}
+
+// PutRaw skips the ContentMD5 test
+func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
+
+ r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
+
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
+ u.PartSize = 5 * 1024 * 1024
+ u.Concurrency = 13
+ })
+
+ _, err := uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String(loc),
+ Body: r,
+ })
+ if err != nil {
+ v.logger.Printf("PutRaw: %s: %+v", loc, err)
+ }
+
+ empty := bytes.NewReader([]byte{})
+ _, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + loc),
+ Body: empty,
+ })
+ if err != nil {
+ v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+ }
+}
+
+// TouchWithDate turns back the clock while doing a Touch(). We assume
+// there are no other operations happening on the same s3test server
+// while we do this.
+func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+ v.serverClock.now = &lastPut
+
+ uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
+ empty := bytes.NewReader([]byte{})
+ _, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
+ Bucket: aws.String(v.bucket.bucket),
+ Key: aws.String("recent/" + locator),
+ Body: empty,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ v.serverClock.now = nil
+}
+
+func (v *TestableS3AWSVolume) Teardown() {
+ v.server.Close()
+}
+
+func (v *TestableS3AWSVolume) ReadWriteOperationLabelValues() (r, w string) {
+ return "get", "put"
+}
-----------------------------------------------------------------------
hooks/post-receive
--