[ARVADOS] created: 2.1.0-1697-g5ce5bf966

Git user git at public.arvados.org
Fri Dec 3 20:13:37 UTC 2021

        at  5ce5bf966dfabbc0beb7330d4c976a23fde3fd83 (commit)

commit 5ce5bf966dfabbc0beb7330d4c976a23fde3fd83
Author: Ward Vandewege <ward at curii.com>
Date:   Fri Dec 3 15:12:38 2021 -0500

    17339: remove unnecessary memory allocation when reading from Keep with
           the S3 v2 driver. Also fix a few incorrect calls to log.Error()
           that were passed printf-style format strings; use log.Errorf().
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 52683d971..63a23687e 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -709,7 +709,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 		if filehash != hash {
 			// TODO: Try harder to tell a sysadmin about
 			// this.
-			log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
+			log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
 			errorToCaller = DiskHashError
@@ -976,7 +976,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// to tell which one is wanted if we have
 			// both, so there's no point writing it even
 			// on a different volume.)
-			log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+			log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
 			return CollisionError
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index e6b4d6453..4064809d5 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -463,52 +463,24 @@ func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error)
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-	return getWithPipe(ctx, loc, buf, v)
+	// Do not use getWithPipe here: the BlockReader interface does not pass
+	// through 'buf []byte', and we don't want to allocate two buffers for each
+	// read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
+	// as an input.
+	return v.ReadBlock(ctx, loc, buf)
-func (v *S3AWSVolume) readWorker(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
-	buf := make([]byte, 0, 67108864)
-	awsBuf := aws.NewWriteAtBuffer(buf)
-	downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
-		u.PartSize = PartSize
-		u.Concurrency = ReadConcurrency
-	})
-	v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
-	_, err = downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
-		Bucket: aws.String(v.bucket.bucket),
-		Key:    aws.String(key),
-	})
-	v.bucket.stats.TickOps("get")
-	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
-	v.bucket.stats.TickErr(err)
-	if err != nil {
-		return nil, v.translateError(err)
-	}
-	buf = awsBuf.Bytes()
-	rdr = NewCountingReader(bytes.NewReader(buf), v.bucket.stats.TickInBytes)
-	return
-// ReadBlock implements BlockReader.
-func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
+func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (int, error) {
 	key := v.key(loc)
-	rdr, err := v.readWorker(ctx, key)
+	count, err := v.readWorker(ctx, key, buf)
 	if err == nil {
-		_, err2 := io.Copy(w, rdr)
-		if err2 != nil {
-			return err2
-		}
-		return err
+		v.bucket.stats.TickInBytes(uint64(count))
+		return count, err
 	err = v.translateError(err)
 	if !os.IsNotExist(err) {
-		return err
+		return 0, err
 	_, err = v.head("recent/" + key)
@@ -516,23 +488,45 @@ func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) er
 	if err != nil {
 		// If we can't read recent/X, there's no point in
 		// trying fixRace. Give up.
-		return err
+		return 0, err
 	if !v.fixRace(key) {
 		err = os.ErrNotExist
-		return err
+		return 0, err
-	rdr, err = v.readWorker(ctx, key)
+	count, err = v.readWorker(ctx, key, buf)
 	if err != nil {
 		v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
 		err = v.translateError(err)
-		return err
+		return 0, err
+	v.bucket.stats.TickInBytes(uint64(count))
+	return count, err
-	_, err = io.Copy(w, rdr)
+func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
+	awsBuf := aws.NewWriteAtBuffer(buf)
+	downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
+		u.PartSize = PartSize
+		u.Concurrency = ReadConcurrency
+	})
-	return err
+	v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
+	count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
+		Bucket: aws.String(v.bucket.bucket),
+		Key:    aws.String(key),
+	})
+	v.bucket.stats.TickOps("get")
+	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
+	v.bucket.stats.TickErr(err)
+	if err != nil {
+		return 0, v.translateError(err)
+	}
+	buf = awsBuf.Bytes()
+	return int(count), err
 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {

commit 464c1562415ac7b5b5503f41b20c3183610dc899
Author: Ward Vandewege <ward at curii.com>
Date:   Thu Dec 2 15:59:18 2021 -0500

    17339: remove unnecessary memory allocation when writing to Keep with
           the S3 v2 driver.
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/services/keepstore/count.go b/services/keepstore/count.go
index 272b5017c..2dc7e04b4 100644
--- a/services/keepstore/count.go
+++ b/services/keepstore/count.go
@@ -22,6 +22,15 @@ func NewCountingReader(r io.Reader, f func(uint64)) io.ReadCloser {
+func NewCountingReaderAtSeeker(r interface{}, f func(uint64)) *countingReaderAtSeeker {
+	return &countingReaderAtSeeker{readerAtSeeker: readerAtSeeker{
+		readSeeker: r.(io.ReadSeeker),
+		readerAt:   r.(io.ReaderAt),
+	},
+		counter: f,
+	}
 type countingReadWriter struct {
 	reader  io.Reader
 	writer  io.Writer
@@ -46,3 +55,30 @@ func (crw *countingReadWriter) Close() error {
 	return nil
+type readerAtSeeker struct {
+	readSeeker io.ReadSeeker
+	readerAt   io.ReaderAt
+type countingReaderAtSeeker struct {
+	readerAtSeeker
+	counter func(uint64)
+func (crw *countingReaderAtSeeker) Read(buf []byte) (int, error) {
+	n, err := crw.readSeeker.Read(buf)
+	crw.counter(uint64(n))
+	return n, err
+func (crw *countingReaderAtSeeker) ReadAt(buf []byte, off int64) (int, error) {
+	n, err := crw.readerAt.ReadAt(buf, off)
+	crw.counter(uint64(n))
+	return n, err
+func (crw *countingReaderAtSeeker) Seek(offset int64, whence int) (int64, error) {
+	n, err := crw.readSeeker.Seek(offset, whence)
+	return n, err
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3aws_volume.go
index cb0b73cb8..e6b4d6453 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3aws_volume.go
@@ -586,7 +586,9 @@ func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader)
 // Put writes a block.
 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
-	return putWithPipe(ctx, loc, block, v)
+	// Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
+	// sdk to avoid memory allocation there. See #17339 for more information.
+	return v.WriteBlock(ctx, loc, bytes.NewReader(block))
 // WriteBlock implements BlockWriter.
@@ -595,7 +597,7 @@ func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 		return MethodDisabledError
-	r := NewCountingReader(rdr, v.bucket.stats.TickOutBytes)
+	r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
 	key := v.key(loc)
 	err := v.writeObject(ctx, key, r)
 	if err != nil {



More information about the arvados-commits mailing list