[ARVADOS] created: 2.1.0-1381-gadccfe35c

Git user git at public.arvados.org
Tue Sep 21 03:05:45 UTC 2021


        at  adccfe35ccc68a865a2fd2356ca2b81e0366a4b4 (commit)


commit adccfe35ccc68a865a2fd2356ca2b81e0366a4b4
Author: Tom Clegg <tom at curii.com>
Date:   Mon Sep 20 17:47:07 2021 -0400

    17749: Document PrefixLength config.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index 6a3c396d6..e6b1e095e 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -67,6 +67,23 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
           # Use the AWS S3 v2 Go driver instead of the goamz driver.
           UseAWSS3v2Driver: false
 
+          # By default keepstore stores data using the MD5 checksum
+          # (32 hexadecimal characters) as the object name, e.g.,
+          # "0123456abc...". Setting PrefixLength to 3 changes this
+          # naming scheme to "012/0123456abc...". This can improve
+          # performance, depending on the S3 service being used. For
+          # example, PrefixLength 3 is recommended to avoid AWS
+          # limitations on the number of read/write operations per
+          # second per prefix (see
+          # https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/).
+          #
+          # Note that changing PrefixLength on an existing volume is
+          # not currently supported. Once you have started using a
+          # bucket as an Arvados volume, you should not change its
+          # configured PrefixLength, or configure another volume using
+          # the same bucket and a different PrefixLength.
+          PrefixLength: 0
+
           # Requested page size for "list bucket contents" requests.
           IndexPageSize: 1000
 

commit 6be7fd5574f54ad388992dc2562081416c342d19
Author: Tom Clegg <tom at curii.com>
Date:   Mon Sep 20 17:46:46 2021 -0400

    17749: Support PrefixLength in old S3 driver.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index fdc2ed56a..4c43b3f46 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -281,17 +281,17 @@ func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
 	return ttl, nil
 }
 
-func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
 	ready := make(chan bool)
 	go func() {
-		rdr, err = v.getReader(loc)
+		rdr, err = v.getReader(key)
 		close(ready)
 	}()
 	select {
 	case <-ready:
 		return
 	case <-ctx.Done():
-		v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
+		v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err())
 		go func() {
 			<-ready
 			if err == nil {
@@ -307,28 +307,28 @@ func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io
 // In situations where (Bucket)GetReader would fail because the block
 // disappeared in a Trash race, getReader calls fixRace to recover the
 // data, and tries again.
-func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
-	rdr, err = v.bucket.GetReader(loc)
+func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) {
+	rdr, err = v.bucket.GetReader(key)
 	err = v.translateError(err)
 	if err == nil || !os.IsNotExist(err) {
 		return
 	}
 
-	_, err = v.bucket.Head("recent/"+loc, nil)
+	_, err = v.bucket.Head("recent/"+key, nil)
 	err = v.translateError(err)
 	if err != nil {
 		// If we can't read recent/X, there's no point in
 		// trying fixRace. Give up.
 		return
 	}
-	if !v.fixRace(loc) {
+	if !v.fixRace(key) {
 		err = os.ErrNotExist
 		return
 	}
 
-	rdr, err = v.bucket.GetReader(loc)
+	rdr, err = v.bucket.GetReader(key)
 	if err != nil {
-		v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+		v.logger.Warnf("reading %s after successful fixRace: %s", key, err)
 		err = v.translateError(err)
 	}
 	return
@@ -337,7 +337,8 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-	rdr, err := v.getReaderWithContext(ctx, loc)
+	key := v.key(loc)
+	rdr, err := v.getReaderWithContext(ctx, key)
 	if err != nil {
 		return 0, err
 	}
@@ -373,9 +374,10 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
 
 // Compare the given data with the stored data.
 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+	key := v.key(loc)
 	errChan := make(chan error, 1)
 	go func() {
-		_, err := v.bucket.Head("recent/"+loc, nil)
+		_, err := v.bucket.Head("recent/"+key, nil)
 		errChan <- err
 	}()
 	var err error
@@ -407,7 +409,7 @@ func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error
 		// problem on to our clients.
 		return v.translateError(err)
 	}
-	rdr, err := v.getReaderWithContext(ctx, loc)
+	rdr, err := v.getReaderWithContext(ctx, key)
 	if err != nil {
 		return err
 	}
@@ -438,6 +440,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 		opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
 	}
 
+	key := v.key(loc)
+
 	// Send the block data through a pipe, so that (if we need to)
 	// we can close the pipe early and abandon our PutReader()
 	// goroutine, without worrying about PutReader() accessing our
@@ -457,11 +461,11 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 			}
 		}()
 		defer close(ready)
-		err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+		err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts)
 		if err != nil {
 			return
 		}
-		err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+		err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
 	}()
 	select {
 	case <-ctx.Done():
@@ -487,37 +491,39 @@ func (v *S3Volume) Touch(loc string) error {
 	if v.volume.ReadOnly {
 		return MethodDisabledError
 	}
-	_, err := v.bucket.Head(loc, nil)
+	key := v.key(loc)
+	_, err := v.bucket.Head(key, nil)
 	err = v.translateError(err)
-	if os.IsNotExist(err) && v.fixRace(loc) {
+	if os.IsNotExist(err) && v.fixRace(key) {
 		// The data object got trashed in a race, but fixRace
 		// rescued it.
 	} else if err != nil {
 		return err
 	}
-	err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+	err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
 	return v.translateError(err)
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
-	_, err := v.bucket.Head(loc, nil)
+	key := v.key(loc)
+	_, err := v.bucket.Head(key, nil)
 	if err != nil {
 		return zeroTime, v.translateError(err)
 	}
-	resp, err := v.bucket.Head("recent/"+loc, nil)
+	resp, err := v.bucket.Head("recent/"+key, nil)
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		// The data object X exists, but recent/X is missing.
-		err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+		err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
 		if err != nil {
-			v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+			v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
 			return zeroTime, v.translateError(err)
 		}
-		v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
-		resp, err = v.bucket.Head("recent/"+loc, nil)
+		v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key)
+		resp, err = v.bucket.Head("recent/"+key, nil)
 		if err != nil {
-			v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+			v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
 			return zeroTime, v.translateError(err)
 		}
 	} else if err != nil {
@@ -534,14 +540,14 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	dataL := s3Lister{
 		Logger:   v.logger,
 		Bucket:   v.bucket.Bucket(),
-		Prefix:   prefix,
+		Prefix:   v.key(prefix),
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
 	}
 	recentL := s3Lister{
 		Logger:   v.logger,
 		Bucket:   v.bucket.Bucket(),
-		Prefix:   "recent/" + prefix,
+		Prefix:   "recent/" + v.key(prefix),
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
 	}
@@ -553,7 +559,8 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 			// over all of them needlessly with dataL.
 			break
 		}
-		if !v.isKeepBlock(data.Key) {
+		loc, isBlk := v.isKeepBlock(data.Key)
+		if !isBlk {
 			continue
 		}
 
@@ -589,7 +596,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 		// We truncate sub-second precision here. Otherwise
 		// timestamps will never match the RFC1123-formatted
 		// Last-Modified values parsed by Mtime().
-		fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.Unix()*1000000000)
+		fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000)
 	}
 	return dataL.Error()
 }
@@ -604,27 +611,28 @@ func (v *S3Volume) Trash(loc string) error {
 	} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
 		return nil
 	}
+	key := v.key(loc)
 	if v.cluster.Collections.BlobTrashLifetime == 0 {
 		if !v.UnsafeDelete {
 			return ErrS3TrashDisabled
 		}
-		return v.translateError(v.bucket.Del(loc))
+		return v.translateError(v.bucket.Del(key))
 	}
-	err := v.checkRaceWindow(loc)
+	err := v.checkRaceWindow(key)
 	if err != nil {
 		return err
 	}
-	err = v.safeCopy("trash/"+loc, loc)
+	err = v.safeCopy("trash/"+key, key)
 	if err != nil {
 		return err
 	}
-	return v.translateError(v.bucket.Del(loc))
+	return v.translateError(v.bucket.Del(key))
 }
 
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3Volume) checkRaceWindow(loc string) error {
-	resp, err := v.bucket.Head("trash/"+loc, nil)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3Volume) checkRaceWindow(key string) error {
+	resp, err := v.bucket.Head("trash/"+key, nil)
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		// OK, trash/X doesn't exist so we're not in the race
@@ -646,7 +654,7 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
 		// trash/X's lifetime. The new timestamp might not
 		// become visible until now+raceWindow, and EmptyTrash
 		// is allowed to delete trash/X before then.
-		return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+		return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
 	}
 	// trash/X exists, but it won't be eligible for deletion until
 	// after now+raceWindow, so it's safe to overwrite it.
@@ -694,11 +702,12 @@ func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
 
 // Untrash moves block from trash back into store
 func (v *S3Volume) Untrash(loc string) error {
-	err := v.safeCopy(loc, "trash/"+loc)
+	key := v.key(loc)
+	err := v.safeCopy(key, "trash/"+key)
 	if err != nil {
 		return err
 	}
-	err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+	err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
 	return v.translateError(err)
 }
 
@@ -725,19 +734,33 @@ func (v *S3Volume) String() string {
 
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-func (v *S3Volume) isKeepBlock(s string) bool {
-	return s3KeepBlockRegexp.MatchString(s)
+func (v *S3Volume) isKeepBlock(s string) (string, bool) {
+	if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+		s = s[v.PrefixLength+1:]
+	}
+	return s, s3KeepBlockRegexp.MatchString(s)
+}
+
+// Return the key used for a given loc. If PrefixLength==0 then
+// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
+// "abc/abcdef0123", etc.
+func (v *S3Volume) key(loc string) string {
+	if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+		return loc[:v.PrefixLength] + "/" + loc
+	} else {
+		return loc
+	}
 }
 
 // fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3Volume) fixRace(loc string) bool {
-	trash, err := v.bucket.Head("trash/"+loc, nil)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3Volume) fixRace(key string) bool {
+	trash, err := v.bucket.Head("trash/"+key, nil)
 	if err != nil {
 		if !os.IsNotExist(v.translateError(err)) {
-			v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+			v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
 		}
 		return false
 	}
@@ -747,9 +770,9 @@ func (v *S3Volume) fixRace(loc string) bool {
 		return false
 	}
 
-	recent, err := v.bucket.Head("recent/"+loc, nil)
+	recent, err := v.bucket.Head("recent/"+key, nil)
 	if err != nil {
-		v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+		v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
 		return false
 	}
 	recentTime, err := v.lastModified(recent)
@@ -765,9 +788,9 @@ func (v *S3Volume) fixRace(loc string) bool {
 		return false
 	}
 
-	v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-	v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
-	err = v.safeCopy(loc, "trash/"+loc)
+	v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+	v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+	err = v.safeCopy(key, "trash/"+key)
 	if err != nil {
 		v.logger.WithError(err).Error("fixRace: copy failed")
 		return false
@@ -803,8 +826,9 @@ func (v *S3Volume) EmptyTrash() {
 	startT := time.Now()
 
 	emptyOneKey := func(trash *s3.Key) {
-		loc := trash.Key[6:]
-		if !v.isKeepBlock(loc) {
+		key := trash.Key[6:]
+		loc, isBlk := v.isKeepBlock(key)
+		if !isBlk {
 			return
 		}
 		atomic.AddInt64(&bytesInTrash, trash.Size)
@@ -815,7 +839,7 @@ func (v *S3Volume) EmptyTrash() {
 			v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
 			return
 		}
-		recent, err := v.bucket.Head("recent/"+loc, nil)
+		recent, err := v.bucket.Head("recent/"+key, nil)
 		if err != nil && os.IsNotExist(v.translateError(err)) {
 			v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
 			err = v.Untrash(loc)
@@ -824,12 +848,12 @@ func (v *S3Volume) EmptyTrash() {
 			}
 			return
 		} else if err != nil {
-			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
 			return
 		}
 		recentT, err := v.lastModified(recent)
 		if err != nil {
-			v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
+			v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified"))
 			return
 		}
 		if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -843,14 +867,14 @@ func (v *S3Volume) EmptyTrash() {
 				// < BlobSigningTTL - raceWindow) is
 				// necessary to avoid starvation.
 				v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
-				v.fixRace(loc)
+				v.fixRace(key)
 				v.Touch(loc)
 				return
 			}
-			_, err := v.bucket.Head(loc, nil)
+			_, err := v.bucket.Head(key, nil)
 			if os.IsNotExist(err) {
 				v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
-				v.fixRace(loc)
+				v.fixRace(key)
 				return
 			} else if err != nil {
 				v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
@@ -868,18 +892,18 @@ func (v *S3Volume) EmptyTrash() {
 		atomic.AddInt64(&bytesDeleted, trash.Size)
 		atomic.AddInt64(&blocksDeleted, 1)
 
-		_, err = v.bucket.Head(loc, nil)
+		_, err = v.bucket.Head(key, nil)
 		if err == nil {
-			v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+			v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key)
 			return
 		}
 		if !os.IsNotExist(v.translateError(err)) {
-			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
 			return
 		}
-		err = v.bucket.Del("recent/" + loc)
+		err = v.bucket.Del("recent/" + key)
 		if err != nil {
-			v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
 		}
 	}
 
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index cb08e44ac..5cb8a668a 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -76,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
 	})
 }
 
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+	DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+		v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+		v.PrefixLength = 3
+		return v
+	})
+}
+
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
 	v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
 	v.IndexPageSize = 3
@@ -416,81 +424,88 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			false, false, false, true, true, true,
 		},
 	} {
-		c.Log("Scenario: ", scenario.label)
-
-		// We have a few tests to run for each scenario, and
-		// the tests are expected to change state. By calling
-		// this setup func between tests, we (re)create the
-		// scenario as specified, using a new unique block
-		// locator to prevent interference from previous
-		// tests.
-
-		setupScenario := func() (string, []byte) {
-			nextKey++
-			blk := []byte(fmt.Sprintf("%d", nextKey))
-			loc := fmt.Sprintf("%x", md5.Sum(blk))
-			c.Log("\t", loc)
-			putS3Obj(scenario.dataT, loc, blk)
-			putS3Obj(scenario.recentT, "recent/"+loc, nil)
-			putS3Obj(scenario.trashT, "trash/"+loc, blk)
-			v.serverClock.now = &t0
-			return loc, blk
-		}
-
-		// Check canGet
-		loc, blk := setupScenario()
-		buf := make([]byte, len(blk))
-		_, err := v.Get(context.Background(), loc, buf)
-		c.Check(err == nil, check.Equals, scenario.canGet)
-		if err != nil {
-			c.Check(os.IsNotExist(err), check.Equals, true)
-		}
-
-		// Call Trash, then check canTrash and canGetAfterTrash
-		loc, _ = setupScenario()
-		err = v.Trash(loc)
-		c.Check(err == nil, check.Equals, scenario.canTrash)
-		_, err = v.Get(context.Background(), loc, buf)
-		c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-		if err != nil {
-			c.Check(os.IsNotExist(err), check.Equals, true)
-		}
-
-		// Call Untrash, then check canUntrash
-		loc, _ = setupScenario()
-		err = v.Untrash(loc)
-		c.Check(err == nil, check.Equals, scenario.canUntrash)
-		if scenario.dataT != none || scenario.trashT != none {
-			// In all scenarios where the data exists, we
-			// should be able to Get after Untrash --
-			// regardless of timestamps, errors, race
-			// conditions, etc.
+		for _, prefixLength := range []int{0, 3} {
+			v.PrefixLength = prefixLength
+			c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+			// We have a few tests to run for each scenario, and
+			// the tests are expected to change state. By calling
+			// this setup func between tests, we (re)create the
+			// scenario as specified, using a new unique block
+			// locator to prevent interference from previous
+			// tests.
+
+			setupScenario := func() (string, []byte) {
+				nextKey++
+				blk := []byte(fmt.Sprintf("%d", nextKey))
+				loc := fmt.Sprintf("%x", md5.Sum(blk))
+				key := loc
+				if prefixLength > 0 {
+					key = loc[:prefixLength] + "/" + loc
+				}
+				c.Log("\t", loc)
+				putS3Obj(scenario.dataT, key, blk)
+				putS3Obj(scenario.recentT, "recent/"+key, nil)
+				putS3Obj(scenario.trashT, "trash/"+key, blk)
+				v.serverClock.now = &t0
+				return loc, blk
+			}
+
+			// Check canGet
+			loc, blk := setupScenario()
+			buf := make([]byte, len(blk))
+			_, err := v.Get(context.Background(), loc, buf)
+			c.Check(err == nil, check.Equals, scenario.canGet)
+			if err != nil {
+				c.Check(os.IsNotExist(err), check.Equals, true)
+			}
+
+			// Call Trash, then check canTrash and canGetAfterTrash
+			loc, _ = setupScenario()
+			err = v.Trash(loc)
+			c.Check(err == nil, check.Equals, scenario.canTrash)
 			_, err = v.Get(context.Background(), loc, buf)
+			c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+			if err != nil {
+				c.Check(os.IsNotExist(err), check.Equals, true)
+			}
+
+			// Call Untrash, then check canUntrash
+			loc, _ = setupScenario()
+			err = v.Untrash(loc)
+			c.Check(err == nil, check.Equals, scenario.canUntrash)
+			if scenario.dataT != none || scenario.trashT != none {
+				// In all scenarios where the data exists, we
+				// should be able to Get after Untrash --
+				// regardless of timestamps, errors, race
+				// conditions, etc.
+				_, err = v.Get(context.Background(), loc, buf)
+				c.Check(err, check.IsNil)
+			}
+
+			// Call EmptyTrash, then check haveTrashAfterEmpty and
+			// freshAfterEmpty
+			loc, _ = setupScenario()
+			v.EmptyTrash()
+			_, err = v.bucket.Head("trash/"+v.key(loc), nil)
+			c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+			if scenario.freshAfterEmpty {
+				t, err := v.Mtime(loc)
+				c.Check(err, check.IsNil)
+				// new mtime must be current (with an
+				// allowance for 1s timestamp precision)
+				c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+			}
+
+			// Check for current Mtime after Put (applies to all
+			// scenarios)
+			loc, blk = setupScenario()
+			err = v.Put(context.Background(), loc, blk)
 			c.Check(err, check.IsNil)
-		}
-
-		// Call EmptyTrash, then check haveTrashAfterEmpty and
-		// freshAfterEmpty
-		loc, _ = setupScenario()
-		v.EmptyTrash()
-		_, err = v.bucket.Head("trash/"+loc, nil)
-		c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-		if scenario.freshAfterEmpty {
 			t, err := v.Mtime(loc)
 			c.Check(err, check.IsNil)
-			// new mtime must be current (with an
-			// allowance for 1s timestamp precision)
 			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 		}
-
-		// Check for current Mtime after Put (applies to all
-		// scenarios)
-		loc, blk = setupScenario()
-		err = v.Put(context.Background(), loc, blk)
-		c.Check(err, check.IsNil)
-		t, err := v.Mtime(loc)
-		c.Check(err, check.IsNil)
-		c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 	}
 }
 
@@ -547,13 +562,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-	err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+	key := v.key(loc)
+	err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
 		v.logger.Printf("PutRaw: %s: %+v", loc, err)
 	}
-	err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
-		v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+		v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
 	}
 }
 
@@ -562,7 +578,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 	v.serverClock.now = &lastPut
-	err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+	err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
 		panic(err)
 	}

commit 940139d6752fd9c13dbc1ae6d921203a0b1087a9
Author: Tom Clegg <tom at curii.com>
Date:   Mon Sep 20 17:46:43 2021 -0400

    17749: Support PrefixLength in new S3 driver.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 4bcffd909..5bd575b12 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1297,6 +1297,7 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          PrefixLength: 0
           # Use aws-s3-go (v2) instead of goamz
           UseAWSS3v2Driver: false
 
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index fd07592e9..4e24a5c7f 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1303,6 +1303,7 @@ Clusters:
           ConnectTimeout: 1m
           ReadTimeout: 10m
           RaceWindow: 24h
+          PrefixLength: 0
           # Use aws-s3-go (v2) instead of goamz
           UseAWSS3v2Driver: false
 
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index ad9a4da03..f1d27b8dc 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -311,6 +311,7 @@ type S3VolumeDriverParameters struct {
 	ReadTimeout        Duration
 	RaceWindow         Duration
 	UnsafeDelete       bool
+	PrefixLength       int
 }
 
 type AzureVolumeDriverParameters struct {
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index baded4116..fb2e97efb 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -83,8 +83,22 @@ const (
 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 var s3AWSZeroTime time.Time
 
-func (v *S3AWSVolume) isKeepBlock(s string) bool {
-	return s3AWSKeepBlockRegexp.MatchString(s)
+func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
+	if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+		s = s[v.PrefixLength+1:]
+	}
+	return s, s3AWSKeepBlockRegexp.MatchString(s)
+}
+
+// Return the key used for a given loc. If PrefixLength==0 then
+// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
+// "abc/abcdef0123", etc.
+func (v *S3AWSVolume) key(loc string) string {
+	if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+		return loc[:v.PrefixLength] + "/" + loc
+	} else {
+		return loc
+	}
 }
 
 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
@@ -109,11 +123,12 @@ func (v *S3AWSVolume) translateError(err error) error {
 	return err
 }
 
-// safeCopy calls CopyObjectRequest, and checks the response to make sure the
-// copy succeeded and updated the timestamp on the destination object
+// safeCopy calls CopyObjectRequest, and checks the response to make
+// sure the copy succeeded and updated the timestamp on the
+// destination object
 //
-// (If something goes wrong during the copy, the error will be embedded in the
-// 200 OK response)
+// (If something goes wrong during the copy, the error will be
+// embedded in the 200 OK response)
 func (v *S3AWSVolume) safeCopy(dst, src string) error {
 	input := &s3.CopyObjectInput{
 		Bucket:      aws.String(v.bucket.bucket),
@@ -222,9 +237,10 @@ func (v *S3AWSVolume) GetDeviceID() string {
 
 // Compare the given data with the stored data.
 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
+	key := v.key(loc)
 	errChan := make(chan error, 1)
 	go func() {
-		_, err := v.Head("recent/" + loc)
+		_, err := v.head("recent/" + key)
 		errChan <- err
 	}()
 	var err error
@@ -234,8 +250,8 @@ func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) er
 	case err = <-errChan:
 	}
 	if err != nil {
-		// Checking for "loc" itself here would interfere with
-		// future GET requests.
+		// Checking for the key itself here would interfere
+		// with future GET requests.
 		//
 		// On AWS, if X doesn't exist, a HEAD or GET request
 		// for X causes X's non-existence to be cached. Thus,
@@ -259,7 +275,7 @@ func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) er
 
 	input := &s3.GetObjectInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(loc),
+		Key:    aws.String(key),
 	}
 
 	req := v.bucket.svc.GetObjectRequest(input)
@@ -283,29 +299,30 @@ func (v *S3AWSVolume) EmptyTrash() {
 	startT := time.Now()
 
 	emptyOneKey := func(trash *s3.Object) {
-		loc := strings.TrimPrefix(*trash.Key, "trash/")
-		if !v.isKeepBlock(loc) {
+		key := strings.TrimPrefix(*trash.Key, "trash/")
+		loc, isblk := v.isKeepBlock(key)
+		if !isblk {
 			return
 		}
 		atomic.AddInt64(&bytesInTrash, *trash.Size)
 		atomic.AddInt64(&blocksInTrash, 1)
 
-		trashT := *(trash.LastModified)
-		recent, err := v.Head("recent/" + loc)
+		trashT := *trash.LastModified
+		recent, err := v.head("recent/" + key)
 		if err != nil && os.IsNotExist(v.translateError(err)) {
-			v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
+			v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
 			err = v.Untrash(loc)
 			if err != nil {
 				v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
 			}
 			return
 		} else if err != nil {
-			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
 			return
 		}
 		if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
 			if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
-				// recent/loc is too old to protect
+				// recent/key is too old to protect
 				// loc from being Trashed again during
 				// the raceWindow that starts if we
 				// delete trash/X now.
@@ -314,14 +331,14 @@ func (v *S3AWSVolume) EmptyTrash() {
 				// < BlobSigningTTL - raceWindow) is
 				// necessary to avoid starvation.
 				v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
-				v.fixRace(loc)
+				v.fixRace(key)
 				v.Touch(loc)
 				return
 			}
-			_, err := v.Head(loc)
+			_, err := v.head(key)
 			if os.IsNotExist(err) {
 				v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
-				v.fixRace(loc)
+				v.fixRace(key)
 				return
 			} else if err != nil {
 				v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
@@ -339,18 +356,18 @@ func (v *S3AWSVolume) EmptyTrash() {
 		atomic.AddInt64(&bytesDeleted, *trash.Size)
 		atomic.AddInt64(&blocksDeleted, 1)
 
-		_, err = v.Head(loc)
+		_, err = v.head(*trash.Key)
 		if err == nil {
 			v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
 			return
 		}
 		if !os.IsNotExist(v.translateError(err)) {
-			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
 			return
 		}
-		err = v.bucket.Del("recent/" + loc)
+		err = v.bucket.Del("recent/" + key)
 		if err != nil {
-			v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+			v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
 		}
 	}
 
@@ -386,21 +403,21 @@ func (v *S3AWSVolume) EmptyTrash() {
 }
 
 // fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3AWSVolume) fixRace(loc string) bool {
-	trash, err := v.Head("trash/" + loc)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3AWSVolume) fixRace(key string) bool {
+	trash, err := v.head("trash/" + key)
 	if err != nil {
 		if !os.IsNotExist(v.translateError(err)) {
-			v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+			v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
 		}
 		return false
 	}
 
-	recent, err := v.Head("recent/" + loc)
+	recent, err := v.head("recent/" + key)
 	if err != nil {
-		v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+		v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
 		return false
 	}
 
@@ -413,9 +430,9 @@ func (v *S3AWSVolume) fixRace(loc string) bool {
 		return false
 	}
 
-	v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-	v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
-	err = v.safeCopy(loc, "trash/"+loc)
+	v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+	v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+	err = v.safeCopy(key, "trash/"+key)
 	if err != nil {
 		v.logger.WithError(err).Error("fixRace: copy failed")
 		return false
@@ -423,10 +440,10 @@ func (v *S3AWSVolume) fixRace(loc string) bool {
 	return true
 }
 
-func (v *S3AWSVolume) Head(loc string) (result *s3.HeadObjectOutput, err error) {
+func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
 	input := &s3.HeadObjectInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(loc),
+		Key:    aws.String(key),
 	}
 
 	req := v.bucket.svc.HeadObjectRequest(input)
@@ -449,7 +466,7 @@ func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, err
 	return getWithPipe(ctx, loc, buf, v)
 }
 
-func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3AWSVolume) readWorker(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
 	buf := make([]byte, 0, 67108864)
 	awsBuf := aws.NewWriteAtBuffer(buf)
 
@@ -462,7 +479,7 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCl
 
 	_, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(loc),
+		Key:    aws.String(key),
 	})
 	v.bucket.stats.TickOps("get")
 	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
@@ -478,7 +495,8 @@ func (v *S3AWSVolume) readWorker(ctx context.Context, loc string) (rdr io.ReadCl
 
 // ReadBlock implements BlockReader.
 func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
-	rdr, err := v.readWorker(ctx, loc)
+	key := v.key(loc)
+	rdr, err := v.readWorker(ctx, key)
 
 	if err == nil {
 		_, err2 := io.Copy(w, rdr)
@@ -493,19 +511,19 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
 		return err
 	}
 
-	_, err = v.Head("recent/" + loc)
+	_, err = v.head("recent/" + key)
 	err = v.translateError(err)
 	if err != nil {
 		// If we can't read recent/X, there's no point in
 		// trying fixRace. Give up.
 		return err
 	}
-	if !v.fixRace(loc) {
+	if !v.fixRace(key) {
 		err = os.ErrNotExist
 		return err
 	}
 
-	rdr, err = v.readWorker(ctx, loc)
+	rdr, err = v.readWorker(ctx, key)
 	if err != nil {
 		v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
 		err = v.translateError(err)
@@ -517,7 +535,7 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
 	return err
 }
 
-func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader) error {
+func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
 	if r == nil {
 		// r == nil leads to a memory violation in func readFillBuf in
 		// aws-sdk-go-v2 at v0.23.0/service/s3/s3manager/upload.go
@@ -526,13 +544,13 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, name string, r io.Reader)
 
 	uploadInput := s3manager.UploadInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(name),
+		Key:    aws.String(key),
 		Body:   r,
 	}
 
-	if len(name) == 32 {
+	if loc, ok := v.isKeepBlock(key); ok {
 		var contentMD5 string
-		md5, err := hex.DecodeString(name)
+		md5, err := hex.DecodeString(loc)
 		if err != nil {
 			return err
 		}
@@ -578,11 +596,12 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 	}
 
 	r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
-	err := v.writeObject(ctx, loc, r)
+	key := v.key(loc)
+	err := v.writeObject(ctx, key, r)
 	if err != nil {
 		return err
 	}
-	return v.writeObject(ctx, "recent/"+loc, nil)
+	return v.writeObject(ctx, "recent/"+key, nil)
 }
 
 type s3awsLister struct {
@@ -675,6 +694,7 @@ func (lister *s3awsLister) pop() (k *s3.Object) {
 // IndexTo writes a complete list of locators with the given prefix
 // for which Get() can retrieve data.
 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
+	prefix = v.key(prefix)
 	// Use a merge sort to find matching sets of X and recent/X.
 	dataL := s3awsLister{
 		Logger:   v.logger,
@@ -698,7 +718,8 @@ func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
 			// over all of them needlessly with dataL.
 			break
 		}
-		if !v.isKeepBlock(*data.Key) {
+		loc, isblk := v.isKeepBlock(*data.Key)
+		if !isblk {
 			continue
 		}
 
@@ -730,30 +751,31 @@ func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
 		// We truncate sub-second precision here. Otherwise
 		// timestamps will never match the RFC1123-formatted
 		// Last-Modified values parsed by Mtime().
-		fmt.Fprintf(writer, "%s+%d %d\n", *data.Key, *data.Size, stamp.LastModified.Unix()*1000000000)
+		fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
 	}
 	return dataL.Error()
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
-	_, err := v.Head(loc)
+	key := v.key(loc)
+	_, err := v.head(key)
 	if err != nil {
 		return s3AWSZeroTime, v.translateError(err)
 	}
-	resp, err := v.Head("recent/" + loc)
+	resp, err := v.head("recent/" + key)
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		// The data object X exists, but recent/X is missing.
-		err = v.writeObject(context.Background(), "recent/"+loc, nil)
+		err = v.writeObject(context.Background(), "recent/"+key, nil)
 		if err != nil {
-			v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+			v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
 			return s3AWSZeroTime, v.translateError(err)
 		}
-		v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+loc)
-		resp, err = v.Head("recent/" + loc)
+		v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
+		resp, err = v.head("recent/" + key)
 		if err != nil {
-			v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+			v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
 			return s3AWSZeroTime, v.translateError(err)
 		}
 	} else if err != nil {
@@ -784,22 +806,23 @@ func (v *S3AWSVolume) Touch(loc string) error {
 	if v.volume.ReadOnly {
 		return MethodDisabledError
 	}
-	_, err := v.Head(loc)
+	key := v.key(loc)
+	_, err := v.head(key)
 	err = v.translateError(err)
-	if os.IsNotExist(err) && v.fixRace(loc) {
+	if os.IsNotExist(err) && v.fixRace(key) {
 		// The data object got trashed in a race, but fixRace
 		// rescued it.
 	} else if err != nil {
 		return err
 	}
-	err = v.writeObject(context.Background(), "recent/"+loc, nil)
+	err = v.writeObject(context.Background(), "recent/"+key, nil)
 	return v.translateError(err)
 }
 
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3AWSVolume) checkRaceWindow(loc string) error {
-	resp, err := v.Head("trash/" + loc)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3AWSVolume) checkRaceWindow(key string) error {
+	resp, err := v.head("trash/" + key)
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		// OK, trash/X doesn't exist so we're not in the race
@@ -817,7 +840,7 @@ func (v *S3AWSVolume) checkRaceWindow(loc string) error {
 		// trash/X's lifetime. The new timestamp might not
 		// become visible until now+raceWindow, and EmptyTrash
 		// is allowed to delete trash/X before then.
-		return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+		return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
 	}
 	// trash/X exists, but it won't be eligible for deletion until
 	// after now+raceWindow, so it's safe to overwrite it.
@@ -831,7 +854,6 @@ func (b *s3AWSbucket) Del(path string) error {
 	}
 	req := b.svc.DeleteObjectRequest(input)
 	_, err := req.Send(context.Background())
-	//err := b.Bucket().Del(path)
 	b.stats.TickOps("delete")
 	b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
 	b.stats.TickErr(err)
@@ -848,30 +870,32 @@ func (v *S3AWSVolume) Trash(loc string) error {
 	} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
 		return nil
 	}
+	key := v.key(loc)
 	if v.cluster.Collections.BlobTrashLifetime == 0 {
 		if !v.UnsafeDelete {
 			return ErrS3TrashDisabled
 		}
-		return v.translateError(v.bucket.Del(loc))
+		return v.translateError(v.bucket.Del(key))
 	}
-	err := v.checkRaceWindow(loc)
+	err := v.checkRaceWindow(key)
 	if err != nil {
 		return err
 	}
-	err = v.safeCopy("trash/"+loc, loc)
+	err = v.safeCopy("trash/"+key, key)
 	if err != nil {
 		return err
 	}
-	return v.translateError(v.bucket.Del(loc))
+	return v.translateError(v.bucket.Del(key))
 }
 
 // Untrash moves block from trash back into store
 func (v *S3AWSVolume) Untrash(loc string) error {
-	err := v.safeCopy(loc, "trash/"+loc)
+	key := v.key(loc)
+	err := v.safeCopy(key, "trash/"+key)
 	if err != nil {
 		return err
 	}
-	err = v.writeObject(context.Background(), "recent/"+loc, nil)
+	err = v.writeObject(context.Background(), "recent/"+key, nil)
 	return v.translateError(err)
 }
 
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3aws_volume_test.go
index 0387d52f1..fa9e270e0 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3aws_volume_test.go
@@ -87,6 +87,14 @@ func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
 	})
 }
 
+func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
+	DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+		v := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+		v.PrefixLength = 3
+		return v
+	})
+}
+
 func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
 	v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
 	v.IndexPageSize = 3
@@ -333,7 +341,7 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
 			panic(err)
 		}
 		v.serverClock.now = nil
-		_, err = v.Head(key)
+		_, err = v.head(key)
 		if err != nil {
 			panic(err)
 		}
@@ -438,81 +446,88 @@ func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
 			false, false, false, true, true, true,
 		},
 	} {
-		c.Log("Scenario: ", scenario.label)
-
-		// We have a few tests to run for each scenario, and
-		// the tests are expected to change state. By calling
-		// this setup func between tests, we (re)create the
-		// scenario as specified, using a new unique block
-		// locator to prevent interference from previous
-		// tests.
-
-		setupScenario := func() (string, []byte) {
-			nextKey++
-			blk := []byte(fmt.Sprintf("%d", nextKey))
-			loc := fmt.Sprintf("%x", md5.Sum(blk))
-			c.Log("\t", loc)
-			putS3Obj(scenario.dataT, loc, blk)
-			putS3Obj(scenario.recentT, "recent/"+loc, nil)
-			putS3Obj(scenario.trashT, "trash/"+loc, blk)
-			v.serverClock.now = &t0
-			return loc, blk
-		}
-
-		// Check canGet
-		loc, blk := setupScenario()
-		buf := make([]byte, len(blk))
-		_, err := v.Get(context.Background(), loc, buf)
-		c.Check(err == nil, check.Equals, scenario.canGet)
-		if err != nil {
-			c.Check(os.IsNotExist(err), check.Equals, true)
-		}
-
-		// Call Trash, then check canTrash and canGetAfterTrash
-		loc, _ = setupScenario()
-		err = v.Trash(loc)
-		c.Check(err == nil, check.Equals, scenario.canTrash)
-		_, err = v.Get(context.Background(), loc, buf)
-		c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-		if err != nil {
-			c.Check(os.IsNotExist(err), check.Equals, true)
-		}
-
-		// Call Untrash, then check canUntrash
-		loc, _ = setupScenario()
-		err = v.Untrash(loc)
-		c.Check(err == nil, check.Equals, scenario.canUntrash)
-		if scenario.dataT != none || scenario.trashT != none {
-			// In all scenarios where the data exists, we
-			// should be able to Get after Untrash --
-			// regardless of timestamps, errors, race
-			// conditions, etc.
+		for _, prefixLength := range []int{0, 3} {
+			v.PrefixLength = prefixLength
+			c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+			// We have a few tests to run for each scenario, and
+			// the tests are expected to change state. By calling
+			// this setup func between tests, we (re)create the
+			// scenario as specified, using a new unique block
+			// locator to prevent interference from previous
+			// tests.
+
+			setupScenario := func() (string, []byte) {
+				nextKey++
+				blk := []byte(fmt.Sprintf("%d", nextKey))
+				loc := fmt.Sprintf("%x", md5.Sum(blk))
+				key := loc
+				if prefixLength > 0 {
+					key = loc[:prefixLength] + "/" + loc
+				}
+				c.Log("\t", loc, "\t", key)
+				putS3Obj(scenario.dataT, key, blk)
+				putS3Obj(scenario.recentT, "recent/"+key, nil)
+				putS3Obj(scenario.trashT, "trash/"+key, blk)
+				v.serverClock.now = &t0
+				return loc, blk
+			}
+
+			// Check canGet
+			loc, blk := setupScenario()
+			buf := make([]byte, len(blk))
+			_, err := v.Get(context.Background(), loc, buf)
+			c.Check(err == nil, check.Equals, scenario.canGet)
+			if err != nil {
+				c.Check(os.IsNotExist(err), check.Equals, true)
+			}
+
+			// Call Trash, then check canTrash and canGetAfterTrash
+			loc, _ = setupScenario()
+			err = v.Trash(loc)
+			c.Check(err == nil, check.Equals, scenario.canTrash)
 			_, err = v.Get(context.Background(), loc, buf)
+			c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+			if err != nil {
+				c.Check(os.IsNotExist(err), check.Equals, true)
+			}
+
+			// Call Untrash, then check canUntrash
+			loc, _ = setupScenario()
+			err = v.Untrash(loc)
+			c.Check(err == nil, check.Equals, scenario.canUntrash)
+			if scenario.dataT != none || scenario.trashT != none {
+				// In all scenarios where the data exists, we
+				// should be able to Get after Untrash --
+				// regardless of timestamps, errors, race
+				// conditions, etc.
+				_, err = v.Get(context.Background(), loc, buf)
+				c.Check(err, check.IsNil)
+			}
+
+			// Call EmptyTrash, then check haveTrashAfterEmpty and
+			// freshAfterEmpty
+			loc, _ = setupScenario()
+			v.EmptyTrash()
+			_, err = v.head("trash/" + v.key(loc))
+			c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+			if scenario.freshAfterEmpty {
+				t, err := v.Mtime(loc)
+				c.Check(err, check.IsNil)
+				// new mtime must be current (with an
+				// allowance for 1s timestamp precision)
+				c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+			}
+
+			// Check for current Mtime after Put (applies to all
+			// scenarios)
+			loc, blk = setupScenario()
+			err = v.Put(context.Background(), loc, blk)
 			c.Check(err, check.IsNil)
-		}
-
-		// Call EmptyTrash, then check haveTrashAfterEmpty and
-		// freshAfterEmpty
-		loc, _ = setupScenario()
-		v.EmptyTrash()
-		_, err = v.Head("trash/" + loc)
-		c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-		if scenario.freshAfterEmpty {
 			t, err := v.Mtime(loc)
 			c.Check(err, check.IsNil)
-			// new mtime must be current (with an
-			// allowance for 1s timestamp precision)
 			c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 		}
-
-		// Check for current Mtime after Put (applies to all
-		// scenarios)
-		loc, blk = setupScenario()
-		err = v.Put(context.Background(), loc, blk)
-		c.Check(err, check.IsNil)
-		t, err := v.Mtime(loc)
-		c.Check(err, check.IsNil)
-		c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
 	}
 }
 
@@ -603,7 +618,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, cluster *arvados.Clust
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
-
+	key := v.key(loc)
 	r := NewCountingReader(bytes.NewReader(block), v.bucket.stats.TickOutBytes)
 
 	uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
@@ -613,35 +628,35 @@ func (v *TestableS3AWSVolume) PutRaw(loc string, block []byte) {
 
 	_, err := uploader.Upload(&s3manager.UploadInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(loc),
+		Key:    aws.String(key),
 		Body:   r,
 	})
 	if err != nil {
-		v.logger.Printf("PutRaw: %s: %+v", loc, err)
+		v.logger.Printf("PutRaw: %s: %+v", key, err)
 	}
 
 	empty := bytes.NewReader([]byte{})
 	_, err = uploader.Upload(&s3manager.UploadInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String("recent/" + loc),
+		Key:    aws.String("recent/" + key),
 		Body:   empty,
 	})
 	if err != nil {
-		v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
+		v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
 	}
 }
 
 // TouchWithDate turns back the clock while doing a Touch(). We assume
 // there are no other operations happening on the same s3test server
 // while we do this.
-func (v *TestableS3AWSVolume) TouchWithDate(locator string, lastPut time.Time) {
+func (v *TestableS3AWSVolume) TouchWithDate(loc string, lastPut time.Time) {
 	v.serverClock.now = &lastPut
 
 	uploader := s3manager.NewUploaderWithClient(v.bucket.svc)
 	empty := bytes.NewReader([]byte{})
 	_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
 		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String("recent/" + locator),
+		Key:    aws.String("recent/" + v.key(loc)),
 		Body:   empty,
 	})
 	if err != nil {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list