[arvados] updated: 2.7.0-6006-g022107bd52

git repository hosting git at public.arvados.org
Thu Feb 15 20:44:05 UTC 2024


Summary of changes:
 services/keepstore/azure_blob_volume.go            |  49 +++-----
 services/keepstore/azure_blob_volume_test.go       |  28 ++---
 services/keepstore/keepstore.go                    | 107 ++++++++++-------
 services/keepstore/keepstore_test.go               |  22 ++--
 services/keepstore/router_test.go                  |  14 +--
 .../keepstore/{s3aws_volume.go => s3_volume.go}    |  41 ++-----
 .../{s3aws_volume_test.go => s3_volume_test.go}    |  51 ++++----
 services/keepstore/streamwriterat.go               |   8 +-
 services/keepstore/unix_volume.go                  |  64 +++++-----
 services/keepstore/unix_volume_test.go             | 131 +++------------------
 services/keepstore/volume.go                       |   2 +-
 services/keepstore/volume_generic_test.go          |  61 +++++-----
 services/keepstore/volume_test.go                  |  50 ++++++++
 13 files changed, 285 insertions(+), 343 deletions(-)
 rename services/keepstore/{s3aws_volume.go => s3_volume.go} (97%)
 rename services/keepstore/{s3aws_volume_test.go => s3_volume_test.go} (92%)

       via  022107bd52092c658208e74161581c6bedda4a5f (commit)
       via  7ef752823f118079af629604ac29143e7c156687 (commit)
       via  99e43aaefc4a76908fd5f649edf0512c3800e021 (commit)
      from  39f6e9f70f683237d9488faac1c549ca19ac9dae (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit 022107bd52092c658208e74161581c6bedda4a5f
Author: Tom Clegg <tom at curii.com>
Date:   Thu Feb 15 15:20:41 2024 -0500

    2960: Move streaming from volume to keepstore layer.
    
    Avoids using 2x buffers when comparing existing data during
    BlockWrite.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
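
The central interface change (see the services/keepstore/volume.go hunk
below) is that volumes now write block data into an io.WriterAt instead
of an io.Writer, and no longer report a byte count themselves. A minimal
before/after sketch, with the unrelated methods elided:

    // Before: each volume streamed into an io.Writer and returned
    // the number of bytes written:
    //   BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
    //
    // After: volumes write into an io.WriterAt; the keepstore layer
    // owns the single streaming buffer and tracks the byte count.
    type volume interface {
            BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
            BlockWrite(ctx context.Context, hash string, data []byte) error
            // ... remaining methods unchanged
    }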

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 31660614f3..2c8a79350c 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -147,24 +147,22 @@ func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a BlockWrite operation is in progress,
 // and wait for it to finish writing.
-func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
 	trashed, _, err := v.checkTrashed(hash)
 	if err != nil {
-		return 0, err
+		return err
 	}
 	if trashed {
-		return 0, os.ErrNotExist
+		return os.ErrNotExist
 	}
 	buf, err := v.bufferPool.GetContext(ctx)
 	if err != nil {
-		return 0, err
+		return err
 	}
 	defer v.bufferPool.Put(buf)
-	streamer := newStreamWriterAt(writeTo, 65536, buf)
-	defer streamer.Close()
 	var deadline time.Time
-	size, err := v.get(ctx, hash, streamer)
-	for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+	wrote, err := v.get(ctx, hash, w)
+	for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
 		// Seeing a brand new empty block probably means we're
 		// in a race with CreateBlob, which under the hood
 		// (apparently) does "CreateEmpty" and "CommitData"
@@ -185,20 +183,15 @@ func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io
 		}
 		select {
 		case <-ctx.Done():
-			return 0, ctx.Err()
+			return ctx.Err()
 		case <-time.After(v.WriteRacePollTime.Duration()):
 		}
-		size, err = v.get(ctx, hash, streamer)
+		wrote, err = v.get(ctx, hash, w)
 	}
 	if !deadline.IsZero() {
-		ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
-	}
-	if err != nil {
-		streamer.Close()
-		return streamer.Wrote(), err
+		ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
 	}
-	err = streamer.Close()
-	return streamer.Wrote(), err
+	return err
 }
 
 func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
@@ -212,6 +205,7 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 
 	pieces := 1
 	expectSize := BlockSize
+	sizeKnown := false
 	if pieceSize < BlockSize {
 		// Unfortunately the handler doesn't tell us how long
 		// the blob is expected to be, so we have to ask
@@ -225,15 +219,15 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 		}
 		expectSize = int(props.ContentLength)
 		pieces = (expectSize + pieceSize - 1) / pieceSize
+		sizeKnown = true
 	}
 
 	if expectSize == 0 {
 		return 0, nil
 	}
 
-	// We'll update this actualSize if/when we get the last piece.
-	actualSize := -1
 	errors := make(chan error, pieces)
+	var wrote atomic.Int64
 	var wg sync.WaitGroup
 	wg.Add(pieces)
 	for p := 0; p < pieces; p++ {
@@ -289,31 +283,24 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
 				rdr.Close()
 			}()
 			n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
-			if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+			wrote.Add(n)
+			if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
 				// If we don't know the actual size,
 				// and just tried reading 64 MiB, it's
 				// normal to encounter EOF.
 			} else if err != nil {
-				if ctx.Err() == nil {
-					errors <- err
-				}
+				errors <- err
 				cancel()
 				return
 			}
-			if p == pieces-1 {
-				actualSize = startPos + int(n)
-			}
 		}(p)
 	}
 	wg.Wait()
 	close(errors)
 	if len(errors) > 0 {
-		return 0, v.translateError(<-errors)
-	}
-	if ctx.Err() != nil {
-		return 0, ctx.Err()
+		return int(wrote.Load()), v.translateError(<-errors)
 	}
-	return actualSize, nil
+	return int(wrote.Load()), ctx.Err()
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its
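
The rewritten get() above fans out ranged reads across goroutines, each
writing into the shared io.WriterAt at its own offset. A hedged,
self-contained sketch of that pattern (parallelGet and fetch are
hypothetical names; the real code also handles the empty-block write
race and context cancellation; assumes imports io, sync, sync/atomic):

    // Each piece lands in the shared io.WriterAt at its own offset via
    // io.NewOffsetWriter; the total byte count is tracked atomically so
    // it is meaningful even on partial failure.
    func parallelGet(dst io.WriterAt, size, pieceSize int,
            fetch func(start, end int) (io.ReadCloser, error)) (int, error) {
            pieces := (size + pieceSize - 1) / pieceSize
            errs := make(chan error, pieces)
            var wrote atomic.Int64
            var wg sync.WaitGroup
            for p := 0; p < pieces; p++ {
                    wg.Add(1)
                    go func(p int) {
                            defer wg.Done()
                            start := p * pieceSize
                            end := start + pieceSize
                            if end > size {
                                    end = size
                            }
                            rdr, err := fetch(start, end)
                            if err != nil {
                                    errs <- err
                                    return
                            }
                            defer rdr.Close()
                            n, err := io.CopyN(io.NewOffsetWriter(dst, int64(start)), rdr, int64(end-start))
                            wrote.Add(n)
                            if err != nil {
                                    errs <- err
                            }
                    }(p)
            }
            wg.Wait()
            close(errs)
            // Receiving from the closed channel yields nil if no piece failed.
            return int(wrote.Load()), <-errs
    }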
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c629c9dc15..b8acd980a1 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -13,7 +13,6 @@ import (
 	"encoding/xml"
 	"flag"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"math/rand"
 	"net"
@@ -490,15 +489,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
 		if err != nil {
 			c.Error(err)
 		}
-		gotData := bytes.NewBuffer(nil)
-		gotLen, err := v.BlockRead(context.Background(), hash, gotData)
+		gotData := &brbuffer{}
+		err = v.BlockRead(context.Background(), hash, gotData)
 		if err != nil {
 			c.Error(err)
 		}
 		gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
-		if gotLen != size {
-			c.Errorf("length mismatch: got %d != %d", gotLen, size)
-		}
+		c.Check(gotData.Len(), check.Equals, size)
 		if gotHash != hash {
 			c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
 		}
@@ -532,7 +529,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		_, err := v.BlockRead(context.Background(), TestHash, io.Discard)
+		err := v.BlockRead(context.Background(), TestHash, brdiscard)
 		if err != nil {
 			c.Error(err)
 		}
@@ -570,15 +567,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
 	allDone := make(chan struct{})
 	go func() {
 		defer close(allDone)
-		buf := bytes.NewBuffer(nil)
-		n, err := v.BlockRead(context.Background(), TestHash, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), TestHash, buf)
 		if err != nil {
 			c.Error(err)
 			return
 		}
-		if n != 0 {
-			c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
-		}
+		c.Check(buf.String(), check.Equals, "")
 	}()
 	select {
 	case <-allDone:
@@ -596,8 +591,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
 func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
 	s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
 		v.BlockWriteRaw(TestHash, TestBlock)
-		_, err := v.BlockRead(ctx, TestHash, io.Discard)
-		return err
+		return v.BlockRead(ctx, TestHash, brdiscard)
 	})
 }
 
@@ -667,7 +661,7 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
 	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-	_, err := volume.BlockRead(context.Background(), loc, io.Discard)
+	err := volume.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.NotNil)
 	c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
 	c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -679,9 +673,9 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
 	c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
 
-	_, err = volume.BlockRead(context.Background(), loc, io.Discard)
+	err = volume.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.IsNil)
-	_, err = volume.BlockRead(context.Background(), loc, io.Discard)
+	err = volume.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.IsNil)
 	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 62b6d15e56..c9a8023059 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -243,13 +243,58 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
 	} else {
 		out = io.MultiWriter(out, hashcheck)
 	}
+
+	buf, err := ks.bufferPool.GetContext(ctx)
+	if err != nil {
+		return 0, err
+	}
+	defer ks.bufferPool.Put(buf)
+	streamer := newStreamWriterAt(out, 65536, buf)
+	defer streamer.Close()
+
 	var errToCaller error = os.ErrNotExist
 	for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
 		if ctx.Err() != nil {
 			return 0, ctx.Err()
 		}
-		n, err = mnt.BlockRead(ctx, li.hash, out)
-		if err == nil && li.size > 0 && n != li.size {
+		err := mnt.BlockRead(ctx, li.hash, streamer)
+		if err != nil {
+			if streamer.WroteAt() != 0 {
+				// BlockRead encountered an error
+				// after writing some data, so it's
+				// too late to try another
+				// volume. Flush streamer before
+				// calling Wrote() to ensure our
+				// return value accurately reflects
+				// the number of bytes written to
+				// opts.WriteTo.
+				streamer.Close()
+				return streamer.Wrote(), err
+			}
+			if !os.IsNotExist(err) {
+				errToCaller = err
+			}
+			continue
+		}
+		if li.size == 0 {
+			// hashCheckingWriter isn't in use because we
+			// don't know the expected size. All we can do
+			// is check after writing all the data, and
+			// trust the caller is doing a HEAD request so
+			// it's not too late to set an error code in
+			// the response header.
+			err = streamer.Close()
+			if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+				err = errChecksum
+			}
+			if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+				// We didn't set the content-length header
+				// above because we didn't know the block size
+				// until now.
+				rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+			}
+			return streamer.WroteAt(), err
+		} else if streamer.WroteAt() != li.size {
 			// If the backend read fewer bytes than
 			// expected but returns no error, we can
 			// classify this as a checksum error (even
@@ -260,42 +305,17 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
 			// it anyway, but if it's a HEAD request the
 			// caller can still change the response status
 			// code.
-			return n, errChecksum
-		}
-		if err == nil && li.size == 0 {
-			// hashCheckingWriter isn't in use because we
-			// don't know the expected size. All we can do
-			// is check after writing all the data, and
-			// trust the caller is doing a HEAD request so
-			// it's not too late to set an error code in
-			// the response header.
-			if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
-				return n, errChecksum
-			}
-		}
-		if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
-			// We didn't set the content-length header
-			// above because we didn't know the block size
-			// until now.
-			rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
-		}
-		if n > 0 || err == nil {
-			// success, or there's an error but we can't
-			// retry because we've already sent some data.
-			return n, err
-		}
-		if !os.IsNotExist(err) {
-			// If some volume returns a transient error,
-			// return it to the caller instead of "Not
-			// found" so it can retry.
-			errToCaller = err
+			return streamer.WroteAt(), errChecksum
 		}
+		// Ensure streamer flushes all buffered data without
+		// errors.
+		err = streamer.Close()
+		return streamer.Wrote(), err
 	}
 	return 0, errToCaller
 }
 
 func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
-	ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
 	token := ctxToken(ctx)
 	if token == "" {
 		return 0, errNoTokenProvided
@@ -459,7 +479,7 @@ func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOpti
 			continue
 		}
 		cmp := &checkEqual{Expect: opts.Data}
-		if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+		if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
 			if !cmp.Equal() {
 				return resp, errCollision
 			}
@@ -564,25 +584,28 @@ func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
 	return mnts
 }
 
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
 // interface) is equal to the expected data.
 //
 // Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
 type checkEqual struct {
-	Expect     []byte
-	equalUntil int
+	Expect   []byte
+	equal    atomic.Int64
+	notequal atomic.Bool
 }
 
 func (ce *checkEqual) Equal() bool {
-	return ce.equalUntil == len(ce.Expect)
+	return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
 }
 
-func (ce *checkEqual) Write(p []byte) (int, error) {
-	endpos := ce.equalUntil + len(p)
-	if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
-		ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+	endpos := int(offset) + len(p)
+	if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+		ce.equal.Add(int64(len(p)))
 	} else {
-		ce.equalUntil = -1
+		ce.notequal.Store(true)
 	}
 	return len(p), nil
 }
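
Because pieces can arrive out of order (as in the parallel Azure/S3
reads), the reworked checkEqual above tracks matched bytes with atomics
instead of a sequential cursor. An illustrative use, assuming the type
as defined in the hunk above:

    ce := &checkEqual{Expect: []byte("hello world")}
    // WriteAt calls may come from concurrent goroutines, in any order,
    // as long as the ranges don't overlap.
    ce.WriteAt([]byte("world"), 6)
    ce.WriteAt([]byte("hello "), 0)
    fmt.Println(ce.Equal()) // true: all 11 expected bytes matched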
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 3a01476096..28049506f6 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -372,7 +372,7 @@ func (s *keepstoreSuite) TestBlockTrash(c *C) {
 		return ks.BlockTrash(ctx, loc)
 	}
 	checkexists := func(volidx int) bool {
-		_, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
+		err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
 		if !os.IsNotExist(err) {
 			c.Check(err, IsNil)
 		}
@@ -573,7 +573,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
 	for _, mnt := range ks.mounts {
 		err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
 		c.Assert(err, IsNil)
-		_, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
+		err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
 		c.Assert(err, IsNil)
 	}
 
@@ -581,7 +581,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
 	c.Check(os.IsNotExist(err), Equals, true)
 
 	for _, mnt := range ks.mounts {
-		_, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
+		err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
 		c.Assert(err, IsNil)
 	}
 }
@@ -693,7 +693,7 @@ type stubVolume struct {
 	// corresponding func (if non-nil). If the func returns an
 	// error, that error is returned to caller. Otherwise, the
 	// stub continues normally.
-	blockRead    func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+	blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
 	blockWrite   func(ctx context.Context, hash string, data []byte) error
 	deviceID     func() string
 	blockTouch   func(hash string) error
@@ -710,19 +710,19 @@ func (v *stubVolume) log(op, hash string) {
 	v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
 }
 
-func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
 	v.log("read", hash)
 	if v.blockRead != nil {
-		n, err := v.blockRead(ctx, hash, writeTo)
+		err := v.blockRead(ctx, hash, writeTo)
 		if err != nil {
-			return n, err
+			return err
 		}
 	}
 	v.mtx.Lock()
 	ent, ok := v.data[hash]
 	v.mtx.Unlock()
 	if !ok || !ent.trash.IsZero() {
-		return 0, os.ErrNotExist
+		return os.ErrNotExist
 	}
 	wrote := 0
 	for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
@@ -730,13 +730,13 @@ func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writ
 		if len(data) > writesize {
 			data = data[:writesize]
 		}
-		n, err := writeTo.Write(data)
+		n, err := writeTo.WriteAt(data, int64(wrote))
 		wrote += n
 		if err != nil {
-			return wrote, err
+			return err
 		}
 	}
-	return wrote, nil
+	return nil
 }
 
 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
diff --git a/services/keepstore/router_test.go b/services/keepstore/router_test.go
index f4bcdd4ae4..ee7be4768c 100644
--- a/services/keepstore/router_test.go
+++ b/services/keepstore/router_test.go
@@ -268,7 +268,7 @@ func (s *routerSuite) TestBlockTrash(c *C) {
 	resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
 	c.Check(resp.Code, Equals, http.StatusOK)
 	c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
-	_, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
 	c.Assert(err, Equals, os.ErrNotExist)
 }
 
@@ -281,12 +281,12 @@ func (s *routerSuite) TestBlockUntrash(c *C) {
 	c.Assert(err, IsNil)
 	err = vol0.BlockTrash(fooHash)
 	c.Assert(err, IsNil)
-	_, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
 	c.Assert(err, Equals, os.ErrNotExist)
 	resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
 	c.Check(resp.Code, Equals, http.StatusOK)
 	c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
-	_, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+	err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
 	c.Check(err, IsNil)
 }
 
@@ -356,8 +356,8 @@ func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
 func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
 	router, cancel := testRouter(c, s.cluster, nil)
 	defer cancel()
-	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) {
-		return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
+	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
+		return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
 	}
 
 	// To test whether we fall back to volume 1 after volume 0
@@ -472,10 +472,10 @@ func (s *routerSuite) TestCancelOnDisconnect(c *C) {
 	defer cancel()
 
 	unblock := make(chan struct{})
-	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) {
+	router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
 		<-unblock
 		c.Check(ctx.Err(), NotNil)
-		return 0, ctx.Err()
+		return ctx.Err()
 	}
 	go func() {
 		time.Sleep(time.Second / 10)
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index bd79d49e16..d4b90540ea 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -411,24 +411,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
 
 // BlockRead reads a Keep block that has been stored as a block blob
 // in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
 	key := v.key(hash)
-	buf, err := v.bufferPool.GetContext(ctx)
-	if err != nil {
-		return 0, err
-	}
-	defer v.bufferPool.Put(buf)
-
-	streamer := newStreamWriterAt(writeTo, 65536, buf)
-	defer streamer.Close()
-	err = v.readWorker(ctx, key, streamer)
+	err := v.readWorker(ctx, key, w)
 	if err != nil {
 		err = v.translateError(err)
 		if !os.IsNotExist(err) {
-			return 0, err
-		}
-		if streamer.WroteAt() > 0 {
-			return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+			return err
 		}
 
 		_, err = v.head("recent/" + key)
@@ -436,25 +425,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer
 		if err != nil {
 			// If we can't read recent/X, there's no point in
 			// trying fixRace. Give up.
-			return 0, err
+			return err
 		}
 		if !v.fixRace(key) {
 			err = os.ErrNotExist
-			return 0, err
+			return err
 		}
 
-		err = v.readWorker(ctx, key, streamer)
+		err = v.readWorker(ctx, key, w)
 		if err != nil {
 			v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
 			err = v.translateError(err)
-			return 0, err
+			return err
 		}
 	}
-	err = streamer.Close()
-	if err != nil {
-		return 0, v.translateError(err)
-	}
-	return streamer.Wrote(), nil
+	return nil
 }
 
 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index d814949f44..fb68e1c057 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -221,7 +221,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
 
 	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
-	_, err := v.BlockRead(context.Background(), loc, io.Discard)
+	err := v.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.NotNil)
 	c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
 	c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
@@ -232,9 +232,9 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
 	c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
 
-	_, err = v.BlockRead(context.Background(), loc, io.Discard)
+	err = v.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.IsNil)
-	_, err = v.BlockRead(context.Background(), loc, io.Discard)
+	err = v.BlockRead(context.Background(), loc, brdiscard)
 	c.Check(err, check.IsNil)
 	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
@@ -261,8 +261,7 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 
 func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
 	s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
-		_, err := v.BlockRead(ctx, fooHash, io.Discard)
-		return err
+		return v.BlockRead(ctx, fooHash, brdiscard)
 	})
 }
 
@@ -480,7 +479,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
 
 			// Check canGet
 			loc, blk := setupScenario()
-			_, err := v.BlockRead(context.Background(), loc, io.Discard)
+			err := v.BlockRead(context.Background(), loc, brdiscard)
 			c.Check(err == nil, check.Equals, scenario.canGet)
 			if err != nil {
 				c.Check(os.IsNotExist(err), check.Equals, true)
@@ -490,7 +489,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
 			loc, _ = setupScenario()
 			err = v.BlockTrash(loc)
 			c.Check(err == nil, check.Equals, scenario.canTrash)
-			_, err = v.BlockRead(context.Background(), loc, io.Discard)
+			err = v.BlockRead(context.Background(), loc, brdiscard)
 			c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
 			if err != nil {
 				c.Check(os.IsNotExist(err), check.Equals, true)
@@ -505,7 +504,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
 				// should be able to Get after Untrash --
 				// regardless of timestamps, errors, race
 				// conditions, etc.
-				_, err = v.BlockRead(context.Background(), loc, io.Discard)
+				err = v.BlockRead(context.Background(), loc, brdiscard)
 				c.Check(err, check.IsNil)
 			}
 
diff --git a/services/keepstore/streamwriterat.go b/services/keepstore/streamwriterat.go
index 3426dadc1f..02dce6e216 100644
--- a/services/keepstore/streamwriterat.go
+++ b/services/keepstore/streamwriterat.go
@@ -19,10 +19,10 @@ import (
 // streamWriterAt writes the data to the provided io.Writer in
 // sequential order.
 //
-// streamWriterAt can also be used as an asynchronous buffer: the
-// caller can use the io.Writer interface to write into a memory
-// buffer and return without waiting for the wrapped writer to catch
-// up.
+// streamWriterAt can also be wrapped with an io.OffsetWriter to
+// provide an asynchronous buffer: the caller can use the io.Writer
+// interface to write into a memory buffer and return without waiting
+// for the wrapped writer to catch up.
 //
 // Close returns when all data has been written through.
 type streamWriterAt struct {
@@ -87,14 +87,7 @@ func (swa *streamWriterAt) writeToWriter() {
 	}
 }
 
-// Write implements io.Writer.
-func (swa *streamWriterAt) Write(p []byte) (int, error) {
-	n, err := swa.WriteAt(p, int64(swa.writepos))
-	swa.writepos += n
-	return n, err
-}
-
-// WriteAt implements io.WriterAt.
+// WriteAt implements io.WriterAt. WriteAt is goroutine-safe.
 func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
 	pos := int(offset)
 	n := 0
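
With Write() removed, a caller that still needs sequential io.Writer
semantics wraps the streamWriterAt in the stdlib io.OffsetWriter, which
tracks the position itself; that is the approach the unix_volume.go hunk
below takes. A minimal sketch (out, buf, and src assumed in scope):

    streamer := newStreamWriterAt(out, 65536, buf) // an io.WriterAt
    defer streamer.Close()
    // io.NewOffsetWriter adapts the WriterAt back into a Writer.
    _, err := io.Copy(io.NewOffsetWriter(streamer, 0), src)
    if err == nil {
            // Close returns only when all buffered data has been
            // written through to out.
            err = streamer.Close()
    }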
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index f652a50023..92cf12ac18 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) {
 	return fi.ModTime(), nil
 }
 
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-	if err := v.lock(ctx); err != nil {
-		return err
-	}
-	defer v.unlock()
-	f, err := v.os.Open(path)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-	return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
 // stat is os.Stat() with some extra sanity checks.
 func (v *unixVolume) stat(path string) (os.FileInfo, error) {
 	stat, err := v.os.Stat(path)
@@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
 	path := v.blockPath(hash)
 	stat, err := v.stat(path)
 	if err != nil {
-		return 0, v.translateError(err)
+		return v.translateError(err)
 	}
-	var streamer *streamWriterAt
-	if v.locker != nil {
-		buf, err := v.bufferPool.GetContext(ctx)
-		if err != nil {
-			return 0, err
-		}
-		defer v.bufferPool.Put(buf)
-		streamer = newStreamWriterAt(w, 65536, buf)
-		defer streamer.Close()
-		w = streamer
-	}
-	var n int64
-	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-		n, err = io.Copy(w, rdr)
-		if err == nil && n != stat.Size() {
-			err = io.ErrUnexpectedEOF
-		}
+	if err := v.lock(ctx); err != nil {
+		return err
+	}
+	defer v.unlock()
+	f, err := v.os.Open(path)
+	if err != nil {
 		return err
-	})
-	if streamer != nil {
-		// If we're using the streamer (and there's no error
-		// so far) flush any remaining buffered data now that
-		// getFunc has released the serialize lock.
-		if err == nil {
-			err = streamer.Close()
-		}
-		return streamer.WroteAt(), err
 	}
-	return int(n), err
+	defer f.Close()
+	src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+	dst := io.NewOffsetWriter(w, 0)
+	n, err := io.Copy(dst, src)
+	if err == nil && n != stat.Size() {
+		err = io.ErrUnexpectedEOF
+	}
+	return err
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index 715e23a9ea..bcdb5f6358 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -8,9 +8,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"os"
 	"sync"
@@ -123,16 +121,9 @@ func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
 	defer v.Teardown()
 	v.BlockWrite(context.Background(), TestHash, TestBlock)
 
-	buf := bytes.NewBuffer(nil)
-	_, err := v.BlockRead(context.Background(), TestHash2, buf)
-	switch {
-	case os.IsNotExist(err):
-		break
-	case err == nil:
-		c.Errorf("Read should have failed, returned %+q", buf.Bytes())
-	default:
-		c.Errorf("Read expected ErrNotExist, got: %s", err)
-	}
+	buf := &brbuffer{}
+	err := v.BlockRead(context.Background(), TestHash2, buf)
+	c.Check(err, check.FitsTypeOf, os.ErrNotExist)
 }
 
 func (s *unixVolumeSuite) TestPut(c *check.C) {
@@ -182,94 +173,6 @@ func (s *unixVolumeSuite) TestIsFull(c *check.C) {
 	}
 }
 
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
-	v := s.newTestableUnixVolume(c, s.params, false)
-	defer v.Teardown()
-
-	v.BlockWrite(context.Background(), TestHash, TestBlock)
-	mockErr := errors.New("Mock error")
-	err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-		return mockErr
-	})
-	if err != mockErr {
-		c.Errorf("Got %v, expected %v", err, mockErr)
-	}
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
-	v := s.newTestableUnixVolume(c, s.params, false)
-	defer v.Teardown()
-
-	funcCalled := false
-	err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-		funcCalled = true
-		return nil
-	})
-	if err == nil {
-		c.Errorf("Expected error opening non-existent file")
-	}
-	if funcCalled {
-		c.Errorf("Worker func should not have been called")
-	}
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
-	v := s.newTestableUnixVolume(c, s.params, false)
-	defer v.Teardown()
-
-	v.BlockWrite(context.Background(), TestHash, TestBlock)
-
-	mtx := NewMockMutex()
-	v.locker = mtx
-
-	funcCalled := make(chan struct{})
-	go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-		funcCalled <- struct{}{}
-		return nil
-	})
-	select {
-	case mtx.AllowLock <- struct{}{}:
-	case <-funcCalled:
-		c.Fatal("Function was called before mutex was acquired")
-	case <-time.After(5 * time.Second):
-		c.Fatal("Timed out before mutex was acquired")
-	}
-	select {
-	case <-funcCalled:
-	case mtx.AllowUnlock <- struct{}{}:
-		c.Fatal("Mutex was released before function was called")
-	case <-time.After(5 * time.Second):
-		c.Fatal("Timed out waiting for funcCalled")
-	}
-	select {
-	case mtx.AllowUnlock <- struct{}{}:
-	case <-time.After(5 * time.Second):
-		c.Fatal("Timed out waiting for getFunc() to release mutex")
-	}
-}
-
-type MockMutex struct {
-	AllowLock   chan struct{}
-	AllowUnlock chan struct{}
-}
-
-func NewMockMutex() *MockMutex {
-	return &MockMutex{
-		AllowLock:   make(chan struct{}),
-		AllowUnlock: make(chan struct{}),
-	}
-}
-
-// Lock waits for someone to send to AllowLock.
-func (m *MockMutex) Lock() {
-	<-m.AllowLock
-}
-
-// Unlock waits for someone to send to AllowUnlock.
-func (m *MockMutex) Unlock() {
-	<-m.AllowUnlock
-}
-
 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
 	v := s.newTestableUnixVolume(c, s.params, true)
 	defer v.Teardown()
@@ -300,9 +203,10 @@ func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
 		time.Sleep(50 * time.Millisecond)
 		cancel()
 	}()
-	n, err := v.BlockRead(ctx, TestHash, io.Discard)
-	if n > 0 || err != context.Canceled {
-		c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
+	buf := &brbuffer{}
+	err = v.BlockRead(ctx, TestHash, buf)
+	if buf.Len() != 0 || err != context.Canceled {
+		c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
 	}
 }
 
@@ -317,7 +221,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
 	c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
-	_, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
+	err := vol.BlockRead(context.Background(), fooHash, brdiscard)
 	c.Check(err, check.NotNil)
 	c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
 	c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -339,8 +243,8 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
 	c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
 	c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
 
-	buf := bytes.NewBuffer(nil)
-	_, err = vol.BlockRead(context.Background(), fooHash, buf)
+	buf := &brbuffer{}
+	err = vol.BlockRead(context.Background(), fooHash, buf)
 	c.Check(err, check.IsNil)
 	c.Check(buf.String(), check.Equals, "foo")
 	c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index a0b6fda7d3..f1b6781da6 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -15,7 +15,7 @@ import (
 
 // volume is the interface to a back-end storage device.
 type volume interface {
-	BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+	BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
 	BlockWrite(ctx context.Context, hash string, data []byte) error
 	DeviceID() string
 	BlockTouch(hash string) error
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 22667743dd..16084058b7 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -9,7 +9,6 @@ import (
 	"context"
 	"crypto/md5"
 	"fmt"
-	"io"
 	"os"
 	"regexp"
 	"sort"
@@ -127,8 +126,8 @@ func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
 		t.Error(err)
 	}
 
-	buf := bytes.NewBuffer(nil)
-	_, err = v.BlockRead(context.Background(), TestHash, buf)
+	buf := &brbuffer{}
+	err = v.BlockRead(context.Background(), TestHash, buf)
 	if err != nil {
 		t.Error(err)
 	}
@@ -144,7 +143,7 @@ func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFact
 	v := s.newVolume(t, factory)
 	defer v.Teardown()
 
-	if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil {
+	if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
 		t.Errorf("Expected error while getting non-existing block %v", barHash)
 	}
 }
@@ -177,8 +176,8 @@ func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory Test
 	v.BlockWrite(context.Background(), testHash, testDataA)
 
 	putErr := v.BlockWrite(context.Background(), testHash, testDataB)
-	buf := bytes.NewBuffer(nil)
-	_, getErr := v.BlockRead(context.Background(), testHash, buf)
+	buf := &brbuffer{}
+	getErr := v.BlockRead(context.Background(), testHash, buf)
 	if putErr == nil {
 		// Put must not return a nil error unless it has
 		// overwritten the existing data.
@@ -217,8 +216,8 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
 	}
 
-	buf := bytes.NewBuffer(nil)
-	_, err = v.BlockRead(context.Background(), TestHash, buf)
+	buf := &brbuffer{}
+	err = v.BlockRead(context.Background(), TestHash, buf)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -228,7 +227,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
 	}
 
 	buf.Reset()
-	_, err = v.BlockRead(context.Background(), TestHash2, buf)
+	err = v.BlockRead(context.Background(), TestHash2, buf)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -238,7 +237,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
 	}
 
 	buf.Reset()
-	_, err = v.BlockRead(context.Background(), TestHash3, buf)
+	err = v.BlockRead(context.Background(), TestHash3, buf)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -404,8 +403,8 @@ func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFact
 	if err := v.BlockTrash(TestHash); err != nil {
 		t.Error(err)
 	}
-	buf := bytes.NewBuffer(nil)
-	_, err := v.BlockRead(context.Background(), TestHash, buf)
+	buf := &brbuffer{}
+	err := v.BlockRead(context.Background(), TestHash, buf)
 	if err != nil {
 		t.Error(err)
 	} else if buf.String() != string(TestBlock) {
@@ -428,7 +427,7 @@ func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFact
 	if err := v.BlockTrash(TestHash); err != nil {
 		t.Error(err)
 	}
-	if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) {
+	if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -517,7 +516,7 @@ func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVo
 		v.BlockWrite(context.Background(), TestHash, TestBlock)
 	}
 
-	_, err = v.BlockRead(context.Background(), TestHash, io.Discard)
+	err = v.BlockRead(context.Background(), TestHash, brdiscard)
 	if err != nil {
 		t.Error(err)
 	}
@@ -546,8 +545,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
 
 	sem := make(chan int)
 	go func() {
-		buf := bytes.NewBuffer(nil)
-		_, err := v.BlockRead(context.Background(), TestHash, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), TestHash, buf)
 		if err != nil {
 			t.Errorf("err1: %v", err)
 		}
@@ -558,8 +557,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
 	}()
 
 	go func() {
-		buf := bytes.NewBuffer(nil)
-		_, err := v.BlockRead(context.Background(), TestHash2, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), TestHash2, buf)
 		if err != nil {
 			t.Errorf("err2: %v", err)
 		}
@@ -570,8 +569,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
 	}()
 
 	go func() {
-		buf := bytes.NewBuffer(nil)
-		_, err := v.BlockRead(context.Background(), TestHash3, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), TestHash3, buf)
 		if err != nil {
 			t.Errorf("err3: %v", err)
 		}
@@ -619,8 +618,8 @@ func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFacto
 
 	// Check that we actually wrote the blocks.
 	for _, blk := range blks {
-		buf := bytes.NewBuffer(nil)
-		_, err := v.BlockRead(context.Background(), blk.hash, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), blk.hash, buf)
 		if err != nil {
 			t.Errorf("get %s: %v", blk.hash, err)
 		} else if buf.String() != string(blk.data) {
@@ -644,13 +643,13 @@ func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactor
 		t.Error(err)
 	}
 
-	buf := bytes.NewBuffer(nil)
-	_, err = v.BlockRead(context.Background(), hash, buf)
+	buf := &brbuffer{}
+	err = v.BlockRead(context.Background(), hash, buf)
 	if err != nil {
 		t.Error(err)
 	}
 	if buf.String() != string(wdata) {
-		t.Error("buf %+q != wdata %+q", buf, wdata)
+		t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
 	}
 }
 
@@ -668,8 +667,8 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
 	v.BlockWrite(context.Background(), TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
 
-	buf := bytes.NewBuffer(nil)
-	_, err := v.BlockRead(context.Background(), TestHash, buf)
+	buf := &brbuffer{}
+	err := v.BlockRead(context.Background(), TestHash, buf)
 	if err != nil {
 		t.Error(err)
 	}
@@ -684,7 +683,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
 		return
 	}
 	buf.Reset()
-	_, err = v.BlockRead(context.Background(), TestHash, buf)
+	err = v.BlockRead(context.Background(), TestHash, buf)
 	if err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
@@ -697,7 +696,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
 
 	// Get the block - after trash and untrash sequence
 	buf.Reset()
-	_, err = v.BlockRead(context.Background(), TestHash, buf)
+	err = v.BlockRead(context.Background(), TestHash, buf)
 	if err != nil {
 		t.Error(err)
 	}
@@ -712,8 +711,8 @@ func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVo
 	defer v.Teardown()
 
 	checkGet := func() error {
-		buf := bytes.NewBuffer(nil)
-		_, err := v.BlockRead(context.Background(), TestHash, buf)
+		buf := &brbuffer{}
+		err := v.BlockRead(context.Background(), TestHash, buf)
 		if err != nil {
 			return err
 		}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 5a17b3a7dc..f64041b048 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -5,6 +5,7 @@
 package keepstore
 
 import (
+	"sync"
 	"time"
 )
 
@@ -38,3 +39,52 @@ type TestableVolume interface {
 	// Clean up, delete temporary files.
 	Teardown()
 }
+
+// brbuffer is like bytes.Buffer, but it implements io.WriterAt.
+// Convenient for testing (volume)BlockRead implementations.
+type brbuffer struct {
+	mtx sync.Mutex
+	buf []byte
+}
+
+func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	if short := int(offset) + len(p) - len(b.buf); short > 0 {
+		b.buf = append(b.buf, make([]byte, short)...)
+	}
+	return copy(b.buf[offset:], p), nil
+}
+
+func (b *brbuffer) Bytes() []byte {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	return b.buf
+}
+
+func (b *brbuffer) String() string {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	return string(b.buf)
+}
+
+func (b *brbuffer) Len() int {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	return len(b.buf)
+}
+
+func (b *brbuffer) Reset() {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+	b.buf = nil
+}
+
+// a brdiscarder is like io.Discard, but it implements
+// io.WriterAt. Convenient for testing (volume)BlockRead
+// implementations when the output is not checked.
+type brdiscarder struct{}
+
+func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil }
+
+var brdiscard = brdiscarder{}
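
Typical use of these helpers in the updated tests (as in the
unix_volume_test.go hunks above): brbuffer collects BlockRead output for
inspection, brdiscard drops it when only the error matters.

    buf := &brbuffer{}
    err := v.BlockRead(context.Background(), TestHash, buf)
    c.Check(err, check.IsNil)
    c.Check(buf.String(), check.Equals, string(TestBlock))

    // When the data itself is irrelevant:
    err = v.BlockRead(context.Background(), TestHash, brdiscard)
    c.Check(err, check.IsNil)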

commit 7ef752823f118079af629604ac29143e7c156687
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 14 20:25:04 2024 -0500

    2960: Buffer reads when serialize enabled on unix volume.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
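
With Serialize enabled, a slow client would otherwise hold the volume's
serialize lock for the entire transfer. A deliberately simplified,
hypothetical model of the buffering idea (the real streamWriterAt below
streams concurrently rather than in two phases):

    // Copy disk -> memory while the serialize lock is held, then
    // release the lock before the potentially slow memory -> client
    // copy.
    func serializedRead(lock sync.Locker, src io.Reader, dst io.Writer) (int64, error) {
            var mem bytes.Buffer
            lock.Lock()
            _, err := io.Copy(&mem, src)
            lock.Unlock()
            if err != nil {
                    return 0, err
            }
            return io.Copy(dst, &mem)
    }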

diff --git a/services/keepstore/streamwriterat.go b/services/keepstore/streamwriterat.go
index 365b55f233..3426dadc1f 100644
--- a/services/keepstore/streamwriterat.go
+++ b/services/keepstore/streamwriterat.go
@@ -19,10 +19,16 @@ import (
 // streamWriterAt writes the data to the provided io.Writer in
 // sequential order.
 //
+// streamWriterAt can also be used as an asynchronous buffer: the
+// caller can use the io.Writer interface to write into a memory
+// buffer and return without waiting for the wrapped writer to catch
+// up.
+//
 // Close returns when all data has been written through.
 type streamWriterAt struct {
 	writer     io.Writer
 	buf        []byte
+	writepos   int         // target offset if Write is called
 	partsize   int         // size of each part written through to writer
 	endpos     int         // portion of buf actually used, judging by WriteAt calls so far
 	partfilled []int       // number of bytes written to each part so far
@@ -81,6 +87,13 @@ func (swa *streamWriterAt) writeToWriter() {
 	}
 }
 
+// Write implements io.Writer.
+func (swa *streamWriterAt) Write(p []byte) (int, error) {
+	n, err := swa.WriteAt(p, int64(swa.writepos))
+	swa.writepos += n
+	return n, err
+}
+
 // WriteAt implements io.WriterAt.
 func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
 	pos := int(offset)
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index f01ad97553..f652a50023 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -33,11 +33,12 @@ func init() {
 
 func newUnixVolume(params newVolumeParams) (volume, error) {
 	v := &unixVolume{
-		uuid:    params.UUID,
-		cluster: params.Cluster,
-		volume:  params.ConfigVolume,
-		logger:  params.Logger,
-		metrics: params.MetricsVecs,
+		uuid:       params.UUID,
+		cluster:    params.Cluster,
+		volume:     params.ConfigVolume,
+		logger:     params.Logger,
+		metrics:    params.MetricsVecs,
+		bufferPool: params.BufferPool,
 	}
 	err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
 	if err != nil {
@@ -71,11 +72,12 @@ type unixVolume struct {
 	Root      string // path to the volume's root directory
 	Serialize bool
 
-	uuid    string
-	cluster *arvados.Cluster
-	volume  arvados.Volume
-	logger  logrus.FieldLogger
-	metrics *volumeMetricsVecs
+	uuid       string
+	cluster    *arvados.Cluster
+	volume     arvados.Volume
+	logger     logrus.FieldLogger
+	metrics    *volumeMetricsVecs
+	bufferPool *bufferPool
 
 	// something to lock during IO, typically a sync.Mutex (or nil
 	// to skip locking)
@@ -231,6 +233,17 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
 	if err != nil {
 		return 0, v.translateError(err)
 	}
+	var streamer *streamWriterAt
+	if v.locker != nil {
+		buf, err := v.bufferPool.GetContext(ctx)
+		if err != nil {
+			return 0, err
+		}
+		defer v.bufferPool.Put(buf)
+		streamer = newStreamWriterAt(w, 65536, buf)
+		defer streamer.Close()
+		w = streamer
+	}
 	var n int64
 	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
 		n, err = io.Copy(w, rdr)
@@ -239,6 +252,15 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
 		}
 		return err
 	})
+	if streamer != nil {
+		// If we're using the streamer (and there's no error
+		// so far) flush any remaining buffered data now that
+		// getFunc has released the serialize lock.
+		if err == nil {
+			err = streamer.Close()
+		}
+		return streamer.WroteAt(), err
+	}
 	return int(n), err
 }
 
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index de8d3c42d8..715e23a9ea 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -78,13 +78,14 @@ func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumePara
 	}
 	v := &testableUnixVolume{
 		unixVolume: unixVolume{
-			Root:    d,
-			locker:  locker,
-			uuid:    params.UUID,
-			cluster: params.Cluster,
-			logger:  params.Logger,
-			volume:  params.ConfigVolume,
-			metrics: params.MetricsVecs,
+			Root:       d,
+			locker:     locker,
+			uuid:       params.UUID,
+			cluster:    params.Cluster,
+			logger:     params.Logger,
+			volume:     params.ConfigVolume,
+			metrics:    params.MetricsVecs,
+			bufferPool: params.BufferPool,
 		},
 		t: c,
 	}

commit 99e43aaefc4a76908fd5f649edf0512c3800e021
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 14 17:21:27 2024 -0500

    2960: Finish renaming s3aws_volume to s3_volume.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3_volume.go
similarity index 99%
rename from services/keepstore/s3aws_volume.go
rename to services/keepstore/s3_volume.go
index 8e93eed12c..bd79d49e16 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -65,7 +65,7 @@ type s3Volume struct {
 	logger     logrus.FieldLogger
 	metrics    *volumeMetricsVecs
 	bufferPool *bufferPool
-	bucket     *s3AWSbucket
+	bucket     *s3Bucket
 	region     string
 	startOnce  sync.Once
 }
@@ -73,7 +73,7 @@ type s3Volume struct {
 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
 // wrapped bucket can be replaced atomically with SetBucket in order
 // to update credentials.
-type s3AWSbucket struct {
+type s3Bucket struct {
 	bucket string
 	svc    *s3.Client
 	stats  s3awsbucketStats
@@ -222,7 +222,7 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
 
 	cfg.Credentials = creds
 
-	v.bucket = &s3AWSbucket{
+	v.bucket = &s3Bucket{
 		bucket: v.Bucket,
 		svc:    s3.New(cfg),
 	}
@@ -538,7 +538,7 @@ func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) err
 
 type s3awsLister struct {
 	Logger            logrus.FieldLogger
-	Bucket            *s3AWSbucket
+	Bucket            *s3Bucket
 	Prefix            string
 	PageSize          int
 	Stats             *s3awsbucketStats
@@ -768,7 +768,7 @@ func (v *s3Volume) checkRaceWindow(key string) error {
 	return nil
 }
 
-func (b *s3AWSbucket) Del(path string) error {
+func (b *s3Bucket) Del(path string) error {
 	input := &s3.DeleteObjectInput{
 		Bucket: aws.String(b.bucket),
 		Key:    aws.String(path),
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3_volume_test.go
similarity index 94%
rename from services/keepstore/s3aws_volume_test.go
rename to services/keepstore/s3_volume_test.go
index d9dcbc52d6..d814949f44 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -32,7 +32,7 @@ import (
 )
 
 const (
-	S3AWSTestBucketName = "testbucket"
+	s3TestBucketName = "testbucket"
 )
 
 type s3AWSFakeClock struct {
@@ -50,18 +50,18 @@ func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
 	return c.Now().Sub(t)
 }
 
-var _ = check.Suite(&StubbedS3AWSSuite{})
+var _ = check.Suite(&stubbedS3Suite{})
 
 var srv httptest.Server
 
-type StubbedS3AWSSuite struct {
+type stubbedS3Suite struct {
 	s3server *httptest.Server
 	metadata *httptest.Server
 	cluster  *arvados.Cluster
 	volumes  []*testableS3Volume
 }
 
-func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+func (s *stubbedS3Suite) SetUpTest(c *check.C) {
 	s.s3server = nil
 	s.metadata = nil
 	s.cluster = testCluster(c)
@@ -71,7 +71,7 @@ func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
 	}
 }
 
-func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+func (s *stubbedS3Suite) TestGeneric(c *check.C) {
 	DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
 		// Use a negative raceWindow so s3test's 1-second
 		// timestamp precision doesn't confuse fixRace.
@@ -79,13 +79,13 @@ func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
 	})
 }
 
-func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
 	DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
 		return s.newTestableVolume(c, params, -2*time.Second)
 	})
 }
 
-func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
+func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
 	DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
 		v := s.newTestableVolume(c, params, -2*time.Second)
 		v.PrefixLength = 3
@@ -93,7 +93,7 @@ func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
 	})
 }
 
-func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+func (s *stubbedS3Suite) TestIndex(c *check.C) {
 	v := s.newTestableVolume(c, newVolumeParams{
 		Cluster:      s.cluster,
 		ConfigVolume: arvados.Volume{Replication: 2},
@@ -124,7 +124,7 @@ func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
 	}
 }
 
-func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+func (s *stubbedS3Suite) TestSignature(c *check.C) {
 	var header http.Header
 	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		header = r.Header
@@ -155,7 +155,7 @@ func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
 	c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
 }
 
-func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
 	s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
 		exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
@@ -205,7 +205,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
 	c.Check(err, check.ErrorMatches, `(?s).*404.*`)
 }
 
-func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+func (s *stubbedS3Suite) TestStats(c *check.C) {
 	v := s.newTestableVolume(c, newVolumeParams{
 		Cluster:      s.cluster,
 		ConfigVolume: arvados.Volume{Replication: 2},
@@ -259,20 +259,20 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	http.Error(w, "nothing here", http.StatusNotFound)
 }
 
-func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
 	s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
 		_, err := v.BlockRead(ctx, fooHash, io.Discard)
 		return err
 	})
 }
 
-func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
 	s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
 		return v.BlockWrite(ctx, fooHash, []byte("foo"))
 	})
 }
 
-func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
+func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
 	handler := &s3AWSBlockingHandler{}
 	s.s3server = httptest.NewServer(handler)
 	defer s.s3server.Close()
@@ -318,7 +318,7 @@ func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.
 	}
 }
 
-func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
 	s.cluster.Collections.BlobTrashLifetime.Set("1h")
 	s.cluster.Collections.BlobSigningTTL.Set("1h")
 
@@ -559,7 +559,7 @@ func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
 	}
 }
 
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
 
 	clock := &s3AWSFakeClock{}
 	// fake s3
@@ -588,7 +588,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
 				IAMRole:            iamRole,
 				AccessKeyID:        accessKey,
 				SecretAccessKey:    secretKey,
-				Bucket:             S3AWSTestBucketName,
+				Bucket:             s3TestBucketName,
 				Endpoint:           endpoint,
 				Region:             "test-region-1",
 				LocationConstraint: true,
@@ -610,7 +610,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
 	v.s3Volume.bucket.svc.ForcePathStyle = true
 	// Create the testbucket
 	input := &s3.CreateBucketInput{
-		Bucket: aws.String(S3AWSTestBucketName),
+		Bucket: aws.String(s3TestBucketName),
 	}
 	req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
 	_, err := req.Send(context.Background())

-----------------------------------------------------------------------


hooks/post-receive
-- 