[arvados] updated: 2.7.0-6006-g022107bd52
git repository hosting <git at public.arvados.org>
Thu Feb 15 20:44:05 UTC 2024
Summary of changes:
services/keepstore/azure_blob_volume.go | 49 +++-----
services/keepstore/azure_blob_volume_test.go | 28 ++---
services/keepstore/keepstore.go | 107 ++++++++++-------
services/keepstore/keepstore_test.go | 22 ++--
services/keepstore/router_test.go | 14 +--
.../keepstore/{s3aws_volume.go => s3_volume.go} | 41 ++-----
.../{s3aws_volume_test.go => s3_volume_test.go} | 51 ++++----
services/keepstore/streamwriterat.go | 8 +-
services/keepstore/unix_volume.go | 64 +++++-----
services/keepstore/unix_volume_test.go | 131 +++------------------
services/keepstore/volume.go | 2 +-
services/keepstore/volume_generic_test.go | 61 +++++-----
services/keepstore/volume_test.go | 50 ++++++++
13 files changed, 285 insertions(+), 343 deletions(-)
rename services/keepstore/{s3aws_volume.go => s3_volume.go} (97%)
rename services/keepstore/{s3aws_volume_test.go => s3_volume_test.go} (92%)
via 022107bd52092c658208e74161581c6bedda4a5f (commit)
via 7ef752823f118079af629604ac29143e7c156687 (commit)
via 99e43aaefc4a76908fd5f649edf0512c3800e021 (commit)
from 39f6e9f70f683237d9488faac1c549ca19ac9dae (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 022107bd52092c658208e74161581c6bedda4a5f
Author: Tom Clegg <tom at curii.com>
Date: Thu Feb 15 15:20:41 2024 -0500
2960: Move streaming from volume to keepstore layer.
Avoids using 2x buffers when comparing existing data during
BlockWrite.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
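For orientation, a minimal standalone sketch of the interface change this commit makes. The volume interface signature matches the volume.go diff below; memVolume and sliceWriterAt are hypothetical illustrations, not keepstore code:

    package main

    import (
            "context"
            "fmt"
            "io"
            "os"
    )

    // New-style interface: block data is delivered through io.WriterAt and
    // only an error is returned; byte accounting moves up to the keepstore
    // layer (see the keepstore.go diff below).
    type volume interface {
            BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
    }

    // memVolume is a hypothetical in-memory volume for illustration.
    type memVolume map[string][]byte

    var _ volume = memVolume(nil)

    func (v memVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
            data, ok := v[hash]
            if !ok {
                    return os.ErrNotExist
            }
            _, err := w.WriteAt(data, 0)
            return err
    }

    // sliceWriterAt is a hypothetical fixed-size io.WriterAt destination.
    type sliceWriterAt []byte

    func (s sliceWriterAt) WriteAt(p []byte, off int64) (int, error) {
            if int(off)+len(p) > len(s) {
                    return 0, io.ErrShortWrite
            }
            return copy(s[off:], p), nil
    }

    func main() {
            v := memVolume{"acbd18db4cc2f85cedef654fccc4a4d8": []byte("foo")}
            dst := make(sliceWriterAt, 3)
            err := v.BlockRead(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", dst)
            fmt.Println(string(dst), err) // foo <nil>
    }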
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 31660614f3..2c8a79350c 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -147,24 +147,22 @@ func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a BlockWrite operation is in progress,
// and wait for it to finish writing.
-func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
trashed, _, err := v.checkTrashed(hash)
if err != nil {
- return 0, err
+ return err
}
if trashed {
- return 0, os.ErrNotExist
+ return os.ErrNotExist
}
buf, err := v.bufferPool.GetContext(ctx)
if err != nil {
- return 0, err
+ return err
}
defer v.bufferPool.Put(buf)
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
var deadline time.Time
- size, err := v.get(ctx, hash, streamer)
- for err == nil && size == 0 && streamer.WroteAt() == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
+ wrote, err := v.get(ctx, hash, w)
+ for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
@@ -185,20 +183,15 @@ func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, writeTo io
}
select {
case <-ctx.Done():
- return 0, ctx.Err()
+ return ctx.Err()
case <-time.After(v.WriteRacePollTime.Duration()):
}
- size, err = v.get(ctx, hash, streamer)
+ wrote, err = v.get(ctx, hash, w)
}
if !deadline.IsZero() {
- ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
- }
- if err != nil {
- streamer.Close()
- return streamer.Wrote(), err
+ ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
}
- err = streamer.Close()
- return streamer.Wrote(), err
+ return err
}
func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
@@ -212,6 +205,7 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
pieces := 1
expectSize := BlockSize
+ sizeKnown := false
if pieceSize < BlockSize {
// Unfortunately the handler doesn't tell us how long
// the blob is expected to be, so we have to ask
@@ -225,15 +219,15 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
}
expectSize = int(props.ContentLength)
pieces = (expectSize + pieceSize - 1) / pieceSize
+ sizeKnown = true
}
if expectSize == 0 {
return 0, nil
}
- // We'll update this actualSize if/when we get the last piece.
- actualSize := -1
errors := make(chan error, pieces)
+ var wrote atomic.Int64
var wg sync.WaitGroup
wg.Add(pieces)
for p := 0; p < pieces; p++ {
@@ -289,31 +283,24 @@ func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt)
rdr.Close()
}()
n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
- if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+ wrote.Add(n)
+ if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
// If we don't know the actual size,
// and just tried reading 64 MiB, it's
// normal to encounter EOF.
} else if err != nil {
- if ctx.Err() == nil {
- errors <- err
- }
+ errors <- err
cancel()
return
}
- if p == pieces-1 {
- actualSize = startPos + int(n)
- }
}(p)
}
wg.Wait()
close(errors)
if len(errors) > 0 {
- return 0, v.translateError(<-errors)
- }
- if ctx.Err() != nil {
- return 0, ctx.Err()
+ return int(wrote.Load()), v.translateError(<-errors)
}
- return actualSize, nil
+ return int(wrote.Load()), ctx.Err()
}
// BlockWrite stores a block on the volume. If it already exists, its
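The write-race handling above is a poll-until-progress loop with a deadline. A standalone sketch of the same pattern (pollUntilNonEmpty and its parameters are illustrative stand-ins for v.get, azureWriteRaceInterval, and WriteRacePollTime; this is not the real azure code):

    package main

    import (
            "context"
            "fmt"
            "time"
    )

    // pollUntilNonEmpty retries read until it reports progress, the window
    // elapses, or ctx is canceled -- mirroring the loop in BlockRead above.
    func pollUntilNonEmpty(ctx context.Context, read func() (int, error), window, poll time.Duration) (int, error) {
            deadline := time.Now().Add(window)
            for {
                    n, err := read()
                    if err != nil || n > 0 || time.Now().After(deadline) {
                            return n, err
                    }
                    select {
                    case <-ctx.Done():
                            return 0, ctx.Err()
                    case <-time.After(poll):
                    }
            }
    }

    func main() {
            tries := 0
            n, err := pollUntilNonEmpty(context.Background(), func() (int, error) {
                    tries++
                    if tries < 3 {
                            return 0, nil // racing writer has not committed data yet
                    }
                    return 3, nil
            }, time.Second, 10*time.Millisecond)
            fmt.Println(n, err, tries) // 3 <nil> 3
    }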
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c629c9dc15..b8acd980a1 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -13,7 +13,6 @@ import (
"encoding/xml"
"flag"
"fmt"
- "io"
"io/ioutil"
"math/rand"
"net"
@@ -490,15 +489,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeRangeFenceposts(c *check.C) {
if err != nil {
c.Error(err)
}
- gotData := bytes.NewBuffer(nil)
- gotLen, err := v.BlockRead(context.Background(), hash, gotData)
+ gotData := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, gotData)
if err != nil {
c.Error(err)
}
gotHash := fmt.Sprintf("%x", md5.Sum(gotData.Bytes()))
- if gotLen != size {
- c.Errorf("length mismatch: got %d != %d", gotLen, size)
- }
+ c.Check(gotData.Len(), check.Equals, size)
if gotHash != hash {
c.Errorf("hash mismatch: got %s != %s", gotHash, hash)
}
@@ -532,7 +529,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRace(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
- _, err := v.BlockRead(context.Background(), TestHash, io.Discard)
+ err := v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
c.Error(err)
}
@@ -570,15 +567,13 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf := bytes.NewBuffer(nil)
- n, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
c.Error(err)
return
}
- if n != 0 {
- c.Errorf("Got %+q (n=%d), expected empty buf", buf.Bytes(), n)
- }
+ c.Check(buf.String(), check.Equals, "")
}()
select {
case <-allDone:
@@ -596,8 +591,7 @@ func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeCreateBlobRaceDeadline(c *che
func (s *stubbedAzureBlobSuite) TestAzureBlobVolumeContextCancelBlockRead(c *check.C) {
s.testAzureBlobVolumeContextCancel(c, func(ctx context.Context, v *testableAzureBlobVolume) error {
v.BlockWriteRaw(TestHash, TestBlock)
- _, err := v.BlockRead(ctx, TestHash, io.Discard)
- return err
+ return v.BlockRead(ctx, TestHash, brdiscard)
})
}
@@ -667,7 +661,7 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := volume.BlockRead(context.Background(), loc, io.Discard)
+ err := volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -679,9 +673,9 @@ func (s *stubbedAzureBlobSuite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
- _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = volume.BlockRead(context.Background(), loc, io.Discard)
+ err = volume.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 62b6d15e56..c9a8023059 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -243,13 +243,58 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
} else {
out = io.MultiWriter(out, hashcheck)
}
+
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer ks.bufferPool.Put(buf)
+ streamer := newStreamWriterAt(out, 65536, buf)
+ defer streamer.Close()
+
var errToCaller error = os.ErrNotExist
for _, mnt := range ks.rendezvous(li.hash, ks.mountsR) {
if ctx.Err() != nil {
return 0, ctx.Err()
}
- n, err = mnt.BlockRead(ctx, li.hash, out)
- if err == nil && li.size > 0 && n != li.size {
+ err := mnt.BlockRead(ctx, li.hash, streamer)
+ if err != nil {
+ if streamer.WroteAt() != 0 {
+ // BlockRead encountered an error
+ // after writing some data, so it's
+ // too late to try another
+ // volume. Flush streamer before
+ // calling Wrote() to ensure our
+ // return value accurately reflects
+ // the number of bytes written to
+ // opts.WriteTo.
+ streamer.Close()
+ return streamer.Wrote(), err
+ }
+ if !os.IsNotExist(err) {
+ errToCaller = err
+ }
+ continue
+ }
+ if li.size == 0 {
+ // hashCheckingWriter isn't in use because we
+ // don't know the expected size. All we can do
+ // is check after writing all the data, and
+ // trust the caller is doing a HEAD request so
+ // it's not too late to set an error code in
+ // the response header.
+ err = streamer.Close()
+ if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash && err == nil {
+ err = errChecksum
+ }
+ if rw, ok := opts.WriteTo.(http.ResponseWriter); ok {
+ // We didn't set the content-length header
+ // above because we didn't know the block size
+ // until now.
+ rw.Header().Set("Content-Length", fmt.Sprintf("%d", streamer.WroteAt()))
+ }
+ return streamer.WroteAt(), err
+ } else if streamer.WroteAt() != li.size {
// If the backend read fewer bytes than
// expected but returns no error, we can
// classify this as a checksum error (even
@@ -260,42 +305,17 @@ func (ks *keepstore) BlockRead(ctx context.Context, opts arvados.BlockReadOption
// it anyway, but if it's a HEAD request the
// caller can still change the response status
// code.
- return n, errChecksum
- }
- if err == nil && li.size == 0 {
- // hashCheckingWriter isn't in use because we
- // don't know the expected size. All we can do
- // is check after writing all the data, and
- // trust the caller is doing a HEAD request so
- // it's not too late to set an error code in
- // the response header.
- if hash := fmt.Sprintf("%x", hashcheck.Sum(nil)); hash != li.hash {
- return n, errChecksum
- }
- }
- if rw, ok := opts.WriteTo.(http.ResponseWriter); ok && li.size == 0 && err == nil {
- // We didn't set the content-length header
- // above because we didn't know the block size
- // until now.
- rw.Header().Set("Content-Length", fmt.Sprintf("%d", n))
- }
- if n > 0 || err == nil {
- // success, or there's an error but we can't
- // retry because we've already sent some data.
- return n, err
- }
- if !os.IsNotExist(err) {
- // If some volume returns a transient error,
- // return it to the caller instead of "Not
- // found" so it can retry.
- errToCaller = err
+ return streamer.WroteAt(), errChecksum
}
+ // Ensure streamer flushes all buffered data without
+ // errors.
+ err = streamer.Close()
+ return streamer.Wrote(), err
}
return 0, errToCaller
}
func (ks *keepstore) blockReadRemote(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
- ks.logger.Infof("blockReadRemote(%s)", opts.Locator)
token := ctxToken(ctx)
if token == "" {
return 0, errNoTokenProvided
@@ -459,7 +479,7 @@ func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOpti
continue
}
cmp := &checkEqual{Expect: opts.Data}
- if _, err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
if !cmp.Equal() {
return resp, errCollision
}
@@ -564,25 +584,28 @@ func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
return mnts
}
-// checkEqual reports whether the data written to it (via io.Writer
+// checkEqual reports whether the data written to it (via io.WriterAt
// interface) is equal to the expected data.
//
// Expect should not be changed after the first Write.
+//
+// Results are undefined if WriteAt is called with overlapping ranges.
type checkEqual struct {
- Expect []byte
- equalUntil int
+ Expect []byte
+ equal atomic.Int64
+ notequal atomic.Bool
}
func (ce *checkEqual) Equal() bool {
- return ce.equalUntil == len(ce.Expect)
+ return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
}
-func (ce *checkEqual) Write(p []byte) (int, error) {
- endpos := ce.equalUntil + len(p)
- if ce.equalUntil >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[ce.equalUntil:endpos]) {
- ce.equalUntil = endpos
+func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
+ endpos := int(offset) + len(p)
+ if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
+ ce.equal.Add(int64(len(p)))
} else {
- ce.equalUntil = -1
+ ce.notequal.Store(true)
}
return len(p), nil
}
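Because WriteAt may now be called from multiple goroutines and in any order, checkEqual tracks matched bytes atomically instead of keeping a sequential cursor. A runnable sketch exercising the type exactly as defined above (main() is illustrative):

    package main

    import (
            "bytes"
            "fmt"
            "sync/atomic"
    )

    type checkEqual struct {
            Expect   []byte
            equal    atomic.Int64
            notequal atomic.Bool
    }

    func (ce *checkEqual) Equal() bool {
            return !ce.notequal.Load() && ce.equal.Load() == int64(len(ce.Expect))
    }

    func (ce *checkEqual) WriteAt(p []byte, offset int64) (int, error) {
            endpos := int(offset) + len(p)
            if offset >= 0 && endpos <= len(ce.Expect) && bytes.Equal(p, ce.Expect[int(offset):endpos]) {
                    ce.equal.Add(int64(len(p)))
            } else {
                    ce.notequal.Store(true)
            }
            return len(p), nil
    }

    func main() {
            ce := &checkEqual{Expect: []byte("foobar")}
            ce.WriteAt([]byte("bar"), 3) // pieces may arrive out of order
            ce.WriteAt([]byte("foo"), 0)
            fmt.Println(ce.Equal()) // true
    }

This is also why the doc comment declares overlapping WriteAt ranges undefined: an overlap would be double-counted into equal.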
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 3a01476096..28049506f6 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -372,7 +372,7 @@ func (s *keepstoreSuite) TestBlockTrash(c *C) {
return ks.BlockTrash(ctx, loc)
}
checkexists := func(volidx int) bool {
- _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
+ err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
if !os.IsNotExist(err) {
c.Check(err, IsNil)
}
@@ -573,7 +573,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
for _, mnt := range ks.mounts {
err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
c.Assert(err, IsNil)
- _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
+ err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, IsNil)
}
@@ -581,7 +581,7 @@ func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
c.Check(os.IsNotExist(err), Equals, true)
for _, mnt := range ks.mounts {
- _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
+ err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, IsNil)
}
}
@@ -693,7 +693,7 @@ type stubVolume struct {
// corresponding func (if non-nil). If the func returns an
// error, that error is returned to caller. Otherwise, the
// stub continues normally.
- blockRead func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+ blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
blockWrite func(ctx context.Context, hash string, data []byte) error
deviceID func() string
blockTouch func(hash string) error
@@ -710,19 +710,19 @@ func (v *stubVolume) log(op, hash string) {
v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
}
-func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
v.log("read", hash)
if v.blockRead != nil {
- n, err := v.blockRead(ctx, hash, writeTo)
+ err := v.blockRead(ctx, hash, writeTo)
if err != nil {
- return n, err
+ return err
}
}
v.mtx.Lock()
ent, ok := v.data[hash]
v.mtx.Unlock()
if !ok || !ent.trash.IsZero() {
- return 0, os.ErrNotExist
+ return os.ErrNotExist
}
wrote := 0
for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
@@ -730,13 +730,13 @@ func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writ
if len(data) > writesize {
data = data[:writesize]
}
- n, err := writeTo.Write(data)
+ n, err := writeTo.WriteAt(data, int64(wrote))
wrote += n
if err != nil {
- return wrote, err
+ return err
}
}
- return wrote, nil
+ return nil
}
func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
diff --git a/services/keepstore/router_test.go b/services/keepstore/router_test.go
index f4bcdd4ae4..ee7be4768c 100644
--- a/services/keepstore/router_test.go
+++ b/services/keepstore/router_test.go
@@ -268,7 +268,7 @@ func (s *routerSuite) TestBlockTrash(c *C) {
resp := call(router, "DELETE", "http://example/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
c.Check(resp.Code, Equals, http.StatusOK)
c.Check(vol0.stubLog.String(), Matches, `(?ms).* trash .*`)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, Equals, os.ErrNotExist)
}
@@ -281,12 +281,12 @@ func (s *routerSuite) TestBlockUntrash(c *C) {
c.Assert(err, IsNil)
err = vol0.BlockTrash(fooHash)
c.Assert(err, IsNil)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Assert(err, Equals, os.ErrNotExist)
resp := call(router, "PUT", "http://example/untrash/"+fooHash+"+3", s.cluster.SystemRootToken, nil, nil)
c.Check(resp.Code, Equals, http.StatusOK)
c.Check(vol0.stubLog.String(), Matches, `(?ms).* untrash .*`)
- _, err = vol0.BlockRead(context.Background(), fooHash, io.Discard)
+ err = vol0.BlockRead(context.Background(), fooHash, brdiscard)
c.Check(err, IsNil)
}
@@ -356,8 +356,8 @@ func (s *routerSuite) TestRequireAdminMgtToken(c *C) {
func (s *routerSuite) TestVolumeErrorStatusCode(c *C) {
router, cancel := testRouter(c, s.cluster, nil)
defer cancel()
- router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.Writer) (int, error) {
- return 0, httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
+ router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(_ context.Context, hash string, w io.WriterAt) error {
+ return httpserver.ErrorWithStatus(errors.New("test error"), http.StatusBadGateway)
}
// To test whether we fall back to volume 1 after volume 0
@@ -472,10 +472,10 @@ func (s *routerSuite) TestCancelOnDisconnect(c *C) {
defer cancel()
unblock := make(chan struct{})
- router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.Writer) (int, error) {
+ router.keepstore.mountsW[0].volume.(*stubVolume).blockRead = func(ctx context.Context, hash string, w io.WriterAt) error {
<-unblock
c.Check(ctx.Err(), NotNil)
- return 0, ctx.Err()
+ return ctx.Err()
}
go func() {
time.Sleep(time.Second / 10)
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index bd79d49e16..d4b90540ea 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -411,24 +411,13 @@ func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
// BlockRead reads a Keep block that has been stored as a block blob
// in the S3 bucket.
-func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
key := v.key(hash)
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
-
- streamer := newStreamWriterAt(writeTo, 65536, buf)
- defer streamer.Close()
- err = v.readWorker(ctx, key, streamer)
+ err := v.readWorker(ctx, key, w)
if err != nil {
err = v.translateError(err)
if !os.IsNotExist(err) {
- return 0, err
- }
- if streamer.WroteAt() > 0 {
- return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
+ return err
}
_, err = v.head("recent/" + key)
@@ -436,25 +425,21 @@ func (v *s3Volume) BlockRead(ctx context.Context, hash string, writeTo io.Writer
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
- return 0, err
+ return err
}
if !v.fixRace(key) {
err = os.ErrNotExist
- return 0, err
+ return err
}
- err = v.readWorker(ctx, key, streamer)
+ err = v.readWorker(ctx, key, w)
if err != nil {
v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
err = v.translateError(err)
- return 0, err
+ return err
}
}
- err = streamer.Close()
- if err != nil {
- return 0, v.translateError(err)
- }
- return streamer.Wrote(), nil
+ return nil
}
func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index d814949f44..fb68e1c057 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -221,7 +221,7 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := v.BlockRead(context.Background(), loc, io.Discard)
+ err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
c.Check(stats(), check.Matches, `.*"s3.requestFailure 404 NoSuchKey[^"]*":[^0].*`)
@@ -232,9 +232,9 @@ func (s *stubbedS3Suite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
@@ -261,8 +261,7 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
- _, err := v.BlockRead(ctx, fooHash, io.Discard)
- return err
+ return v.BlockRead(ctx, fooHash, brdiscard)
})
}
@@ -480,7 +479,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
// Check canGet
loc, blk := setupScenario()
- _, err := v.BlockRead(context.Background(), loc, io.Discard)
+ err := v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
@@ -490,7 +489,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
loc, _ = setupScenario()
err = v.BlockTrash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
@@ -505,7 +504,7 @@ func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.BlockRead(context.Background(), loc, io.Discard)
+ err = v.BlockRead(context.Background(), loc, brdiscard)
c.Check(err, check.IsNil)
}
diff --git a/services/keepstore/streamwriterat.go b/services/keepstore/streamwriterat.go
index 3426dadc1f..02dce6e216 100644
--- a/services/keepstore/streamwriterat.go
+++ b/services/keepstore/streamwriterat.go
@@ -19,10 +19,10 @@ import (
// streamWriterAt writes the data to the provided io.Writer in
// sequential order.
//
-// streamWriterAt can also be used as an asynchronous buffer: the
-// caller can use the io.Writer interface to write into a memory
-// buffer and return without waiting for the wrapped writer to catch
-// up.
+// streamWriterAt can also be wrapped with an io.OffsetWriter to
+// provide an asynchronous buffer: the caller can use the io.Writer
+// interface to write into a memory buffer and return without waiting
+// for the wrapped writer to catch up.
//
// Close returns when all data has been written through.
type streamWriterAt struct {
@@ -87,14 +87,7 @@ func (swa *streamWriterAt) writeToWriter() {
}
}
-// Write implements io.Writer.
-func (swa *streamWriterAt) Write(p []byte) (int, error) {
- n, err := swa.WriteAt(p, int64(swa.writepos))
- swa.writepos += n
- return n, err
-}
-
-// WriteAt implements io.WriterAt.
+// WriteAt implements io.WriterAt. WriteAt is goroutine-safe.
func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
pos := int(offset)
n := 0
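With streamWriterAt's Write method removed, sequential callers now wrap the io.WriterAt in an io.OffsetWriter instead, as the unix_volume.go change below does. A minimal standalone sketch of that pattern (sliceWriterAt is a hypothetical destination):

    package main

    import (
            "fmt"
            "io"
            "strings"
    )

    // sliceWriterAt is a hypothetical fixed-size io.WriterAt.
    type sliceWriterAt []byte

    func (s sliceWriterAt) WriteAt(p []byte, off int64) (int, error) {
            if int(off)+len(p) > len(s) {
                    return 0, io.ErrShortWrite
            }
            return copy(s[off:], p), nil
    }

    func main() {
            dst := make(sliceWriterAt, 11)
            // io.NewOffsetWriter tracks the write position, so io.Copy can
            // stream sequentially into a WriterAt starting at offset 0.
            n, err := io.Copy(io.NewOffsetWriter(dst, 0), strings.NewReader("hello world"))
            fmt.Println(n, err, string(dst)) // 11 <nil> hello world
    }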
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index f652a50023..92cf12ac18 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -198,21 +198,6 @@ func (v *unixVolume) Mtime(loc string) (time.Time, error) {
return fi.ModTime(), nil
}
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
- if err := v.lock(ctx); err != nil {
- return err
- }
- defer v.unlock()
- f, err := v.os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
// stat is os.Stat() with some extra sanity checks.
func (v *unixVolume) stat(path string) (os.FileInfo, error) {
stat, err := v.os.Stat(path)
@@ -227,41 +212,28 @@ func (v *unixVolume) stat(path string) (os.FileInfo, error) {
}
// BlockRead reads a block from the volume.
-func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
- return 0, v.translateError(err)
+ return v.translateError(err)
}
- var streamer *streamWriterAt
- if v.locker != nil {
- buf, err := v.bufferPool.GetContext(ctx)
- if err != nil {
- return 0, err
- }
- defer v.bufferPool.Put(buf)
- streamer = newStreamWriterAt(w, 65536, buf)
- defer streamer.Close()
- w = streamer
- }
- var n int64
- err = v.getFunc(ctx, path, func(rdr io.Reader) error {
- n, err = io.Copy(w, rdr)
- if err == nil && n != stat.Size() {
- err = io.ErrUnexpectedEOF
- }
+ if err := v.lock(ctx); err != nil {
+ return err
+ }
+ defer v.unlock()
+ f, err := v.os.Open(path)
+ if err != nil {
return err
- })
- if streamer != nil {
- // If we're using the streamer (and there's no error
- // so far) flush any remaining buffered data now that
- // getFunc has released the serialize lock.
- if err == nil {
- err = streamer.Close()
- }
- return streamer.WroteAt(), err
}
- return int(n), err
+ defer f.Close()
+ src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+ dst := io.NewOffsetWriter(w, 0)
+ n, err := io.Copy(dst, src)
+ if err == nil && n != stat.Size() {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
}
// BlockWrite stores a block on the volume. If it already exists, its
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index 715e23a9ea..bcdb5f6358 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -8,9 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
- "errors"
"fmt"
- "io"
"io/ioutil"
"os"
"sync"
@@ -123,16 +121,9 @@ func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
defer v.Teardown()
v.BlockWrite(context.Background(), TestHash, TestBlock)
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash2, buf)
- switch {
- case os.IsNotExist(err):
- break
- case err == nil:
- c.Errorf("Read should have failed, returned %+q", buf.Bytes())
- default:
- c.Errorf("Read expected ErrNotExist, got: %s", err)
- }
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash2, buf)
+ c.Check(err, check.FitsTypeOf, os.ErrNotExist)
}
func (s *unixVolumeSuite) TestPut(c *check.C) {
@@ -182,94 +173,6 @@ func (s *unixVolumeSuite) TestIsFull(c *check.C) {
}
}
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- v.BlockWrite(context.Background(), TestHash, TestBlock)
- mockErr := errors.New("Mock error")
- err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- return mockErr
- })
- if err != mockErr {
- c.Errorf("Got %v, expected %v", err, mockErr)
- }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- funcCalled := false
- err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- funcCalled = true
- return nil
- })
- if err == nil {
- c.Errorf("Expected error opening non-existent file")
- }
- if funcCalled {
- c.Errorf("Worker func should not have been called")
- }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
- v := s.newTestableUnixVolume(c, s.params, false)
- defer v.Teardown()
-
- v.BlockWrite(context.Background(), TestHash, TestBlock)
-
- mtx := NewMockMutex()
- v.locker = mtx
-
- funcCalled := make(chan struct{})
- go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
- funcCalled <- struct{}{}
- return nil
- })
- select {
- case mtx.AllowLock <- struct{}{}:
- case <-funcCalled:
- c.Fatal("Function was called before mutex was acquired")
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out before mutex was acquired")
- }
- select {
- case <-funcCalled:
- case mtx.AllowUnlock <- struct{}{}:
- c.Fatal("Mutex was released before function was called")
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out waiting for funcCalled")
- }
- select {
- case mtx.AllowUnlock <- struct{}{}:
- case <-time.After(5 * time.Second):
- c.Fatal("Timed out waiting for getFunc() to release mutex")
- }
-}
-
-type MockMutex struct {
- AllowLock chan struct{}
- AllowUnlock chan struct{}
-}
-
-func NewMockMutex() *MockMutex {
- return &MockMutex{
- AllowLock: make(chan struct{}),
- AllowUnlock: make(chan struct{}),
- }
-}
-
-// Lock waits for someone to send to AllowLock.
-func (m *MockMutex) Lock() {
- <-m.AllowLock
-}
-
-// Unlock waits for someone to send to AllowUnlock.
-func (m *MockMutex) Unlock() {
- <-m.AllowUnlock
-}
-
func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
v := s.newTestableUnixVolume(c, s.params, true)
defer v.Teardown()
@@ -300,9 +203,10 @@ func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
time.Sleep(50 * time.Millisecond)
cancel()
}()
- n, err := v.BlockRead(ctx, TestHash, io.Discard)
- if n > 0 || err != context.Canceled {
- c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
+ buf := &brbuffer{}
+ err = v.BlockRead(ctx, TestHash, buf)
+ if buf.Len() != 0 || err != context.Canceled {
+ c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
}
}
@@ -317,7 +221,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
- _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
+ err := vol.BlockRead(context.Background(), fooHash, brdiscard)
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -339,8 +243,8 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
- buf := bytes.NewBuffer(nil)
- _, err = vol.BlockRead(context.Background(), fooHash, buf)
+ buf := &brbuffer{}
+ err = vol.BlockRead(context.Background(), fooHash, buf)
c.Check(err, check.IsNil)
c.Check(buf.String(), check.Equals, "foo")
c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index a0b6fda7d3..f1b6781da6 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -15,7 +15,7 @@ import (
// volume is the interface to a back-end storage device.
type volume interface {
- BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+ BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
BlockWrite(ctx context.Context, hash string, data []byte) error
DeviceID() string
BlockTouch(hash string) error
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 22667743dd..16084058b7 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -9,7 +9,6 @@ import (
"context"
"crypto/md5"
"fmt"
- "io"
"os"
"regexp"
"sort"
@@ -127,8 +126,8 @@ func (s *genericVolumeSuite) testGet(t TB, factory TestableVolumeFactory) {
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
@@ -144,7 +143,7 @@ func (s *genericVolumeSuite) testGetNoSuchBlock(t TB, factory TestableVolumeFact
v := s.newVolume(t, factory)
defer v.Teardown()
- if _, err := v.BlockRead(context.Background(), barHash, io.Discard); err == nil {
+ if err := v.BlockRead(context.Background(), barHash, brdiscard); err == nil {
t.Errorf("Expected error while getting non-existing block %v", barHash)
}
}
@@ -177,8 +176,8 @@ func (s *genericVolumeSuite) testPutBlockWithDifferentContent(t TB, factory Test
v.BlockWrite(context.Background(), testHash, testDataA)
putErr := v.BlockWrite(context.Background(), testHash, testDataB)
- buf := bytes.NewBuffer(nil)
- _, getErr := v.BlockRead(context.Background(), testHash, buf)
+ buf := &brbuffer{}
+ getErr := v.BlockRead(context.Background(), testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
@@ -217,8 +216,8 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
} else {
@@ -228,7 +227,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash2, buf)
+ err = v.BlockRead(context.Background(), TestHash2, buf)
if err != nil {
t.Error(err)
} else {
@@ -238,7 +237,7 @@ func (s *genericVolumeSuite) testPutMultipleBlocks(t TB, factory TestableVolumeF
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash3, buf)
+ err = v.BlockRead(context.Background(), TestHash3, buf)
if err != nil {
t.Error(err)
} else {
@@ -404,8 +403,8 @@ func (s *genericVolumeSuite) testDeleteNewBlock(t TB, factory TestableVolumeFact
if err := v.BlockTrash(TestHash); err != nil {
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
} else if buf.String() != string(TestBlock) {
@@ -428,7 +427,7 @@ func (s *genericVolumeSuite) testDeleteOldBlock(t TB, factory TestableVolumeFact
if err := v.BlockTrash(TestHash); err != nil {
t.Error(err)
}
- if _, err := v.BlockRead(context.Background(), TestHash, io.Discard); err == nil || !os.IsNotExist(err) {
+ if err := v.BlockRead(context.Background(), TestHash, brdiscard); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
@@ -517,7 +516,7 @@ func (s *genericVolumeSuite) testMetrics(t TB, readonly bool, factory TestableVo
v.BlockWrite(context.Background(), TestHash, TestBlock)
}
- _, err = v.BlockRead(context.Background(), TestHash, io.Discard)
+ err = v.BlockRead(context.Background(), TestHash, brdiscard)
if err != nil {
t.Error(err)
}
@@ -546,8 +545,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
sem := make(chan int)
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
@@ -558,8 +557,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
}()
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash2, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
@@ -570,8 +569,8 @@ func (s *genericVolumeSuite) testGetConcurrent(t TB, factory TestableVolumeFacto
}()
go func() {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash3, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
@@ -619,8 +618,8 @@ func (s *genericVolumeSuite) testPutConcurrent(t TB, factory TestableVolumeFacto
// Check that we actually wrote the blocks.
for _, blk := range blks {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), blk.hash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), blk.hash, buf)
if err != nil {
t.Errorf("get %s: %v", blk.hash, err)
} else if buf.String() != string(blk.data) {
@@ -644,13 +643,13 @@ func (s *genericVolumeSuite) testPutFullBlock(t TB, factory TestableVolumeFactor
t.Error(err)
}
- buf := bytes.NewBuffer(nil)
- _, err = v.BlockRead(context.Background(), hash, buf)
+ buf := &brbuffer{}
+ err = v.BlockRead(context.Background(), hash, buf)
if err != nil {
t.Error(err)
}
if buf.String() != string(wdata) {
- t.Error("buf %+q != wdata %+q", buf, wdata)
+ t.Errorf("buf (len %d) != wdata (len %d)", buf.Len(), len(wdata))
}
}
@@ -668,8 +667,8 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
v.BlockWrite(context.Background(), TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*s.cluster.Collections.BlobSigningTTL.Duration()))
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
@@ -684,7 +683,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
return
}
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
@@ -697,7 +696,7 @@ func (s *genericVolumeSuite) testTrashUntrash(t TB, readonly bool, factory Testa
// Get the block - after trash and untrash sequence
buf.Reset()
- _, err = v.BlockRead(context.Background(), TestHash, buf)
+ err = v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
@@ -712,8 +711,8 @@ func (s *genericVolumeSuite) testTrashEmptyTrashUntrash(t TB, factory TestableVo
defer v.Teardown()
checkGet := func() error {
- buf := bytes.NewBuffer(nil)
- _, err := v.BlockRead(context.Background(), TestHash, buf)
+ buf := &brbuffer{}
+ err := v.BlockRead(context.Background(), TestHash, buf)
if err != nil {
return err
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 5a17b3a7dc..f64041b048 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -5,6 +5,7 @@
package keepstore
import (
+ "sync"
"time"
)
@@ -38,3 +39,52 @@ type TestableVolume interface {
// Clean up, delete temporary files.
Teardown()
}
+
+// brbuffer is like bytes.Buffer, but it implements io.WriterAt.
+// Convenient for testing (volume)BlockRead implementations.
+type brbuffer struct {
+ mtx sync.Mutex
+ buf []byte
+}
+
+func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ if short := int(offset) + len(p) - len(b.buf); short > 0 {
+ b.buf = append(b.buf, make([]byte, short)...)
+ }
+ return copy(b.buf[offset:], p), nil
+}
+
+func (b *brbuffer) Bytes() []byte {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return b.buf
+}
+
+func (b *brbuffer) String() string {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return string(b.buf)
+}
+
+func (b *brbuffer) Len() int {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ return len(b.buf)
+}
+
+func (b *brbuffer) Reset() {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ b.buf = nil
+}
+
+// a brdiscarder is like io.Discard, but it implements
+// io.WriterAt. Convenient for testing (volume)BlockRead
+// implementations when the output is not checked.
+type brdiscarder struct{}
+
+func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil }
+
+var brdiscard = brdiscarder{}
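A sketch of how a test might exercise these helpers, in the style of the updated tests in this commit (written as a go-testable Example; it assumes it lives in a keepstore test file with fmt imported):

    func ExampleBrbuffer() {
            b := &brbuffer{}
            b.WriteAt([]byte("world"), 6)  // out-of-order writes are fine
            b.WriteAt([]byte("hello "), 0) // the gap was zero-filled, then overwritten
            fmt.Println(b.String(), b.Len())
            // Output: hello world 11
    }

When the data itself is irrelevant (error paths, HEAD-style checks), brdiscard avoids the buffering entirely, e.g. v.BlockRead(ctx, hash, brdiscard).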
commit 7ef752823f118079af629604ac29143e7c156687
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 14 20:25:04 2024 -0500
2960: Buffer reads when serialize enabled on unix volume.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
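Context for this commit: with Serialize enabled, the unix volume holds its lock for the whole disk read, so streaming directly to a slow client would stall other I/O; buffering lets the lock be released as soon as the disk read finishes. A toy standalone sketch of the asynchronous-buffer idea (this is not the real streamWriterAt, whose buffer and part bookkeeping appear in the diff below):

    package main

    import (
            "bytes"
            "fmt"
            "io"
            "time"
    )

    // asyncBuffer's Write returns as soon as the data lands in memory; a
    // goroutine drains it to the slow writer, and Close waits for it.
    type asyncBuffer struct {
            ch   chan []byte
            done chan error
    }

    func newAsyncBuffer(w io.Writer) *asyncBuffer {
            a := &asyncBuffer{ch: make(chan []byte, 16), done: make(chan error, 1)}
            go func() {
                    var err error
                    for p := range a.ch {
                            if err == nil {
                                    _, err = w.Write(p)
                            }
                    }
                    a.done <- err
            }()
            return a
    }

    func (a *asyncBuffer) Write(p []byte) (int, error) {
            a.ch <- append([]byte(nil), p...) // copy: caller may reuse p
            return len(p), nil
    }

    // Close returns when all data has been written through, like
    // streamWriterAt.Close.
    func (a *asyncBuffer) Close() error {
            close(a.ch)
            return <-a.done
    }

    type slowWriter struct{ bytes.Buffer }

    func (s *slowWriter) Write(p []byte) (int, error) {
            time.Sleep(10 * time.Millisecond) // simulate a slow client
            return s.Buffer.Write(p)
    }

    func main() {
            slow := &slowWriter{}
            a := newAsyncBuffer(slow)
            a.Write([]byte("hello ")) // returns immediately
            a.Write([]byte("world"))
            fmt.Println(a.Close(), slow.String()) // <nil> hello world
    }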
diff --git a/services/keepstore/streamwriterat.go b/services/keepstore/streamwriterat.go
index 365b55f233..3426dadc1f 100644
--- a/services/keepstore/streamwriterat.go
+++ b/services/keepstore/streamwriterat.go
@@ -19,10 +19,16 @@ import (
// streamWriterAt writes the data to the provided io.Writer in
// sequential order.
//
+// streamWriterAt can also be used as an asynchronous buffer: the
+// caller can use the io.Writer interface to write into a memory
+// buffer and return without waiting for the wrapped writer to catch
+// up.
+//
// Close returns when all data has been written through.
type streamWriterAt struct {
writer io.Writer
buf []byte
+ writepos int // target offset if Write is called
partsize int // size of each part written through to writer
endpos int // portion of buf actually used, judging by WriteAt calls so far
partfilled []int // number of bytes written to each part so far
@@ -81,6 +87,13 @@ func (swa *streamWriterAt) writeToWriter() {
}
}
+// Write implements io.Writer.
+func (swa *streamWriterAt) Write(p []byte) (int, error) {
+ n, err := swa.WriteAt(p, int64(swa.writepos))
+ swa.writepos += n
+ return n, err
+}
+
// WriteAt implements io.WriterAt.
func (swa *streamWriterAt) WriteAt(p []byte, offset int64) (int, error) {
pos := int(offset)
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index f01ad97553..f652a50023 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -33,11 +33,12 @@ func init() {
func newUnixVolume(params newVolumeParams) (volume, error) {
v := &unixVolume{
- uuid: params.UUID,
- cluster: params.Cluster,
- volume: params.ConfigVolume,
- logger: params.Logger,
- metrics: params.MetricsVecs,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
}
err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
if err != nil {
@@ -71,11 +72,12 @@ type unixVolume struct {
Root string // path to the volume's root directory
Serialize bool
- uuid string
- cluster *arvados.Cluster
- volume arvados.Volume
- logger logrus.FieldLogger
- metrics *volumeMetricsVecs
+ uuid string
+ cluster *arvados.Cluster
+ volume arvados.Volume
+ logger logrus.FieldLogger
+ metrics *volumeMetricsVecs
+ bufferPool *bufferPool
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
@@ -231,6 +233,17 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
if err != nil {
return 0, v.translateError(err)
}
+ var streamer *streamWriterAt
+ if v.locker != nil {
+ buf, err := v.bufferPool.GetContext(ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer v.bufferPool.Put(buf)
+ streamer = newStreamWriterAt(w, 65536, buf)
+ defer streamer.Close()
+ w = streamer
+ }
var n int64
err = v.getFunc(ctx, path, func(rdr io.Reader) error {
n, err = io.Copy(w, rdr)
@@ -239,6 +252,15 @@ func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (i
}
return err
})
+ if streamer != nil {
+ // If we're using the streamer (and there's no error
+ // so far) flush any remaining buffered data now that
+ // getFunc has released the serialize lock.
+ if err == nil {
+ err = streamer.Close()
+ }
+ return streamer.WroteAt(), err
+ }
return int(n), err
}
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index de8d3c42d8..715e23a9ea 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -78,13 +78,14 @@ func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumePara
}
v := &testableUnixVolume{
unixVolume: unixVolume{
- Root: d,
- locker: locker,
- uuid: params.UUID,
- cluster: params.Cluster,
- logger: params.Logger,
- volume: params.ConfigVolume,
- metrics: params.MetricsVecs,
+ Root: d,
+ locker: locker,
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ logger: params.Logger,
+ volume: params.ConfigVolume,
+ metrics: params.MetricsVecs,
+ bufferPool: params.BufferPool,
},
t: c,
}
commit 99e43aaefc4a76908fd5f649edf0512c3800e021
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 14 17:21:27 2024 -0500
2960: Finish renaming s3aws_volume to s3_volume.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keepstore/s3aws_volume.go b/services/keepstore/s3_volume.go
similarity index 99%
rename from services/keepstore/s3aws_volume.go
rename to services/keepstore/s3_volume.go
index 8e93eed12c..bd79d49e16 100644
--- a/services/keepstore/s3aws_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -65,7 +65,7 @@ type s3Volume struct {
logger logrus.FieldLogger
metrics *volumeMetricsVecs
bufferPool *bufferPool
- bucket *s3AWSbucket
+ bucket *s3Bucket
region string
startOnce sync.Once
}
@@ -73,7 +73,7 @@ type s3Volume struct {
// s3bucket wraps s3.bucket and counts I/O and API usage stats. The
// wrapped bucket can be replaced atomically with SetBucket in order
// to update credentials.
-type s3AWSbucket struct {
+type s3Bucket struct {
bucket string
svc *s3.Client
stats s3awsbucketStats
@@ -222,7 +222,7 @@ func (v *s3Volume) check(ec2metadataHostname string) error {
cfg.Credentials = creds
- v.bucket = &s3AWSbucket{
+ v.bucket = &s3Bucket{
bucket: v.Bucket,
svc: s3.New(cfg),
}
@@ -538,7 +538,7 @@ func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) err
type s3awsLister struct {
Logger logrus.FieldLogger
- Bucket *s3AWSbucket
+ Bucket *s3Bucket
Prefix string
PageSize int
Stats *s3awsbucketStats
@@ -768,7 +768,7 @@ func (v *s3Volume) checkRaceWindow(key string) error {
return nil
}
-func (b *s3AWSbucket) Del(path string) error {
+func (b *s3Bucket) Del(path string) error {
input := &s3.DeleteObjectInput{
Bucket: aws.String(b.bucket),
Key: aws.String(path),
diff --git a/services/keepstore/s3aws_volume_test.go b/services/keepstore/s3_volume_test.go
similarity index 94%
rename from services/keepstore/s3aws_volume_test.go
rename to services/keepstore/s3_volume_test.go
index d9dcbc52d6..d814949f44 100644
--- a/services/keepstore/s3aws_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -32,7 +32,7 @@ import (
)
const (
- S3AWSTestBucketName = "testbucket"
+ s3TestBucketName = "testbucket"
)
type s3AWSFakeClock struct {
@@ -50,18 +50,18 @@ func (c *s3AWSFakeClock) Since(t time.Time) time.Duration {
return c.Now().Sub(t)
}
-var _ = check.Suite(&StubbedS3AWSSuite{})
+var _ = check.Suite(&stubbedS3Suite{})
var srv httptest.Server
-type StubbedS3AWSSuite struct {
+type stubbedS3Suite struct {
s3server *httptest.Server
metadata *httptest.Server
cluster *arvados.Cluster
volumes []*testableS3Volume
}
-func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
+func (s *stubbedS3Suite) SetUpTest(c *check.C) {
s.s3server = nil
s.metadata = nil
s.cluster = testCluster(c)
@@ -71,7 +71,7 @@ func (s *StubbedS3AWSSuite) SetUpTest(c *check.C) {
}
}
-func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
+func (s *stubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
@@ -79,13 +79,13 @@ func (s *StubbedS3AWSSuite) TestGeneric(c *check.C) {
})
}
-func (s *StubbedS3AWSSuite) TestGenericReadOnly(c *check.C) {
+func (s *stubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume {
return s.newTestableVolume(c, params, -2*time.Second)
})
}
-func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
+func (s *stubbedS3Suite) TestGenericWithPrefix(c *check.C) {
DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume {
v := s.newTestableVolume(c, params, -2*time.Second)
v.PrefixLength = 3
@@ -93,7 +93,7 @@ func (s *StubbedS3AWSSuite) TestGenericWithPrefix(c *check.C) {
})
}
-func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
+func (s *stubbedS3Suite) TestIndex(c *check.C) {
v := s.newTestableVolume(c, newVolumeParams{
Cluster: s.cluster,
ConfigVolume: arvados.Volume{Replication: 2},
@@ -124,7 +124,7 @@ func (s *StubbedS3AWSSuite) TestIndex(c *check.C) {
}
}
-func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
+func (s *stubbedS3Suite) TestSignature(c *check.C) {
var header http.Header
stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
header = r.Header
@@ -155,7 +155,7 @@ func (s *StubbedS3AWSSuite) TestSignature(c *check.C) {
c.Check(header.Get("Authorization"), check.Matches, `AWS4-HMAC-SHA256 .*`)
}
-func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
+func (s *stubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
@@ -205,7 +205,7 @@ func (s *StubbedS3AWSSuite) TestIAMRoleCredentials(c *check.C) {
c.Check(err, check.ErrorMatches, `(?s).*404.*`)
}
-func (s *StubbedS3AWSSuite) TestStats(c *check.C) {
+func (s *stubbedS3Suite) TestStats(c *check.C) {
v := s.newTestableVolume(c, newVolumeParams{
Cluster: s.cluster,
ConfigVolume: arvados.Volume{Replication: 2},
@@ -259,20 +259,20 @@ func (h *s3AWSBlockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
http.Error(w, "nothing here", http.StatusNotFound)
}
-func (s *StubbedS3AWSSuite) TestGetContextCancel(c *check.C) {
+func (s *stubbedS3Suite) TestGetContextCancel(c *check.C) {
s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
_, err := v.BlockRead(ctx, fooHash, io.Discard)
return err
})
}
-func (s *StubbedS3AWSSuite) TestPutContextCancel(c *check.C) {
+func (s *stubbedS3Suite) TestPutContextCancel(c *check.C) {
s.testContextCancel(c, func(ctx context.Context, v *testableS3Volume) error {
return v.BlockWrite(ctx, fooHash, []byte("foo"))
})
}
-func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
+func (s *stubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *testableS3Volume) error) {
handler := &s3AWSBlockingHandler{}
s.s3server = httptest.NewServer(handler)
defer s.s3server.Close()
@@ -318,7 +318,7 @@ func (s *StubbedS3AWSSuite) testContextCancel(c *check.C, testFunc func(context.
}
}
-func (s *StubbedS3AWSSuite) TestBackendStates(c *check.C) {
+func (s *stubbedS3Suite) TestBackendStates(c *check.C) {
s.cluster.Collections.BlobTrashLifetime.Set("1h")
s.cluster.Collections.BlobSigningTTL.Set("1h")
@@ -559,7 +559,7 @@ func (l LogrusLog) Print(level gofakes3.LogLevel, v ...interface{}) {
}
}
-func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
+func (s *stubbedS3Suite) newTestableVolume(c *check.C, params newVolumeParams, raceWindow time.Duration) *testableS3Volume {
clock := &s3AWSFakeClock{}
// fake s3
@@ -588,7 +588,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
IAMRole: iamRole,
AccessKeyID: accessKey,
SecretAccessKey: secretKey,
- Bucket: S3AWSTestBucketName,
+ Bucket: s3TestBucketName,
Endpoint: endpoint,
Region: "test-region-1",
LocationConstraint: true,
@@ -610,7 +610,7 @@ func (s *StubbedS3AWSSuite) newTestableVolume(c *check.C, params newVolumeParams
v.s3Volume.bucket.svc.ForcePathStyle = true
// Create the testbucket
input := &s3.CreateBucketInput{
- Bucket: aws.String(S3AWSTestBucketName),
+ Bucket: aws.String(s3TestBucketName),
}
req := v.s3Volume.bucket.svc.CreateBucketRequest(input)
_, err := req.Send(context.Background())
-----------------------------------------------------------------------
hooks/post-receive
--