[arvados] created: 2.7.0-5429-gdbd3e4e8a3
git repository hosting
git at public.arvados.org
Wed Dec 20 21:00:20 UTC 2023
at dbd3e4e8a3216dcf4942ded00a546649777245c2 (commit)
commit dbd3e4e8a3216dcf4942ded00a546649777245c2
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 20 14:37:28 2023 -0500
20318: Test hash check on block write.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 5fe1a6a08e..e4d1790cac 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -114,6 +114,36 @@ func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWrit
return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
+// TestBlockWrite checks that DiskCache rejects a write whose data does
+// not match the caller-supplied hash, and keeps the genuine cached copy.
+func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
+ gateway := &keepGatewayMemoryBacked{}
+ diskCache := DiskCache{
+ KeepGateway: gateway,
+ MaxSize: 40000000,
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ genuine, err := diskCache.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: make([]byte, 100000),
+ })
+ c.Assert(err, check.IsNil)
+
+ // Reuse the genuine block's hash with mismatched content. The
+ // fake backend would not notice, so the cache must refuse it.
+ bogus := BlockWriteOptions{Hash: genuine.Locator[:32], Data: make([]byte, 10)}
+ _, err = diskCache.BlockWrite(context.Background(), bogus)
+ c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)
+
+ // Drop the block from the backend so the read below can only be
+ // served from the cache, showing the bogus write left it intact.
+ delete(gateway.data, genuine.Locator)
+ readBuf := make([]byte, 100)
+ n, err := diskCache.ReadAt(genuine.Locator, readBuf, 0)
+ c.Check(n, check.Equals, 100)
+ c.Check(err, check.IsNil)
+}
+
func (s *keepCacheSuite) TestMaxSize(c *check.C) {
backend := &keepGatewayMemoryBacked{}
cache := DiskCache{
commit a32cbc86cef9c05cc63a4bd749553c13befff730
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 20 13:33:56 2023 -0500
20318: Track estimated cache usage, and tidy more diligently.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b366d6f1b0..af80daa2e0 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -58,6 +58,9 @@ type DiskCache struct {
writing map[string]*writeprogress
writingCond *sync.Cond
writingLock sync.Mutex
+
+ sizeMeasured int64 // actual size on disk after last tidy(); zero if not measured yet
+ sizeEstimated int64 // last measured size, plus files we have written since
}
type writeprogress struct {
@@ -76,7 +79,7 @@ type openFileEnt struct {
const (
cacheFileSuffix = ".keepcacheblock"
tmpFileSuffix = ".tmp"
- tidyHoldDuration = 10 * time.Second
+ tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
)
func (cache *DiskCache) cacheFile(locator string) string {
@@ -123,7 +126,6 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
// BlockWrite writes through to the wrapped KeepGateway, and (if
// possible) retains a copy of the written block in the cache.
func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
- cache.gotidy()
unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
@@ -187,6 +189,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
if err != nil {
cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
}
+ atomic.AddInt64(&cache.sizeEstimated, int64(n))
+ cache.gotidy()
}()
// Write through to the wrapped KeepGateway from the pipe,
@@ -231,7 +235,6 @@ func (fw funcwriter) Write(p []byte) (int, error) {
// cache. The remainder of the block may continue to be copied into
// the cache in the background.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
- cache.gotidy()
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
@@ -305,6 +308,8 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
}
return n, err
})})
+ atomic.AddInt64(&cache.sizeEstimated, int64(size))
+ cache.gotidy()
}()
}
progress.cond.L.Lock()
@@ -519,9 +524,18 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
// Start a tidy() goroutine, unless one is already running / recently
// finished.
func (cache *DiskCache) gotidy() {
- // Return quickly if another tidy goroutine is running in this process.
+ // Skip if another tidy goroutine is running in this process.
n := atomic.AddInt32(&cache.tidying, 1)
- if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+ if n != 1 {
+ atomic.AddInt32(&cache.tidying, -1)
+ return
+ }
+ // Skip if sizeEstimated is based on an actual measurement and
+ // is below MaxSize, and we haven't reached the "recheck
+ // anyway" time threshold.
+ if cache.sizeMeasured > 0 &&
+ atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
+ time.Now().Before(cache.tidyHoldUntil) {
atomic.AddInt32(&cache.tidying, -1)
return
}
@@ -608,11 +622,26 @@ func (cache *DiskCache) tidy() {
return
}
- if totalsize <= maxsize {
+ // If we're below MaxSize or there's only one block in the
+ // cache, just update the usage estimate and return.
+ //
+ // (We never delete the last block because that would merely
+ // cause the same block to get re-fetched repeatedly from the
+ // backend.)
+ if totalsize <= maxsize || len(ents) == 1 {
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
return
}
- // Delete oldest entries until totalsize < maxsize.
+ // Set a new size target of maxsize minus 5%. This makes some
+ // room for sizeEstimated to grow before it triggers another
+ // tidy. We don't want to walk/sort an entire large cache
+ // directory each time we write a block.
+ target := maxsize - (maxsize / 20)
+
+ // Delete oldest entries until totalsize < target or we're
+ // down to a single cached block.
sort.Slice(ents, func(i, j int) bool {
return ents[i].atime.Before(ents[j].atime)
})
@@ -622,7 +651,7 @@ func (cache *DiskCache) tidy() {
go cache.deleteHeldopen(ent.path, nil)
deleted++
totalsize -= ent.size
- if totalsize <= maxsize {
+ if totalsize <= target || deleted == len(ents)-1 {
break
}
}
@@ -633,4 +662,6 @@ func (cache *DiskCache) tidy() {
"totalsize": totalsize,
}).Debugf("DiskCache: remaining cache usage after deleting")
}
+ atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+ atomic.StoreInt64(&cache.sizeEstimated, totalsize)
}
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 5ae932a782..5fe1a6a08e 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -127,15 +127,29 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
Data: make([]byte, 44000000),
})
c.Check(err, check.IsNil)
+
+ // Wait for tidy to finish, check that it doesn't delete the
+ // only block.
time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, 32000000),
})
c.Check(err, check.IsNil)
delete(backend.data, resp1.Locator)
delete(backend.data, resp2.Locator)
- cache.tidyHoldUntil = time.Time{}
- cache.tidy()
+
+ // Wait for tidy to finish, check that it deleted the older
+ // block.
+ time.Sleep(time.Millisecond)
+ for atomic.LoadInt32(&cache.tidying) > 0 {
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
c.Check(n, check.Equals, 0)
commit 899185924c97c0e981c2c40ab115d7572ce79811
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 20 13:19:32 2023 -0500
20318: Implement BlockRead via ReadAt so it supports streaming.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 2a33871ca4..b366d6f1b0 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -476,18 +476,8 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
return n, err
}
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead reads an entire block using a 128 KiB buffer.
func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
- cache.gotidy()
- cachefilename := cache.cacheFile(opts.Locator)
- f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
- if err != nil {
- cache.debugf("BlockRead: %s", cachefilename, err)
- return cache.KeepGateway.BlockRead(ctx, opts)
- }
- defer f.Close()
-
i := strings.Index(opts.Locator, "+")
if i < 0 || i >= len(opts.Locator) {
return 0, errors.New("invalid block locator: no size hint")
@@ -502,33 +492,28 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
return 0, errors.New("invalid block locator: invalid size hint")
}
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
- if err != nil {
- return 0, err
- }
- filesize, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, err
- }
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- return 0, err
- }
- if filesize == blocksize {
- n, err := io.Copy(opts.WriteTo, f)
- return int(n), err
- }
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- if err != nil {
- return 0, err
- }
- opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
- n, err := cache.KeepGateway.BlockRead(ctx, opts)
- if err != nil {
- return int(n), err
+ offset := 0 // bytes of the block delivered to opts.WriteTo so far
+ buf := make([]byte, 131072) // 128 KiB copy buffer, reused for every chunk
+ for offset < int(blocksize) {
+ if ctx.Err() != nil { // stop between chunks once the caller's context is done
+ return offset, ctx.Err()
+ }
+ if int(blocksize)-offset < len(buf) { // final chunk: shrink buf to the bytes remaining (never grow it past its capacity)
+ buf = buf[:int(blocksize)-offset]
+ }
+ nr, err := cache.ReadAt(opts.Locator, buf, offset)
+ if nr > 0 {
+ nw, err := opts.WriteTo.Write(buf[:nr]) // forward only the bytes ReadAt actually returned
+ if err != nil {
+ return offset + nw, err
+ }
+ }
+ offset += nr
+ if err != nil { // read error: report it along with the byte count delivered so far
+ return offset, err
+ }
+ }
- f.Truncate(int64(n))
- return n, nil
+ return offset, nil
}
// Start a tidy() goroutine, unless one is already running / recently
commit 4121c9e9fc03ee474f01248d384c7d3281b34328
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 20 10:55:30 2023 -0500
20318: Test streaming.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 6d11410470..5ae932a782 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -13,7 +13,9 @@ import (
"io"
"math/rand"
"os"
+ "path/filepath"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -49,8 +51,10 @@ func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOpti
}
type keepGatewayMemoryBacked struct {
- mtx sync.RWMutex
- data map[string][]byte
+ mtx sync.RWMutex
+ data map[string][]byte
+ pauseBlockReadAfter int
+ pauseBlockReadUntil chan error
}
func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
@@ -76,6 +80,16 @@ func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadO
if data == nil {
return 0, errors.New("block not found: " + opts.Locator)
}
+ if k.pauseBlockReadUntil != nil {
+ src := bytes.NewReader(data)
+ n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
+ if err != nil {
+ return int(n), err
+ }
+ <-k.pauseBlockReadUntil
+ n2, err := io.Copy(opts.WriteTo, src)
+ return int(n + n2), err
+ }
return opts.WriteTo.Write(data)
}
func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
@@ -220,6 +234,66 @@ func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangle
wg.Wait()
}
+// TestStreaming checks that ReadAt returns once its range is cached, not when the whole block is.
+func (s *keepCacheSuite) TestStreaming(c *check.C) {
+ blksize := 64000000
+ backend := &keepGatewayMemoryBacked{
+ pauseBlockReadUntil: make(chan error),
+ pauseBlockReadAfter: blksize / 8,
+ }
+ cache := DiskCache{
+ KeepGateway: backend,
+ MaxSize: int64(blksize),
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, blksize),
+ })
+ c.Assert(err, check.IsNil) // Assert, not Check: resp.Locator is sliced on the next line
+ os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3])) // presumably the cache subdirectory holding this block, so reads must refetch it
+
+ // Start a lot of concurrent requests for various ranges of
+ // the same block. Our backend will return the first 8MB and
+ // then pause. The requests that can be satisfied by the first
+ // 8MB of data should return quickly. The rest should wait,
+ // and return after we release pauseBlockReadUntil.
+ var wgEarly, wgLate sync.WaitGroup
+ var doneEarly, doneLate int32 // completed-request counters; accessed only via sync/atomic
+ for i := 0; i < 10000; i++ {
+ wgEarly.Add(1)
+ go func() {
+ offset := int(rand.Int63() % int64(blksize-benchReadSize))
+ if offset+benchReadSize > backend.pauseBlockReadAfter {
+ wgLate.Add(1)
+ defer wgLate.Done()
+ wgEarly.Done() // a late reader leaves the early group at once, so wgEarly.Wait() does not wait for it
+ defer atomic.AddInt32(&doneLate, 1)
+ } else {
+ defer wgEarly.Done()
+ defer atomic.AddInt32(&doneEarly, 1)
+ }
+ buf := make([]byte, benchReadSize)
+ n, err := cache.ReadAt(resp.Locator, buf, offset)
+ c.Check(n, check.Equals, len(buf))
+ c.Check(err, check.IsNil)
+ }()
+ }
+
+ // Ensure all early ranges finish while backend request(s) are
+ // paused.
+ wgEarly.Wait()
+ c.Logf("doneEarly = %d", atomic.LoadInt32(&doneEarly))
+ c.Check(atomic.LoadInt32(&doneLate), check.Equals, int32(0))
+
+ // Unpause backend request(s).
+ close(backend.pauseBlockReadUntil)
+ wgLate.Wait()
+ c.Logf("doneLate = %d", atomic.LoadInt32(&doneLate))
+}
+
var _ = check.Suite(&keepCacheBenchSuite{})
type keepCacheBenchSuite struct {
commit 932ca9e698fb36bcdd0c558b50e6e965417409d0
Author: Tom Clegg <tom at curii.com>
Date: Wed Dec 20 10:22:31 2023 -0500
20318: Comment about error handling.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 567e0cbec3..2a33871ca4 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -458,6 +458,12 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
progress.cond.Wait()
}
progress.cond.L.Unlock()
+ // If size<needed && progress.err!=nil here, we'll end
+ // up reporting a less helpful "EOF reading from cache
+ // file" below, instead of the actual error fetching
+ // from upstream to cache file. This is OK though,
+ // because our caller (ReadAt) doesn't even report our
+ // error, it just retries.
}
n, err := heldopen.f.ReadAt(dst, int64(offset))
commit 4e69128e5e7aeb1a9c5e4462adb38ecc5f5bb8ea
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 16:48:58 2023 -0500
20318: Return requested range while fetching remainder of block.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b8b3d9f588..567e0cbec3 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -50,6 +50,21 @@ type DiskCache struct {
heldopen map[string]*openFileEnt
heldopenMax int
heldopenLock sync.Mutex
+
+ // The "writing" fields allow multiple concurrent/sequential
+ // ReadAt calls to be notified as a single
+ // read-block-from-backend-into-cache goroutine fills the
+ // cache file.
+ writing map[string]*writeprogress
+ writingCond *sync.Cond
+ writingLock sync.Mutex
+}
+
+type writeprogress struct { // state of one backend-to-cache-file copy, shared with the ReadAt calls waiting on it
+ cond *sync.Cond // broadcast whenever size or done changes; cond.L guards the fields below
+ done bool // size and err have their final values
+ size int // bytes copied into cache file so far
+ err error // error encountered while copying from backend to cache
+}
type openFileEnt struct {
@@ -202,59 +217,116 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
return resp, err
}
+type funcwriter func([]byte) (int, error) // function type given a Write method below, so a closure can be passed where a writer is expected
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+ return fw(p) // Write simply calls the function itself with p
+}
+
// ReadAt reads the entire block from the wrapped KeepGateway into the
// cache if needed, and copies the requested portion into the provided
// slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
cache.gotidy()
cachefilename := cache.cacheFile(locator)
if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
return n, err
}
- f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
if err != nil {
- return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
+ return 0, fmt.Errorf("ReadAt: %w", err)
}
- defer f.Close()
+ defer readf.Close()
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
if err != nil {
return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
}
- size, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file seems to be truncated or empty
- // (possibly because we just created it). Wait for an
- // exclusive lock, then check again (in case someone
- // else is doing the same thing) before trying to
- // retrieve the entire block.
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
- if err != nil {
- return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
- }
- }
- size, err = f.Seek(0, io.SeekEnd)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
- }
- if size < int64(len(dst)+offset) {
- // The cache file is truncated or empty, and we own it
- // now. Fill it.
- _, err = f.Seek(0, io.SeekStart)
- if err != nil {
- return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ if progress != nil {
+ cache.writingLock.Unlock()
+ } else {
+ progress = &writeprogress{}
+ progress.cond = sync.NewCond(&sync.Mutex{})
+ if cache.writing == nil {
+ cache.writing = map[string]*writeprogress{}
}
- n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
- if err != nil {
- return 0, err
- }
- f.Truncate(int64(n))
+ cache.writing[cachefilename] = progress
+ cache.writingLock.Unlock()
+
+ // Start a goroutine to copy from backend to the cache file. As
+ // data arrives, wake up any waiting loops (see below)
+ // so ReadAt() requests for partial data can return as
+ // soon as the relevant bytes have been copied.
+ go func() {
+ var size int
+ var writef *os.File
+ var err error
+ defer func() {
+ closeErr := writef.Close()
+ if err == nil {
+ err = closeErr
+ }
+ progress.cond.L.Lock()
+ progress.err = err
+ progress.done = true
+ progress.size = size
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ cache.writingLock.Lock()
+ delete(cache.writing, cachefilename)
+ cache.writingLock.Unlock()
+ }()
+ writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+ if err != nil {
+ err = fmt.Errorf("ReadAt: %w", err)
+ return
+ }
+ err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ return
+ }
+ size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+ Locator: locator,
+ WriteTo: funcwriter(func(p []byte) (int, error) {
+ n, err := writef.Write(p)
+ if n > 0 {
+ progress.cond.L.Lock()
+ progress.size += n
+ progress.cond.L.Unlock()
+ progress.cond.Broadcast()
+ }
+ return n, err
+ })})
+ }()
+ }
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ ok := progress.size >= len(dst)+offset
+ err = progress.err
+ progress.cond.L.Unlock()
+
+ if !ok && err != nil {
+ // If the copy-from-backend goroutine encountered an
+ // error before copying enough bytes to satisfy our
+ // request, we return that error.
+ return 0, err
+ } else {
+ // Regardless of whether the copy-from-backend
+ // goroutine succeeded, or failed after copying the
+ // bytes we need, the only errors we need to report
+ // are errors reading from the cache file.
+ return readf.ReadAt(dst, int64(offset))
}
- return f.ReadAt(dst, int64(offset))
}
var quickReadAtLostRace = errors.New("quickReadAt: lost race")
@@ -374,6 +446,20 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
// Other goroutine closed the file before we got RLock
return 0, quickReadAtLostRace
}
+
+ // If another goroutine is currently writing the file, wait
+ // for it to catch up to the end of the range we need.
+ cache.writingLock.Lock()
+ progress := cache.writing[cachefilename]
+ cache.writingLock.Unlock()
+ if progress != nil {
+ progress.cond.L.Lock()
+ for !progress.done && progress.size < len(dst)+offset {
+ progress.cond.Wait()
+ }
+ progress.cond.L.Unlock()
+ }
+
n, err := heldopen.f.ReadAt(dst, int64(offset))
if err != nil {
// wait for any concurrent users to finish, then
commit 2553652e43229a872b93a5d011c25a2727d1d18f
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 14:07:28 2023 -0500
20318: Fix stuttering error message.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 189332c866..b8b3d9f588 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -213,7 +213,7 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
}
f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
if err != nil {
- return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
+ return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
}
defer f.Close()
@@ -391,7 +391,7 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
cachefilename := cache.cacheFile(opts.Locator)
f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
if err != nil {
- cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
+ cache.debugf("BlockRead: %s", cachefilename, err)
return cache.KeepGateway.BlockRead(ctx, opts)
}
defer f.Close()
commit 82054f095dad0fb9d21842eac1cd9ade50ffc940
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 14:07:23 2023 -0500
20318: Remove memory-backed keep block cache.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 0e0d3c43e4..bde13424dd 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -78,7 +78,6 @@ type IKeepClient interface {
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
- ClearBlockCache()
SetStorageClasses(sc []string)
}
@@ -2033,7 +2032,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
log.Printf("%s: %v", containerUUID, err)
return 1
}
- kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index c533821351..276dd36661 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -368,9 +368,6 @@ func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
return 0, errors.New("not implemented")
}
-func (client *KeepTestClient) ClearBlockCache() {
-}
-
func (client *KeepTestClient) Close() {
client.Content = nil
}
diff --git a/lib/mount/command.go b/lib/mount/command.go
index f88d977c4c..666f2cf4ac 100644
--- a/lib/mount/command.go
+++ b/lib/mount/command.go
@@ -43,7 +43,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
ro := flags.Bool("ro", false, "read-only")
experimental := flags.Bool("experimental", false, "acknowledge this is an experimental command, and should not be used in production (required)")
- blockCache := flags.Int("block-cache", 4, "read cache size (number of 64MiB blocks)")
pprof := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
if ok, code := cmd.ParseFlags(flags, prog, args, "[FUSE mount options]", stderr); !ok {
return code
@@ -69,7 +68,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
logger.Print(err)
return 1
}
- kc.BlockCache = &keepclient.BlockCache{MaxBlocks: *blockCache}
host := fuse.NewFileSystemHost(&keepFS{
Client: client,
KeepClient: kc,
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
deleted file mode 100644
index 37eee4de20..0000000000
--- a/sdk/go/keepclient/block_cache.go
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package keepclient
-
-import (
- "bytes"
- "context"
- "io"
- "sort"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "git.arvados.org/arvados.git/sdk/go/arvados"
-)
-
-var DefaultBlockCache = &BlockCache{}
-
-type BlockCache struct {
- // Maximum number of blocks to keep in the cache. If 0, a
- // default size (currently 4) is used instead.
- MaxBlocks int
-
- cache map[string]*cacheBlock
- mtx sync.Mutex
-}
-
-const defaultMaxBlocks = 4
-
-// Sweep deletes the least recently used blocks from the cache until
-// there are no more than MaxBlocks left.
-func (c *BlockCache) Sweep() {
- max := c.MaxBlocks
- if max == 0 {
- max = defaultMaxBlocks
- }
- c.mtx.Lock()
- defer c.mtx.Unlock()
- if len(c.cache) <= max {
- return
- }
- lru := make([]time.Time, 0, len(c.cache))
- for _, b := range c.cache {
- lru = append(lru, b.lastUse)
- }
- sort.Sort(sort.Reverse(timeSlice(lru)))
- threshold := lru[max]
- for loc, b := range c.cache {
- if !b.lastUse.After(threshold) {
- delete(c.cache, loc)
- }
- }
-}
-
-// ReadAt returns data from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) ReadAt(upstream arvados.KeepGateway, locator string, p []byte, off int) (int, error) {
- buf, err := c.get(upstream, locator)
- if err != nil {
- return 0, err
- }
- if off > len(buf) {
- return 0, io.ErrUnexpectedEOF
- }
- return copy(p, buf[off:]), nil
-}
-
-// Get a block from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) get(upstream arvados.KeepGateway, locator string) ([]byte, error) {
- cacheKey := locator[:32]
- bufsize := BLOCKSIZE
- if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
- datasize, err := strconv.ParseInt(parts[1], 10, 32)
- if err == nil && datasize >= 0 {
- bufsize = int(datasize)
- }
- }
- c.mtx.Lock()
- if c.cache == nil {
- c.cache = make(map[string]*cacheBlock)
- }
- b, ok := c.cache[cacheKey]
- if !ok || b.err != nil {
- b = &cacheBlock{
- fetched: make(chan struct{}),
- lastUse: time.Now(),
- }
- c.cache[cacheKey] = b
- go func() {
- buf := bytes.NewBuffer(make([]byte, 0, bufsize))
- _, err := upstream.BlockRead(context.Background(), arvados.BlockReadOptions{Locator: locator, WriteTo: buf})
- c.mtx.Lock()
- b.data, b.err = buf.Bytes(), err
- c.mtx.Unlock()
- close(b.fetched)
- go c.Sweep()
- }()
- }
- c.mtx.Unlock()
-
- // Wait (with mtx unlocked) for the fetch goroutine to finish,
- // in case it hasn't already.
- <-b.fetched
-
- c.mtx.Lock()
- b.lastUse = time.Now()
- c.mtx.Unlock()
- return b.data, b.err
-}
-
-func (c *BlockCache) Clear() {
- c.mtx.Lock()
- c.cache = nil
- c.mtx.Unlock()
-}
-
-type timeSlice []time.Time
-
-func (ts timeSlice) Len() int { return len(ts) }
-
-func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
-
-func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
-
-type cacheBlock struct {
- data []byte
- err error
- fetched chan struct{}
- lastUse time.Time
-}
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index 65dcd9ac8a..c1bad8557d 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -237,14 +237,8 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
}
func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
- // Disable disk cache
- defer func(home string) {
- os.Setenv("HOME", home)
- }(os.Getenv("HOME"))
- os.Setenv("HOME", "")
-
- // Disable memory cache
- s.kc.BlockCache = &BlockCache{}
+ // Disable cache
+ s.kc.gatewayStack = &keepViaHTTP{s.kc}
s.kc.PutB([]byte("foo"))
s.kc.PutB([]byte("bar"))
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 35c191afe6..eeb187e107 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -67,28 +67,3 @@ func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
}
return loc, nil
}
-
-// keepViaBlockCache implements arvados.KeepGateway by using the given
-// KeepClient's BlockCache with the wrapped KeepGateway.
-//
-// Note the whole KeepClient gets passed in instead of just its
-// cache. This ensures the new BlockCache gets used if it changes
-// after keepViaBlockCache is initialized.
-type keepViaBlockCache struct {
- kc *KeepClient
- arvados.KeepGateway
-}
-
-func (kvbc *keepViaBlockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
- return kvbc.kc.cache().ReadAt(kvbc.KeepGateway, locator, dst, offset)
-}
-
-func (kvbc *keepViaBlockCache) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
- rdr, _, _, _, err := kvbc.kc.getOrHead("GET", opts.Locator, nil)
- if err != nil {
- return 0, err
- }
- defer rdr.Close()
- n, err := io.Copy(opts.WriteTo, rdr)
- return int(n), err
-}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2712e9f3a5..4e935812be 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -110,7 +110,6 @@ type KeepClient struct {
lock sync.RWMutex
HTTPClient HTTPClient
Retries int
- BlockCache *BlockCache
RequestID string
StorageClasses []string
DefaultStorageClasses []string // Set by cluster's exported config
@@ -542,17 +541,6 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
return found
}
-func (kc *KeepClient) cache() *BlockCache {
- if kc.BlockCache != nil {
- return kc.BlockCache
- }
- return DefaultBlockCache
-}
-
-func (kc *KeepClient) ClearBlockCache() {
- kc.cache().Clear()
-}
-
func (kc *KeepClient) SetStorageClasses(sc []string) {
// make a copy so the caller can't mess with it.
kc.StorageClasses = append([]string{}, sc...)
commit 18541c985f7f19d9c200a592287333fb3fdab38b
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 10:57:27 2023 -0500
20318: Route KeepClient block writes through disk cache.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 262d612640..35c191afe6 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -6,7 +6,11 @@ package keepclient
import (
"context"
+ "fmt"
"io"
+ "net/http"
+ "strings"
+ "time"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
@@ -41,6 +45,29 @@ func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOpt
return int(n), err
}
+func (kvh *keepViaHTTP) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ return kvh.httpBlockWrite(ctx, req)
+}
+
+func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
+ if !strings.Contains(locator, "+R") {
+ // Either it has +A, or it's unsigned and we assume
+ // it's a local locator on a site with signatures
+ // disabled.
+ return locator, nil
+ }
+ sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
+ _, _, url, hdr, err := kvh.KeepClient.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
+ if err != nil {
+ return "", err
+ }
+ loc := hdr.Get("X-Keep-Locator")
+ if loc == "" {
+ return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
+ }
+ return loc, nil
+}
+
// keepViaBlockCache implements arvados.KeepGateway by using the given
// KeepClient's BlockCache with the wrapped KeepGateway.
//
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index cae6ca1545..2712e9f3a5 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -376,22 +376,7 @@ func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
// with a valid signature from the local cluster. If the given locator
// already has a local signature, it is returned unchanged.
func (kc *KeepClient) LocalLocator(locator string) (string, error) {
- if !strings.Contains(locator, "+R") {
- // Either it has +A, or it's unsigned and we assume
- // it's a local locator on a site with signatures
- // disabled.
- return locator, nil
- }
- sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
- _, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
- if err != nil {
- return "", err
- }
- loc := hdr.Get("X-Keep-Locator")
- if loc == "" {
- return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
- }
- return loc, nil
+ return kc.upstreamGateway().LocalLocator(locator)
}
// Get retrieves a block, given a locator. Returns a reader, the
@@ -412,6 +397,12 @@ func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
return kc.upstreamGateway().ReadAt(locator, p, off)
}
+// BlockWrite writes a full block to upstream servers and saves a copy
+// in the local cache.
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ return kc.upstreamGateway().BlockWrite(ctx, req)
+}
+
// Ask verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 8d299815b2..c4144bf871 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -127,7 +127,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
}
}
-func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
var resp arvados.BlockWriteResponse
var getReader func() io.Reader
if req.Data == nil && req.Reader == nil {
commit f68557c627b5a8472d33d973a2737448904c29bd
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 10:47:05 2023 -0500
20318: Don't use memory-backed block cache for short reads.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 18614a77eb..cae6ca1545 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -42,6 +42,9 @@ var (
DefaultProxyConnectTimeout = 30 * time.Second
DefaultProxyTLSHandshakeTimeout = 10 * time.Second
DefaultProxyKeepAlive = 120 * time.Second
+
+ rootCacheDir = "/var/cache/arvados/keep"
+ userCacheDir = ".cache/arvados/keep" // relative to HOME
)
// Error interface with an error and boolean indicating whether the error is temporary
@@ -336,43 +339,37 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
return nil, 0, "", nil, err
}
+// attempt to create dir/subdir/ and its parents, up to but not
+// including dir itself, using mode 0700.
+func makedirs(dir, subdir string) {
+ for _, part := range strings.Split(subdir, string(os.PathSeparator)) {
+ dir = filepath.Join(dir, part)
+ os.Mkdir(dir, 0700)
+ }
+}
+
// upstreamGateway creates/returns the KeepGateway stack used to read
-// and write data: a memory cache, a disk-backed cache if available,
-// and an http backend.
+// and write data: a disk-backed cache on top of an http backend.
func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
kc.lock.Lock()
defer kc.lock.Unlock()
if kc.gatewayStack != nil {
return kc.gatewayStack
}
- var stack arvados.KeepGateway = &keepViaHTTP{kc}
-
- // Wrap with a disk cache, if cache dir is writable
- home := os.Getenv("HOME")
- if fi, err := os.Stat(home); home != "" && err == nil && fi.IsDir() {
- var err error
- dir := home
- for _, part := range []string{".cache", "arvados", "keep"} {
- dir = filepath.Join(dir, part)
- err = os.Mkdir(dir, 0700)
- }
- if err == nil || os.IsExist(err) {
- os.Mkdir(filepath.Join(dir, "tmp"), 0700)
- err = os.WriteFile(filepath.Join(dir, "tmp", "check.tmp"), nil, 0600)
- if err == nil {
- stack = &arvados.DiskCache{
- Dir: dir,
- KeepGateway: stack,
- }
- }
- }
+ var cachedir string
+ if os.Geteuid() == 0 {
+ cachedir = rootCacheDir
+ makedirs("/", cachedir)
+ } else {
+ home := "/" + os.Getenv("HOME")
+ makedirs(home, userCacheDir)
+ cachedir = filepath.Join(home, userCacheDir)
}
- stack = &keepViaBlockCache{
- kc: kc,
- KeepGateway: stack,
+ kc.gatewayStack = &arvados.DiskCache{
+ Dir: cachedir,
+ KeepGateway: &keepViaHTTP{kc},
}
- kc.gatewayStack = stack
- return stack
+ return kc.gatewayStack
}
// LocalLocator returns a locator equivalent to the one supplied, but
commit 4ef866c34bba8bd73c0b1de48fb8c62d4f7d0661
Author: Tom Clegg <tom at curii.com>
Date: Tue Dec 19 10:07:36 2023 -0500
20318: Fail instead of reading through if cache dir is unusable.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 6aed35a215..189332c866 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -213,8 +213,7 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
}
f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
if err != nil {
- cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
- return cache.KeepGateway.ReadAt(locator, dst, offset)
+ return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
}
defer f.Close()
commit 85a85da473af4b66dbc92a5f8882eea8c7ce8ffd
Author: Tom Clegg <tom at curii.com>
Date: Mon Dec 18 20:59:31 2023 -0500
20318: Close held-open filehandles when deleting cache files.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index eb9d4607bf..6aed35a215 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -260,12 +260,33 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
var quickReadAtLostRace = errors.New("quickReadAt: lost race")
-func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+// Remove the cache entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle.
+//
+// If expect is nil and an existing cache entry is deleted, close
+// that entry's filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+ needclose := expect
+
cache.heldopenLock.Lock()
- if cache.heldopen[cachefilename] == heldopen {
+ found := cache.heldopen[cachefilename]
+ if found != nil && (expect == nil || expect == found) {
delete(cache.heldopen, cachefilename)
+ needclose = found
}
cache.heldopenLock.Unlock()
+
+ if needclose != nil {
+ needclose.Lock()
+ defer needclose.Unlock()
+ if needclose.f != nil {
+ needclose.f.Close()
+ needclose.f = nil
+ }
+ }
}
// quickReadAt attempts to use a cached-filehandle approach to read
@@ -522,6 +543,7 @@ func (cache *DiskCache) tidy() {
deleted := 0
for _, ent := range ents {
os.Remove(ent.path)
+ go cache.deleteHeldopen(ent.path, nil)
deleted++
totalsize -= ent.size
if totalsize <= maxsize {
commit f9a0922e50904365e40b99372ed66f3a6f992cd7
Author: Tom Clegg <tom at curii.com>
Date: Mon Dec 18 20:19:35 2023 -0500
20318: Test mangling cache files while reading.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 23e7dfbd9f..6d11410470 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io"
+ "math/rand"
"os"
"sync"
"time"
@@ -131,7 +132,13 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
c.Check(err, check.IsNil)
}
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
+ s.testConcurrentReaders(c, true, false)
+}
+func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
+ s.testConcurrentReaders(c, false, true)
+}
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
blksize := 64000000
backend := &keepGatewayMemoryBacked{}
cache := DiskCache{
@@ -140,20 +147,61 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
Dir: c.MkDir(),
Logger: ctxlog.TestLogger(c),
}
- ctx := context.Background()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
Data: make([]byte, blksize),
})
c.Check(err, check.IsNil)
- delete(backend.data, resp.Locator)
+ if cannotRefresh {
+ // Delete the block from the backing store, to ensure
+ // the cache doesn't rely on re-reading a block that
+ // it has just written.
+ delete(backend.data, resp.Locator)
+ }
+ if mangleCache {
+ // Replace cache files with truncated files (and
+ // delete them outright) while the ReadAt loop is
+ // running, to ensure the cache can re-fetch from the
+ // backend as needed.
+ var nRemove, nTrunc int
+ defer func() {
+ c.Logf("nRemove %d", nRemove)
+ c.Logf("nTrunc %d", nTrunc)
+ }()
+ go func() {
+ // Truncate/delete the cache file at various
+ // intervals. Readers should re-fetch/recover from
+ // this.
+ fnm := cache.cacheFile(resp.Locator)
+ for ctx.Err() == nil {
+ trunclen := rand.Int63() % int64(blksize*2)
+ if trunclen > int64(blksize) {
+ err := os.Remove(fnm)
+ if err == nil {
+ nRemove++
+ }
+ } else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+ err := os.Rename(fnm+"#", fnm)
+ if err == nil {
+ nTrunc++
+ }
+ }
+ }
+ }()
+ }
failed := false
var wg sync.WaitGroup
- for offset := 0; offset < blksize; offset += 123456 {
- offset := offset
+ var slots = make(chan bool, 100) // limit concurrency / memory usage
+ for i := 0; i < 20000; i++ {
+ offset := (i * 123456) % blksize
+ slots <- true
wg.Add(1)
go func() {
defer wg.Done()
+ defer func() { <-slots }()
buf := make([]byte, 654321)
if offset+len(buf) > blksize {
buf = buf[:blksize-offset]
commit fcfb6de8652973045d7c188d11817ef2471e7335
Author: Tom Clegg <tom at curii.com>
Date: Mon Dec 18 18:45:51 2023 -0500
20318: Add concurrent and sequential read benchmarks.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 128e47faa2..23e7dfbd9f 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -172,8 +172,73 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
wg.Wait()
}
+var _ = check.Suite(&keepCacheBenchSuite{})
+
+type keepCacheBenchSuite struct {
+ blksize int
+ blkcount int
+ backend *keepGatewayMemoryBacked
+ cache *DiskCache
+ locators []string
+}
+
+func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
+ s.blksize = 64000000
+ s.blkcount = 8
+ s.backend = &keepGatewayMemoryBacked{}
+ s.cache = &DiskCache{
+ KeepGateway: s.backend,
+ MaxSize: int64(s.blksize),
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ s.locators = make([]string, s.blkcount)
+ data := make([]byte, s.blksize)
+ for b := 0; b < s.blkcount; b++ {
+ for i := range data {
+ data[i] = byte(b)
+ }
+ resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: data,
+ })
+ c.Assert(err, check.IsNil)
+ s.locators[b] = resp.Locator
+ }
+}
+
+func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buf := make([]byte, benchReadSize)
+ _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+ if err != nil {
+ c.Fail()
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
+ buf := make([]byte, benchReadSize)
+ for i := 0; i < c.N; i++ {
+ _, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+ if err != nil {
+ c.Fail()
+ }
+ }
+}
+
const benchReadSize = 1000
+var _ = check.Suite(&fileOpsSuite{})
+
+type fileOpsSuite struct{}
+
// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
// potential performance improvement of caching filehandles rather
// than opening/closing the cache file for each read.
@@ -182,7 +247,7 @@ const benchReadSize = 1000
// improvement: ~636 MB/s when opening/closing the file for each
// 1000-byte read vs. ~2 GB/s when opening the file once and doing
// concurrent reads using the same file descriptor.
-func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
+func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
fnm := c.MkDir() + "/testfile"
os.WriteFile(fnm, make([]byte, 64000000), 0700)
var wg sync.WaitGroup
@@ -206,7 +271,7 @@ func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
wg.Wait()
}
-func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
+func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
fnm := c.MkDir() + "/testfile"
os.WriteFile(fnm, make([]byte, 64000000), 0700)
f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
commit 6a54de705f1e129566ee7f5101fe5cbe3dbba548
Author: Tom Clegg <tom at curii.com>
Date: Thu Dec 14 20:17:04 2023 -0500
20318: Try to keep cache files open for subsequent/concurrent reads.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 4dabbd3506..eb9d4607bf 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -17,6 +17,7 @@ import (
"sort"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"syscall"
"time"
@@ -42,6 +43,19 @@ type DiskCache struct {
tidying int32 // see tidy()
tidyHoldUntil time.Time
defaultMaxSize int64
+
+ // The "heldopen" fields are used to open cache files for
+ // reading, and leave them open for future/concurrent ReadAt
+ // operations. See quickReadAt.
+ heldopen map[string]*openFileEnt
+ heldopenMax int
+ heldopenLock sync.Mutex
+}
+
+type openFileEnt struct {
+ sync.RWMutex
+ f *os.File
+ err error // if err is non-nil, f should not be used.
}
const (
@@ -194,6 +208,9 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
cache.gotidy()
cachefilename := cache.cacheFile(locator)
+ if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+ return n, err
+ }
f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
if err != nil {
cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
@@ -241,8 +258,114 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
return f.ReadAt(dst, int64(offset))
}
-// ReadAt reads the entire block from the wrapped KeepGateway into the
-// cache if needed, and writes it to the provided writer.
+var quickReadAtLostRace = errors.New("quickReadAt: lost race")
+
+func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+ cache.heldopenLock.Lock()
+ if cache.heldopen[cachefilename] == heldopen {
+ delete(cache.heldopen, cachefilename)
+ }
+ cache.heldopenLock.Unlock()
+}
+
+// quickReadAt attempts to use a cached-filehandle approach to read
+// from the indicated file. The expectation is that the caller
+// (ReadAt) will try a more robust approach when this fails, so
+// quickReadAt doesn't try especially hard to ensure success in
+// races. In particular, when there are concurrent calls, and one
+// fails, that can cause others to fail too.
+func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
+ isnew := false
+ cache.heldopenLock.Lock()
+ if cache.heldopenMax == 0 {
+ // Choose a reasonable limit on open cache files based
+ // on RLIMIT_NOFILE. Note Go automatically raises
+ // softlimit to hardlimit, so it's typically 1048576,
+ // not 1024.
+ lim := syscall.Rlimit{}
+ err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
+ if err != nil {
+ cache.heldopenMax = 256
+ } else if lim.Cur > 40000 {
+ cache.heldopenMax = 10000
+ } else {
+ cache.heldopenMax = int(lim.Cur / 4)
+ }
+ }
+ heldopen := cache.heldopen[cachefilename]
+ if heldopen == nil {
+ isnew = true
+ heldopen = &openFileEnt{}
+ if cache.heldopen == nil {
+ cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+ } else if len(cache.heldopen) > cache.heldopenMax {
+ // Rather than go to the trouble of tracking
+ // last access time, just close all files, and
+ // open again as needed. Even in the worst
+ // pathological case, this causes one extra
+ // open+close per read, which is not
+ // especially bad (see benchmarks).
+ go func(m map[string]*openFileEnt) {
+ for _, heldopen := range m {
+ heldopen.Lock()
+ defer heldopen.Unlock()
+ if heldopen.f != nil {
+ heldopen.f.Close()
+ heldopen.f = nil
+ }
+ }
+ }(cache.heldopen)
+ cache.heldopen = nil
+ }
+ cache.heldopen[cachefilename] = heldopen
+ heldopen.Lock()
+ }
+ cache.heldopenLock.Unlock()
+
+ if isnew {
+ // Open and flock the file, then release the write lock to
+ // unblock any other goroutines that are waiting for
+ // heldopen.RLock() below.
+ f, err := os.Open(cachefilename)
+ if err == nil {
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ if err == nil {
+ heldopen.f = f
+ } else {
+ f.Close()
+ }
+ }
+ if err != nil {
+ heldopen.err = err
+ go cache.deleteHeldopen(cachefilename, heldopen)
+ }
+ heldopen.Unlock()
+ }
+ // Acquire read lock to ensure (1) initialization is complete,
+ // if it's done by a different goroutine, and (2) any "delete
+ // old/unused entries" waits for our read to finish before
+ // closing the file.
+ heldopen.RLock()
+ defer heldopen.RUnlock()
+ if heldopen.err != nil {
+ // Other goroutine encountered an error during setup
+ return 0, heldopen.err
+ } else if heldopen.f == nil {
+ // Other goroutine closed the file before we got RLock
+ return 0, quickReadAtLostRace
+ }
+ n, err := heldopen.f.ReadAt(dst, int64(offset))
+ if err != nil {
+ // wait for any concurrent users to finish, then
+ // delete this cache entry in case reopening the
+ // backing file helps.
+ go cache.deleteHeldopen(cachefilename, heldopen)
+ }
+ return n, err
+}
+
+// BlockRead reads the entire block from the wrapped KeepGateway into
+// the cache if needed, and writes it to the provided writer.
func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
cache.gotidy()
cachefilename := cache.cacheFile(opts.Locator)
commit cdb63c3e5b6f11bfcb8244614d8a6fd309fbafce
Author: Tom Clegg <tom at curii.com>
Date: Thu Dec 14 20:12:41 2023 -0500
20318: Use const for cache-tidy hold timer.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b9c7fea4b0..4dabbd3506 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -44,9 +44,10 @@ type DiskCache struct {
defaultMaxSize int64
}
-var (
- cacheFileSuffix = ".keepcacheblock"
- tmpFileSuffix = ".tmp"
+const (
+ cacheFileSuffix = ".keepcacheblock"
+ tmpFileSuffix = ".tmp"
+ tidyHoldDuration = 10 * time.Second
)
func (cache *DiskCache) cacheFile(locator string) string {
@@ -306,7 +307,7 @@ func (cache *DiskCache) gotidy() {
}
go func() {
cache.tidy()
- cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+ cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
atomic.AddInt32(&cache.tidying, -1)
}()
}
commit 01ecf2938246c47dda5cdf667c4dcf29f49693be
Author: Tom Clegg <tom at curii.com>
Date: Wed Nov 29 15:06:36 2023 -0500
20318: Benchmark open/close vs. shared fd.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 6cc5a2b8dd..128e47faa2 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"io"
+ "os"
"sync"
"time"
@@ -170,3 +171,62 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
}
wg.Wait()
}
+
+const benchReadSize = 1000
+
+// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
+// potential performance improvement of caching filehandles rather
+// than opening/closing the cache file for each read.
+//
+// Results from a development machine indicate a ~3x throughput
+// improvement: ~636 MB/s when opening/closing the file for each
+// 1000-byte read vs. ~2 GB/s when opening the file once and doing
+// concurrent reads using the same file descriptor.
+func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
+ fnm := c.MkDir() + "/testfile"
+ os.WriteFile(fnm, make([]byte, 64000000), 0700)
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ f.Close()
+ }()
+ }
+ wg.Wait()
+}
+
+func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
+ fnm := c.MkDir() + "/testfile"
+ os.WriteFile(fnm, make([]byte, 64000000), 0700)
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ var wg sync.WaitGroup
+ for i := 0; i < c.N; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+ if err != nil {
+ c.Fail()
+ return
+ }
+ }()
+ }
+ wg.Wait()
+ f.Close()
+}
commit ef1a56aa4c6aff593767fcaba693ebc042df5d4b
Author: Tom Clegg <tom at curii.com>
Date: Mon Nov 27 16:25:56 2023 -0500
20318: Add filesystem cache tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
new file mode 100644
index 0000000000..6cc5a2b8dd
--- /dev/null
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -0,0 +1,172 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&keepCacheSuite{})
+
+type keepCacheSuite struct {
+}
+
+type keepGatewayBlackHole struct {
+}
+
+func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ return 0, errors.New("block not found")
+}
+func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+ return 0, errors.New("block not found")
+}
+func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
+ return locator, nil
+}
+func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ h := md5.New()
+ var size int64
+ if opts.Reader == nil {
+ size, _ = io.Copy(h, bytes.NewReader(opts.Data))
+ } else {
+ size, _ = io.Copy(h, opts.Reader)
+ }
+ return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
+}
+
+type keepGatewayMemoryBacked struct {
+ mtx sync.RWMutex
+ data map[string][]byte
+}
+
+func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ k.mtx.RLock()
+ data := k.data[locator]
+ k.mtx.RUnlock()
+ if data == nil {
+ return 0, errors.New("block not found: " + locator)
+ }
+ var n int
+ if len(data) > offset {
+ n = copy(dst, data[offset:])
+ }
+ if n < len(dst) {
+ return n, io.EOF
+ }
+ return n, nil
+}
+func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+ k.mtx.RLock()
+ data := k.data[opts.Locator]
+ k.mtx.RUnlock()
+ if data == nil {
+ return 0, errors.New("block not found: " + opts.Locator)
+ }
+ return opts.WriteTo.Write(data)
+}
+func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
+ return locator, nil
+}
+func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ h := md5.New()
+ data := bytes.NewBuffer(nil)
+ if opts.Reader == nil {
+ data.Write(opts.Data)
+ h.Write(data.Bytes())
+ } else {
+ io.Copy(io.MultiWriter(h, data), opts.Reader)
+ }
+ locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
+ k.mtx.Lock()
+ if k.data == nil {
+ k.data = map[string][]byte{}
+ }
+ k.data[locator] = data.Bytes()
+ k.mtx.Unlock()
+ return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
+}
+
+func (s *keepCacheSuite) TestMaxSize(c *check.C) {
+ backend := &keepGatewayMemoryBacked{}
+ cache := DiskCache{
+ KeepGateway: backend,
+ MaxSize: 40000000,
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ ctx := context.Background()
+ resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, 44000000),
+ })
+ c.Check(err, check.IsNil)
+ time.Sleep(time.Millisecond)
+ resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, 32000000),
+ })
+ c.Check(err, check.IsNil)
+ delete(backend.data, resp1.Locator)
+ delete(backend.data, resp2.Locator)
+ cache.tidyHoldUntil = time.Time{}
+ cache.tidy()
+
+ n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
+
+ n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
+ c.Check(n > 0, check.Equals, true)
+ c.Check(err, check.IsNil)
+}
+
+func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+ blksize := 64000000
+ backend := &keepGatewayMemoryBacked{}
+ cache := DiskCache{
+ KeepGateway: backend,
+ MaxSize: int64(blksize),
+ Dir: c.MkDir(),
+ Logger: ctxlog.TestLogger(c),
+ }
+ ctx := context.Background()
+ resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+ Data: make([]byte, blksize),
+ })
+ c.Check(err, check.IsNil)
+ delete(backend.data, resp.Locator)
+
+ failed := false
+ var wg sync.WaitGroup
+ for offset := 0; offset < blksize; offset += 123456 {
+ offset := offset
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buf := make([]byte, 654321)
+ if offset+len(buf) > blksize {
+ buf = buf[:blksize-offset]
+ }
+ n, err := cache.ReadAt(resp.Locator, buf, offset)
+ if failed {
+ // don't fill logs with subsequent errors
+ return
+ }
+ if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
+ failed = true
+ }
+ c.Assert(n, check.Equals, len(buf))
+ }()
+ }
+ wg.Wait()
+}
diff --git a/sdk/go/arvadostest/api.go b/sdk/go/arvadostest/api.go
index 4e214414d7..706e49542f 100644
--- a/sdk/go/arvadostest/api.go
+++ b/sdk/go/arvadostest/api.go
@@ -356,6 +356,26 @@ func (as *APIStub) APIClientAuthorizationGet(ctx context.Context, options arvado
as.appendCall(ctx, as.APIClientAuthorizationGet, options)
return arvados.APIClientAuthorization{}, as.Error
}
+func (as *APIStub) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ as.appendCall(context.TODO(), as.ReadAt, struct {
+ locator string
+ dst []byte
+ offset int
+ }{locator, dst, offset})
+ return 0, as.Error
+}
+func (as *APIStub) BlockRead(ctx context.Context, options arvados.BlockReadOptions) (int, error) {
+ as.appendCall(ctx, as.BlockRead, options)
+ return 0, as.Error
+}
+func (as *APIStub) BlockWrite(ctx context.Context, options arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ as.appendCall(ctx, as.BlockWrite, options)
+ return arvados.BlockWriteResponse{}, as.Error
+}
+func (as *APIStub) LocalLocator(locator string) (int, error) {
+ as.appendCall(context.TODO(), as.LocalLocator, locator)
+ return 0, as.Error
+}
func (as *APIStub) appendCall(ctx context.Context, method interface{}, options interface{}) {
as.mtx.Lock()
diff --git a/sdk/go/arvadostest/keep_stub.go b/sdk/go/arvadostest/keep_stub.go
new file mode 100644
index 0000000000..ddfa3909bb
--- /dev/null
+++ b/sdk/go/arvadostest/keep_stub.go
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+type KeepStub struct{}
commit 52fa01051f4633e3bbbfcdf8c55994e7cd91212a
Author: Tom Clegg <tom at curii.com>
Date: Mon Nov 27 11:20:09 2023 -0500
20318: Add disk-backed block cache.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index f4ac1ab3c4..2a527567e5 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -240,11 +240,16 @@ type LogoutOptions struct {
ReturnTo string `json:"return_to"` // Redirect to this URL after logging out
}
+type BlockReadOptions struct {
+ Locator string
+ WriteTo io.Writer
+}
+
type BlockWriteOptions struct {
Hash string
Data []byte
- Reader io.Reader
- DataSize int // Must be set if Data is nil.
+ Reader io.Reader // Must be set if Data is nil.
+ DataSize int // Must be set if Data is nil.
RequestID string
StorageClasses []string
Replicas int
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
new file mode 100644
index 0000000000..b9c7fea4b0
--- /dev/null
+++ b/sdk/go/arvados/keep_cache.go
@@ -0,0 +1,414 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "golang.org/x/sys/unix"
+)
+
+type KeepGateway interface {
+ ReadAt(locator string, dst []byte, offset int) (int, error)
+ BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
+ BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
+ LocalLocator(locator string) (string, error)
+}
+
+// DiskCache wraps KeepGateway, adding a disk-based cache layer.
+type DiskCache struct {
+ KeepGateway
+ Dir string
+ MaxSize int64
+ Logger logrus.FieldLogger
+
+ tidying int32 // see tidy()
+ tidyHoldUntil time.Time
+ defaultMaxSize int64
+}
+
+var (
+ cacheFileSuffix = ".keepcacheblock"
+ tmpFileSuffix = ".tmp"
+)
+
+func (cache *DiskCache) cacheFile(locator string) string {
+ hash := locator
+ if i := strings.Index(hash, "+"); i > 0 {
+ hash = hash[:i]
+ }
+ return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+}
+
+// Open a cache file, creating the parent dir if necessary.
+func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
+ f, err := os.OpenFile(name, flags, 0600)
+ if os.IsNotExist(err) {
+ // Create the parent dir and try again. (We could have
+ // checked/created the parent dir before, but that
+ // would be less efficient in the much more common
+ // situation where it already exists.)
+ parent, _ := filepath.Split(name)
+ os.Mkdir(parent, 0700)
+ f, err = os.OpenFile(name, flags, 0600)
+ }
+ return f, err
+}
+
+// Rename a file, creating the new path's parent dir if necessary.
+func (cache *DiskCache) rename(old, new string) error {
+ if nil == os.Rename(old, new) {
+ return nil
+ }
+ parent, _ := filepath.Split(new)
+ os.Mkdir(parent, 0700)
+ return os.Rename(old, new)
+}
+
+func (cache *DiskCache) debugf(format string, args ...interface{}) {
+ logger := cache.Logger
+ if logger == nil {
+ return
+ }
+ logger.Debugf(format, args...)
+}
+
+// BlockWrite writes through to the wrapped KeepGateway, and (if
+// possible) retains a copy of the written block in the cache.
+func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ cache.gotidy()
+ unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
+ tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+ tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
+ if err != nil {
+ cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
+ return cache.KeepGateway.BlockWrite(ctx, opts)
+ }
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ copyerr := make(chan error, 1)
+
+ // Start a goroutine to copy the caller's source data to
+ // tmpfile, a hash checker, and (via pipe) the wrapped
+ // KeepGateway.
+ pipereader, pipewriter := io.Pipe()
+ defer pipereader.Close()
+ go func() {
+ defer tmpfile.Close()
+ defer os.Remove(tmpfilename)
+ defer pipewriter.Close()
+
+ // Copy from opts.Data or opts.Reader, depending on
+ // which was provided.
+ var src io.Reader
+ if opts.Data != nil {
+ src = bytes.NewReader(opts.Data)
+ } else {
+ src = opts.Reader
+ }
+
+ hashcheck := md5.New()
+ n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
+ if err != nil {
+ copyerr <- err
+ cancel()
+ return
+ } else if opts.DataSize > 0 && opts.DataSize != int(n) {
+ copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
+ cancel()
+ return
+ }
+ err = tmpfile.Close()
+ if err != nil {
+ // Don't rename tmpfile into place, but allow
+ // the BlockWrite call to succeed if nothing
+ // else goes wrong.
+ return
+ }
+ hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
+ if opts.Hash != "" && opts.Hash != hash {
+ // Even if the wrapped KeepGateway doesn't
+ // notice a problem, this should count as an
+ // error.
+ copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
+ cancel()
+ return
+ }
+ cachefilename := cache.cacheFile(hash)
+ err = cache.rename(tmpfilename, cachefilename)
+ if err != nil {
+ cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
+ }
+ }()
+
+ // Write through to the wrapped KeepGateway from the pipe,
+ // instead of the original reader.
+ newopts := opts
+ if newopts.DataSize == 0 {
+ newopts.DataSize = len(newopts.Data)
+ }
+ newopts.Reader = pipereader
+ newopts.Data = nil
+
+ resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
+ if len(copyerr) > 0 {
+ // If the copy-to-pipe goroutine failed, that error
+ // will be more helpful than the resulting "context
+ // canceled" or "read [from pipereader] failed" error
+ // seen by the wrapped KeepGateway.
+ //
+ // If the wrapped KeepGateway encounters an error
+ // before all the data is copied into the pipe, it
+ // stops reading from the pipe, which causes the
+ // io.Copy() in the goroutine to block until our
+ // deferred pipereader.Close() call runs. In that case
+ // len(copyerr)==0 here, so the wrapped KeepGateway
+ // error is the one we return to our caller.
+ err = <-copyerr
+ }
+ return resp, err
+}
+
+// ReadAt reads the entire block from the wrapped KeepGateway into the
+// cache if needed, and copies the requested portion into the provided
+// slice.
+func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ cache.gotidy()
+ cachefilename := cache.cacheFile(locator)
+ f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ if err != nil {
+ cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
+ return cache.KeepGateway.ReadAt(locator, dst, offset)
+ }
+ defer f.Close()
+
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+ }
+
+ size, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+ }
+ if size < int64(len(dst)+offset) {
+ // The cache file seems to be truncated or empty
+ // (possibly because we just created it). Wait for an
+ // exclusive lock, then check again (in case someone
+ // else is doing the same thing) before trying to
+ // retrieve the entire block.
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+ if err != nil {
+ return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+ }
+ }
+ size, err = f.Seek(0, io.SeekEnd)
+ if err != nil {
+ return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+ }
+ if size < int64(len(dst)+offset) {
+ // The cache file is truncated or empty, and we own it
+ // now. Fill it.
+ _, err = f.Seek(0, io.SeekStart)
+ if err != nil {
+ return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+ }
+ n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
+ if err != nil {
+ return 0, err
+ }
+ f.Truncate(int64(n))
+ }
+ return f.ReadAt(dst, int64(offset))
+}
+
+// BlockRead reads the entire block from the wrapped KeepGateway into
+// the cache if needed, and writes it to the provided writer.
+func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+ cache.gotidy()
+ cachefilename := cache.cacheFile(opts.Locator)
+ f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+ if err != nil {
+ cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
+ return cache.KeepGateway.BlockRead(ctx, opts)
+ }
+ defer f.Close()
+
+ i := strings.Index(opts.Locator, "+")
+ if i < 0 || i >= len(opts.Locator) {
+ return 0, errors.New("invalid block locator: no size hint")
+ }
+ sizestr := opts.Locator[i+1:]
+ i = strings.Index(sizestr, "+")
+ if i > 0 {
+ sizestr = sizestr[:i]
+ }
+ blocksize, err := strconv.ParseInt(sizestr, 10, 32)
+ if err != nil || blocksize < 0 {
+ return 0, errors.New("invalid block locator: invalid size hint")
+ }
+
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+ if err != nil {
+ return 0, err
+ }
+ filesize, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ return 0, err
+ }
+ _, err = f.Seek(0, io.SeekStart)
+ if err != nil {
+ return 0, err
+ }
+ if filesize == blocksize {
+ n, err := io.Copy(opts.WriteTo, f)
+ return int(n), err
+ }
+ err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+ if err != nil {
+ return 0, err
+ }
+ opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
+ n, err := cache.KeepGateway.BlockRead(ctx, opts)
+ if err != nil {
+ return int(n), err
+ }
+ f.Truncate(int64(n))
+ return n, nil
+}
+
+// Start a tidy() goroutine, unless one is already running / recently
+// finished.
+func (cache *DiskCache) gotidy() {
+ // Return quickly if a tidy goroutine is running in this process, or one finished within the hold period.
+ n := atomic.AddInt32(&cache.tidying, 1)
+ if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+ atomic.AddInt32(&cache.tidying, -1)
+ return
+ }
+ go func() {
+ cache.tidy()
+ cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+ atomic.AddInt32(&cache.tidying, -1)
+ }()
+}
+
+// Delete cache files as needed to control disk usage.
+func (cache *DiskCache) tidy() {
+ maxsize := cache.MaxSize
+ if maxsize < 1 {
+ if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+ var stat unix.Statfs_t
+ if nil == unix.Statfs(cache.Dir, &stat) {
+ maxsize = int64(stat.Bavail) * stat.Bsize / 10
+ }
+ atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+ }
+ }
+
+ // Bail if a tidy goroutine is running in a different process.
+ lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+ if err != nil {
+ return
+ }
+ defer lockfile.Close()
+ err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+ if err != nil {
+ return
+ }
+
+ type entT struct {
+ path string
+ atime time.Time
+ size int64
+ }
+ var ents []entT
+ var totalsize int64
+ filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+ if err != nil {
+ cache.debugf("tidy: skipping dir %s: %s", path, err)
+ return nil
+ }
+ if info.IsDir() {
+ return nil
+ }
+ if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
+ return nil
+ }
+ var atime time.Time
+ if stat, ok := info.Sys().(*syscall.Stat_t); ok {
+ // Access time is available (hopefully the
+ // filesystem is not mounted with noatime)
+ atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
+ } else {
+ // If access time isn't available we fall back
+ // to sorting by modification time.
+ atime = info.ModTime()
+ }
+ ents = append(ents, entT{path, atime, info.Size()})
+ totalsize += info.Size()
+ return nil
+ })
+ if cache.Logger != nil {
+ cache.Logger.WithFields(logrus.Fields{
+ "totalsize": totalsize,
+ "maxsize": maxsize,
+ }).Debugf("DiskCache: checked current cache usage")
+ }
+
+ // If MaxSize wasn't specified and we failed to come up with a
+ // defaultMaxSize above, use the larger of {current cache size,
+ // 1 GiB} as the defaultMaxSize for subsequent tidy() operations.
+ if maxsize == 0 {
+ if totalsize < 1<<30 {
+ atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
+ } else {
+ atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
+ }
+ cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
+ return
+ }
+
+ if totalsize <= maxsize {
+ return
+ }
+
+ // Delete oldest entries until totalsize <= maxsize.
+ sort.Slice(ents, func(i, j int) bool {
+ return ents[i].atime.Before(ents[j].atime)
+ })
+ deleted := 0
+ for _, ent := range ents {
+ os.Remove(ent.path)
+ deleted++
+ totalsize -= ent.size
+ if totalsize <= maxsize {
+ break
+ }
+ }
+
+ if cache.Logger != nil {
+ cache.Logger.WithFields(logrus.Fields{
+ "deleted": deleted,
+ "totalsize": totalsize,
+ }).Debugf("DiskCache: remaining cache usage after deleting")
+ }
+}
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 89eecc6e27..37eee4de20 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -5,13 +5,16 @@
package keepclient
import (
- "fmt"
+ "bytes"
+ "context"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
var DefaultBlockCache = &BlockCache{}
@@ -54,8 +57,8 @@ func (c *BlockCache) Sweep() {
// ReadAt returns data from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
- buf, err := c.Get(kc, locator)
+func (c *BlockCache) ReadAt(upstream arvados.KeepGateway, locator string, p []byte, off int) (int, error) {
+ buf, err := c.get(upstream, locator)
if err != nil {
return 0, err
}
@@ -65,9 +68,9 @@ func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (
return copy(p, buf[off:]), nil
}
-// Get returns data from the cache, first retrieving it from Keep if
+// Get a block from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) get(upstream arvados.KeepGateway, locator string) ([]byte, error) {
cacheKey := locator[:32]
bufsize := BLOCKSIZE
if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
@@ -88,21 +91,10 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
}
c.cache[cacheKey] = b
go func() {
- rdr, size, _, err := kc.Get(locator)
- var data []byte
- if err == nil {
- data = make([]byte, size, bufsize)
- _, err = io.ReadFull(rdr, data)
- err2 := rdr.Close()
- if err == nil && err2 != nil {
- err = fmt.Errorf("close(): %w", err2)
- }
- if err != nil {
- err = fmt.Errorf("Get %s: %w", locator, err)
- }
- }
+ buf := bytes.NewBuffer(make([]byte, 0, bufsize))
+ _, err := upstream.BlockRead(context.Background(), arvados.BlockReadOptions{Locator: locator, WriteTo: buf})
c.mtx.Lock()
- b.data, b.err = data, err
+ b.data, b.err = buf.Bytes(), err
c.mtx.Unlock()
close(b.fetched)
go c.Sweep()
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index 75603f1baa..65dcd9ac8a 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -237,7 +237,15 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
}
func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+ // Disable disk cache
+ defer func(home string) {
+ os.Setenv("HOME", home)
+ }(os.Getenv("HOME"))
+ os.Setenv("HOME", "")
+
+ // Disable memory cache
s.kc.BlockCache = &BlockCache{}
+
s.kc.PutB([]byte("foo"))
s.kc.PutB([]byte("bar"))
s.kc.PutB([]byte("baz"))
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
new file mode 100644
index 0000000000..262d612640
--- /dev/null
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -0,0 +1,67 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package keepclient
+
+import (
+ "context"
+ "io"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+)
+
+// keepViaHTTP implements arvados.KeepGateway by using a KeepClient to
+// do upstream requests to keepstore and keepproxy.
+type keepViaHTTP struct {
+ *KeepClient
+}
+
+func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ rdr, _, _, _, err := kvh.getOrHead("GET", locator, nil)
+ if err != nil {
+ return 0, err
+ }
+ defer rdr.Close()
+ _, err = io.CopyN(io.Discard, rdr, int64(offset))
+ if err != nil {
+ return 0, err
+ }
+ n, err := rdr.Read(dst)
+ return int(n), err
+}
+
+func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+ rdr, _, _, _, err := kvh.getOrHead("GET", opts.Locator, nil)
+ if err != nil {
+ return 0, err
+ }
+ defer rdr.Close()
+ n, err := io.Copy(opts.WriteTo, rdr)
+ return int(n), err
+}
+
+// keepViaBlockCache implements arvados.KeepGateway by using the given
+// KeepClient's BlockCache with the wrapped KeepGateway.
+//
+// Note the whole KeepClient gets passed in instead of just its
+// cache. This ensures the new BlockCache gets used if it changes
+// after keepViaBlockCache is initialized.
+type keepViaBlockCache struct {
+ kc *KeepClient
+ arvados.KeepGateway
+}
+
+func (kvbc *keepViaBlockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ return kvbc.kc.cache().ReadAt(kvbc.KeepGateway, locator, dst, offset)
+}
+
+func (kvbc *keepViaBlockCache) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+ rdr, _, _, _, err := kvbc.kc.getOrHead("GET", opts.Locator, nil)
+ if err != nil {
+ return 0, err
+ }
+ defer rdr.Close()
+ n, err := io.Copy(opts.WriteTo, rdr)
+ return int(n), err
+}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 68ac886ddd..18614a77eb 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -16,6 +16,8 @@ import (
"io/ioutil"
"net"
"net/http"
+ "os"
+ "path/filepath"
"regexp"
"strconv"
"strings"
@@ -118,6 +120,8 @@ type KeepClient struct {
// Disable automatic discovery of keep services
disableDiscovery bool
+
+ gatewayStack arvados.KeepGateway
}
func (kc *KeepClient) loadDefaultClasses() error {
@@ -332,6 +336,45 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
return nil, 0, "", nil, err
}
+// upstreamGateway creates/returns the KeepGateway stack used to read
+// and write data: a memory cache, a disk-backed cache if available,
+// and an http backend.
+func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ if kc.gatewayStack != nil {
+ return kc.gatewayStack
+ }
+ var stack arvados.KeepGateway = &keepViaHTTP{kc}
+
+ // Wrap with a disk cache, if cache dir is writable
+ home := os.Getenv("HOME")
+ if fi, err := os.Stat(home); home != "" && err == nil && fi.IsDir() {
+ var err error
+ dir := home
+ for _, part := range []string{".cache", "arvados", "keep"} {
+ dir = filepath.Join(dir, part)
+ err = os.Mkdir(dir, 0700)
+ }
+ if err == nil || os.IsExist(err) {
+ os.Mkdir(filepath.Join(dir, "tmp"), 0700)
+ err = os.WriteFile(filepath.Join(dir, "tmp", "check.tmp"), nil, 0600)
+ if err == nil {
+ stack = &arvados.DiskCache{
+ Dir: dir,
+ KeepGateway: stack,
+ }
+ }
+ }
+ }
+ stack = &keepViaBlockCache{
+ kc: kc,
+ KeepGateway: stack,
+ }
+ kc.gatewayStack = stack
+ return stack
+}
+
// LocalLocator returns a locator equivalent to the one supplied, but
// with a valid signature from the local cluster. If the given locator
// already has a local signature, it is returned unchanged.
@@ -369,7 +412,7 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
// ReadAt retrieves a portion of block from the cache if it's
// present, otherwise from the network.
func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
- return kc.cache().ReadAt(kc, locator, p, off)
+ return kc.upstreamGateway().ReadAt(locator, p, off)
}
// Ask verifies that a block with the given hash is available and
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list