[arvados] created: 2.7.0-5429-gdbd3e4e8a3

git repository hosting git at public.arvados.org
Wed Dec 20 21:00:20 UTC 2023


        at  dbd3e4e8a3216dcf4942ded00a546649777245c2 (commit)


commit dbd3e4e8a3216dcf4942ded00a546649777245c2
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 20 14:37:28 2023 -0500

    20318: Test hash check on block write.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 5fe1a6a08e..e4d1790cac 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -114,6 +114,36 @@ func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWrit
 	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
 }
 
+func (s *keepCacheSuite) TestBlockWrite(c *check.C) {
+	backend := &keepGatewayMemoryBacked{}
+	cache := DiskCache{
+		KeepGateway: backend,
+		MaxSize:     40000000,
+		Dir:         c.MkDir(),
+		Logger:      ctxlog.TestLogger(c),
+	}
+	ctx := context.Background()
+	real, err := cache.BlockWrite(ctx, BlockWriteOptions{
+		Data: make([]byte, 100000),
+	})
+	c.Assert(err, check.IsNil)
+
+	// Write different data but supply the same hash. Should be
+	// rejected (even though our fake backend doesn't notice).
+	_, err = cache.BlockWrite(ctx, BlockWriteOptions{
+		Hash: real.Locator[:32],
+		Data: make([]byte, 10),
+	})
+	c.Check(err, check.ErrorMatches, `block hash .+ did not match provided hash .+`)
+
+	// Ensure the bogus write didn't overwrite (or delete) the
+	// real cached data associated with that hash.
+	delete(backend.data, real.Locator)
+	n, err := cache.ReadAt(real.Locator, make([]byte, 100), 0)
+	c.Check(n, check.Equals, 100)
+	c.Check(err, check.IsNil)
+}
+
 func (s *keepCacheSuite) TestMaxSize(c *check.C) {
 	backend := &keepGatewayMemoryBacked{}
 	cache := DiskCache{

commit a32cbc86cef9c05cc63a4bd749553c13befff730
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 20 13:33:56 2023 -0500

    20318: Track estimated cache usage, and tidy more diligently.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b366d6f1b0..af80daa2e0 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -58,6 +58,9 @@ type DiskCache struct {
 	writing     map[string]*writeprogress
 	writingCond *sync.Cond
 	writingLock sync.Mutex
+
+	sizeMeasured  int64 // actual size on disk after last tidy(); zero if not measured yet
+	sizeEstimated int64 // last measured size, plus files we have written since
 }
 
 type writeprogress struct {
@@ -76,7 +79,7 @@ type openFileEnt struct {
 const (
 	cacheFileSuffix  = ".keepcacheblock"
 	tmpFileSuffix    = ".tmp"
-	tidyHoldDuration = 10 * time.Second
+	tidyHoldDuration = 5 * time.Minute // time to re-check cache size even if estimated size is below max
 )
 
 func (cache *DiskCache) cacheFile(locator string) string {
@@ -123,7 +126,6 @@ func (cache *DiskCache) debugf(format string, args ...interface{}) {
 // BlockWrite writes through to the wrapped KeepGateway, and (if
 // possible) retains a copy of the written block in the cache.
 func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
-	cache.gotidy()
 	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
 	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
 	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
@@ -187,6 +189,8 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 		if err != nil {
 			cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
 		}
+		atomic.AddInt64(&cache.sizeEstimated, int64(n))
+		cache.gotidy()
 	}()
 
 	// Write through to the wrapped KeepGateway from the pipe,
@@ -231,7 +235,6 @@ func (fw funcwriter) Write(p []byte) (int, error) {
 // cache. The remainder of the block may continue to be copied into
 // the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-	cache.gotidy()
 	cachefilename := cache.cacheFile(locator)
 	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
 		return n, err
@@ -305,6 +308,8 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 					}
 					return n, err
 				})})
+			atomic.AddInt64(&cache.sizeEstimated, int64(size))
+			cache.gotidy()
 		}()
 	}
 	progress.cond.L.Lock()
@@ -519,9 +524,18 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 // Start a tidy() goroutine, unless one is already running / recently
 // finished.
 func (cache *DiskCache) gotidy() {
-	// Return quickly if another tidy goroutine is running in this process.
+	// Skip if another tidy goroutine is running in this process.
 	n := atomic.AddInt32(&cache.tidying, 1)
-	if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+	if n != 1 {
+		atomic.AddInt32(&cache.tidying, -1)
+		return
+	}
+	// Skip if sizeEstimated is based on an actual measurement and
+	// is below MaxSize, and we haven't reached the "recheck
+	// anyway" time threshold.
+	if cache.sizeMeasured > 0 &&
+		atomic.LoadInt64(&cache.sizeEstimated) < cache.MaxSize &&
+		time.Now().Before(cache.tidyHoldUntil) {
 		atomic.AddInt32(&cache.tidying, -1)
 		return
 	}
@@ -608,11 +622,26 @@ func (cache *DiskCache) tidy() {
 		return
 	}
 
-	if totalsize <= maxsize {
+	// If we're below MaxSize or there's only one block in the
+	// cache, just update the usage estimate and return.
+	//
+	// (We never delete the last block because that would merely
+	// cause the same block to get re-fetched repeatedly from the
+	// backend.)
+	if totalsize <= maxsize || len(ents) == 1 {
+		atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+		atomic.StoreInt64(&cache.sizeEstimated, totalsize)
 		return
 	}
 
-	// Delete oldest entries until totalsize < maxsize.
+	// Set a new size target of maxsize minus 5%.  This makes some
+	// room for sizeEstimated to grow before it triggers another
+	// tidy. We don't want to walk/sort an entire large cache
+	// directory each time we write a block.
+	target := maxsize - (maxsize / 20)
+
+	// Delete oldest entries until totalsize <= target or we're
+	// down to a single cached block.
 	sort.Slice(ents, func(i, j int) bool {
 		return ents[i].atime.Before(ents[j].atime)
 	})
@@ -622,7 +651,7 @@ func (cache *DiskCache) tidy() {
 		go cache.deleteHeldopen(ent.path, nil)
 		deleted++
 		totalsize -= ent.size
-		if totalsize <= maxsize {
+		if totalsize <= target || deleted == len(ents)-1 {
 			break
 		}
 	}
@@ -633,4 +662,6 @@ func (cache *DiskCache) tidy() {
 			"totalsize": totalsize,
 		}).Debugf("DiskCache: remaining cache usage after deleting")
 	}
+	atomic.StoreInt64(&cache.sizeMeasured, totalsize)
+	atomic.StoreInt64(&cache.sizeEstimated, totalsize)
 }
diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 5ae932a782..5fe1a6a08e 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -127,15 +127,29 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
 		Data: make([]byte, 44000000),
 	})
 	c.Check(err, check.IsNil)
+
+	// Wait for tidy to finish, check that it doesn't delete the
+	// only block.
 	time.Sleep(time.Millisecond)
+	for atomic.LoadInt32(&cache.tidying) > 0 {
+		time.Sleep(time.Millisecond)
+	}
+	c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(44000000))
+
 	resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
 		Data: make([]byte, 32000000),
 	})
 	c.Check(err, check.IsNil)
 	delete(backend.data, resp1.Locator)
 	delete(backend.data, resp2.Locator)
-	cache.tidyHoldUntil = time.Time{}
-	cache.tidy()
+
+	// Wait for tidy to finish, check that it deleted the older
+	// block.
+	time.Sleep(time.Millisecond)
+	for atomic.LoadInt32(&cache.tidying) > 0 {
+		time.Sleep(time.Millisecond)
+	}
+	c.Check(atomic.LoadInt64(&cache.sizeMeasured), check.Equals, int64(32000000))
 
 	n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
 	c.Check(n, check.Equals, 0)

commit 899185924c97c0e981c2c40ab115d7572ce79811
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 20 13:19:32 2023 -0500

    20318: Implement BlockRead via ReadAt so it supports streaming.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 2a33871ca4..b366d6f1b0 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -476,18 +476,8 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 	return n, err
 }
 
-// BlockRead reads the entire block from the wrapped KeepGateway into
-// the cache if needed, and writes it to the provided writer.
+// BlockRead reads an entire block using a 128 KiB buffer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
-	cache.gotidy()
-	cachefilename := cache.cacheFile(opts.Locator)
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
-	if err != nil {
-		cache.debugf("BlockRead: %s", cachefilename, err)
-		return cache.KeepGateway.BlockRead(ctx, opts)
-	}
-	defer f.Close()
-
 	i := strings.Index(opts.Locator, "+")
 	if i < 0 || i >= len(opts.Locator) {
 		return 0, errors.New("invalid block locator: no size hint")
@@ -502,33 +492,28 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 		return 0, errors.New("invalid block locator: invalid size hint")
 	}
 
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
-	if err != nil {
-		return 0, err
-	}
-	filesize, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, err
-	}
-	_, err = f.Seek(0, io.SeekStart)
-	if err != nil {
-		return 0, err
-	}
-	if filesize == blocksize {
-		n, err := io.Copy(opts.WriteTo, f)
-		return int(n), err
-	}
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-	if err != nil {
-		return 0, err
-	}
-	opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
-	n, err := cache.KeepGateway.BlockRead(ctx, opts)
-	if err != nil {
-		return int(n), err
+	offset := 0
+	buf := make([]byte, 131072)
+	for offset < int(blocksize) {
+		if ctx.Err() != nil {
+			return offset, ctx.Err()
+		}
+		if int(blocksize)-offset > len(buf) {
+			buf = buf[:int(blocksize)-offset]
+		}
+		nr, err := cache.ReadAt(opts.Locator, buf, offset)
+		if nr > 0 {
+			nw, err := opts.WriteTo.Write(buf)
+			if err != nil {
+				return offset + nw, err
+			}
+		}
+		offset += nr
+		if err != nil {
+			return offset, err
+		}
 	}
-	f.Truncate(int64(n))
-	return n, nil
+	return offset, nil
 }
 
 // Start a tidy() goroutine, unless one is already running / recently

commit 4121c9e9fc03ee474f01248d384c7d3281b34328
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 20 10:55:30 2023 -0500

    20318: Test streaming.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 6d11410470..5ae932a782 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -13,7 +13,9 @@ import (
 	"io"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/ctxlog"
@@ -49,8 +51,10 @@ func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOpti
 }
 
 type keepGatewayMemoryBacked struct {
-	mtx  sync.RWMutex
-	data map[string][]byte
+	mtx                 sync.RWMutex
+	data                map[string][]byte
+	pauseBlockReadAfter int
+	pauseBlockReadUntil chan error
 }
 
 func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
@@ -76,6 +80,16 @@ func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadO
 	if data == nil {
 		return 0, errors.New("block not found: " + opts.Locator)
 	}
+	if k.pauseBlockReadUntil != nil {
+		src := bytes.NewReader(data)
+		n, err := io.CopyN(opts.WriteTo, src, int64(k.pauseBlockReadAfter))
+		if err != nil {
+			return int(n), err
+		}
+		<-k.pauseBlockReadUntil
+		n2, err := io.Copy(opts.WriteTo, src)
+		return int(n + n2), err
+	}
 	return opts.WriteTo.Write(data)
 }
 func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
@@ -220,6 +234,66 @@ func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangle
 	wg.Wait()
 }
 
+func (s *keepCacheSuite) TestStreaming(c *check.C) {
+	blksize := 64000000
+	backend := &keepGatewayMemoryBacked{
+		pauseBlockReadUntil: make(chan error),
+		pauseBlockReadAfter: blksize / 8,
+	}
+	cache := DiskCache{
+		KeepGateway: backend,
+		MaxSize:     int64(blksize),
+		Dir:         c.MkDir(),
+		Logger:      ctxlog.TestLogger(c),
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+		Data: make([]byte, blksize),
+	})
+	c.Check(err, check.IsNil)
+	os.RemoveAll(filepath.Join(cache.Dir, resp.Locator[:3]))
+
+	// Start a lot of concurrent requests for various ranges of
+	// the same block. Our backend will return the first 8MB and
+	// then pause. The requests that can be satisfied by the first
+	// 8MB of data should return quickly. The rest should wait,
+	// and return after we release pauseBlockReadUntil.
+	var wgEarly, wgLate sync.WaitGroup
+	var doneEarly, doneLate int32
+	for i := 0; i < 10000; i++ {
+		wgEarly.Add(1)
+		go func() {
+			offset := int(rand.Int63() % int64(blksize-benchReadSize))
+			if offset+benchReadSize > backend.pauseBlockReadAfter {
+				wgLate.Add(1)
+				defer wgLate.Done()
+				wgEarly.Done()
+				defer atomic.AddInt32(&doneLate, 1)
+			} else {
+				defer wgEarly.Done()
+				defer atomic.AddInt32(&doneEarly, 1)
+			}
+			buf := make([]byte, benchReadSize)
+			n, err := cache.ReadAt(resp.Locator, buf, offset)
+			c.Check(n, check.Equals, len(buf))
+			c.Check(err, check.IsNil)
+		}()
+	}
+
+	// Ensure all early ranges finish while backend request(s) are
+	// paused.
+	wgEarly.Wait()
+	c.Logf("doneEarly = %d", doneEarly)
+	c.Check(doneLate, check.Equals, int32(0))
+
+	// Unpause backend request(s).
+	close(backend.pauseBlockReadUntil)
+	wgLate.Wait()
+	c.Logf("doneLate = %d", doneLate)
+}
+
 var _ = check.Suite(&keepCacheBenchSuite{})
 
 type keepCacheBenchSuite struct {

commit 932ca9e698fb36bcdd0c558b50e6e965417409d0
Author: Tom Clegg <tom at curii.com>
Date:   Wed Dec 20 10:22:31 2023 -0500

    20318: Comment about error handling.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 567e0cbec3..2a33871ca4 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -458,6 +458,12 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 			progress.cond.Wait()
 		}
 		progress.cond.L.Unlock()
+		// If size<needed && progress.err!=nil here, we'll end
+		// up reporting a less helpful "EOF reading from cache
+		// file" below, instead of the actual error fetching
+		// from upstream to cache file.  This is OK though,
+		// because our caller (ReadAt) doesn't even report our
+		// error, it just retries.
 	}
 
 	n, err := heldopen.f.ReadAt(dst, int64(offset))

commit 4e69128e5e7aeb1a9c5e4462adb38ecc5f5bb8ea
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 16:48:58 2023 -0500

    20318: Return requested range while fetching remainder of block.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b8b3d9f588..567e0cbec3 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -50,6 +50,21 @@ type DiskCache struct {
 	heldopen     map[string]*openFileEnt
 	heldopenMax  int
 	heldopenLock sync.Mutex
+
+	// The "writing" fields allow multiple concurrent/sequential
+	// ReadAt calls to be notified as a single
+	// read-block-from-backend-into-cache goroutine fills the
+	// cache file.
+	writing     map[string]*writeprogress
+	writingCond *sync.Cond
+	writingLock sync.Mutex
+}
+
+type writeprogress struct {
+	cond *sync.Cond // broadcast whenever size or done changes
+	done bool       // size and err have their final values
+	size int        // bytes copied into cache file so far
+	err  error      // error encountered while copying from backend to cache
 }
 
 type openFileEnt struct {
@@ -202,59 +217,116 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 	return resp, err
 }
 
+type funcwriter func([]byte) (int, error)
+
+func (fw funcwriter) Write(p []byte) (int, error) {
+	return fw(p)
+}
+
 // ReadAt reads the entire block from the wrapped KeepGateway into the
 // cache if needed, and copies the requested portion into the provided
 // slice.
+//
+// ReadAt returns as soon as the requested portion is available in the
+// cache. The remainder of the block may continue to be copied into
+// the cache in the background.
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
 	cache.gotidy()
 	cachefilename := cache.cacheFile(locator)
 	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
 		return n, err
 	}
-	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+	readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
 	if err != nil {
-		return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
+		return 0, fmt.Errorf("ReadAt: %w", err)
 	}
-	defer f.Close()
+	defer readf.Close()
 
-	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+	err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
 	if err != nil {
 		return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
 	}
 
-	size, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-	}
-	if size < int64(len(dst)+offset) {
-		// The cache file seems to be truncated or empty
-		// (possibly because we just created it). Wait for an
-		// exclusive lock, then check again (in case someone
-		// else is doing the same thing) before trying to
-		// retrieve the entire block.
-		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
-		if err != nil {
-			return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
-		}
-	}
-	size, err = f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
-	}
-	if size < int64(len(dst)+offset) {
-		// The cache file is truncated or empty, and we own it
-		// now. Fill it.
-		_, err = f.Seek(0, io.SeekStart)
-		if err != nil {
-			return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	if progress != nil {
+		cache.writingLock.Unlock()
+	} else {
+		progress = &writeprogress{}
+		progress.cond = sync.NewCond(&sync.Mutex{})
+		if cache.writing == nil {
+			cache.writing = map[string]*writeprogress{}
 		}
-		n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
-		if err != nil {
-			return 0, err
-		}
-		f.Truncate(int64(n))
+		cache.writing[cachefilename] = progress
+		cache.writingLock.Unlock()
+
+		// Start a goroutine to copy from backend to writef. As
+		// data arrives, wake up any waiting loops (see below)
+		// so ReadAt() requests for partial data can return as
+		// soon as the relevant bytes have been copied.
+		go func() {
+			var size int
+			var writef *os.File
+			var err error
+			defer func() {
+				closeErr := writef.Close()
+				if err == nil {
+					err = closeErr
+				}
+				progress.cond.L.Lock()
+				progress.err = err
+				progress.done = true
+				progress.size = size
+				progress.cond.L.Unlock()
+				progress.cond.Broadcast()
+				cache.writingLock.Lock()
+				delete(cache.writing, cachefilename)
+				cache.writingLock.Unlock()
+			}()
+			writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+			if err != nil {
+				err = fmt.Errorf("ReadAt: %w", err)
+				return
+			}
+			err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+			if err != nil {
+				err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+				return
+			}
+			size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
+				Locator: locator,
+				WriteTo: funcwriter(func(p []byte) (int, error) {
+					n, err := writef.Write(p)
+					if n > 0 {
+						progress.cond.L.Lock()
+						progress.size += n
+						progress.cond.L.Unlock()
+						progress.cond.Broadcast()
+					}
+					return n, err
+				})})
+		}()
+	}
+	progress.cond.L.Lock()
+	for !progress.done && progress.size < len(dst)+offset {
+		progress.cond.Wait()
+	}
+	ok := progress.size >= len(dst)+offset
+	err = progress.err
+	progress.cond.L.Unlock()
+
+	if !ok && err != nil {
+		// If the copy-from-backend goroutine encountered an
+		// error before copying enough bytes to satisfy our
+		// request, we return that error.
+		return 0, err
+	} else {
+		// Regardless of whether the copy-from-backend
+		// goroutine succeeded, or failed after copying the
+		// bytes we need, the only errors we need to report
+		// are errors reading from the cache file.
+		return readf.ReadAt(dst, int64(offset))
 	}
-	return f.ReadAt(dst, int64(offset))
 }
 
 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
@@ -374,6 +446,20 @@ func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int
 		// Other goroutine closed the file before we got RLock
 		return 0, quickReadAtLostRace
 	}
+
+	// If another goroutine is currently writing the file, wait
+	// for it to catch up to the end of the range we need.
+	cache.writingLock.Lock()
+	progress := cache.writing[cachefilename]
+	cache.writingLock.Unlock()
+	if progress != nil {
+		progress.cond.L.Lock()
+		for !progress.done && progress.size < len(dst)+offset {
+			progress.cond.Wait()
+		}
+		progress.cond.L.Unlock()
+	}
+
 	n, err := heldopen.f.ReadAt(dst, int64(offset))
 	if err != nil {
 		// wait for any concurrent users to finish, then

commit 2553652e43229a872b93a5d011c25a2727d1d18f
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 14:07:28 2023 -0500

    20318: Fix stuttering error message.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 189332c866..b8b3d9f588 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -213,7 +213,7 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 	}
 	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
 	if err != nil {
-		return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
+		return 0, fmt.Errorf("ReadAt: %s", cachefilename, err)
 	}
 	defer f.Close()
 
@@ -391,7 +391,7 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
 	cachefilename := cache.cacheFile(opts.Locator)
 	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
 	if err != nil {
-		cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
+		cache.debugf("BlockRead: %s", cachefilename, err)
 		return cache.KeepGateway.BlockRead(ctx, opts)
 	}
 	defer f.Close()

commit 82054f095dad0fb9d21842eac1cd9ade50ffc940
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 14:07:23 2023 -0500

    20318: Remove memory-backed keep block cache.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 0e0d3c43e4..bde13424dd 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -78,7 +78,6 @@ type IKeepClient interface {
 	ReadAt(locator string, p []byte, off int) (int, error)
 	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
 	LocalLocator(locator string) (string, error)
-	ClearBlockCache()
 	SetStorageClasses(sc []string)
 }
 
@@ -2033,7 +2032,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
-	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
 	kc.Retries = 4
 
 	cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, containerUUID)
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index c533821351..276dd36661 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -368,9 +368,6 @@ func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
 	return 0, errors.New("not implemented")
 }
 
-func (client *KeepTestClient) ClearBlockCache() {
-}
-
 func (client *KeepTestClient) Close() {
 	client.Content = nil
 }
diff --git a/lib/mount/command.go b/lib/mount/command.go
index f88d977c4c..666f2cf4ac 100644
--- a/lib/mount/command.go
+++ b/lib/mount/command.go
@@ -43,7 +43,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
 	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
 	ro := flags.Bool("ro", false, "read-only")
 	experimental := flags.Bool("experimental", false, "acknowledge this is an experimental command, and should not be used in production (required)")
-	blockCache := flags.Int("block-cache", 4, "read cache size (number of 64MiB blocks)")
 	pprof := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
 	if ok, code := cmd.ParseFlags(flags, prog, args, "[FUSE mount options]", stderr); !ok {
 		return code
@@ -69,7 +68,6 @@ func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, s
 		logger.Print(err)
 		return 1
 	}
-	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: *blockCache}
 	host := fuse.NewFileSystemHost(&keepFS{
 		Client:     client,
 		KeepClient: kc,
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
deleted file mode 100644
index 37eee4de20..0000000000
--- a/sdk/go/keepclient/block_cache.go
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package keepclient
-
-import (
-	"bytes"
-	"context"
-	"io"
-	"sort"
-	"strconv"
-	"strings"
-	"sync"
-	"time"
-
-	"git.arvados.org/arvados.git/sdk/go/arvados"
-)
-
-var DefaultBlockCache = &BlockCache{}
-
-type BlockCache struct {
-	// Maximum number of blocks to keep in the cache. If 0, a
-	// default size (currently 4) is used instead.
-	MaxBlocks int
-
-	cache map[string]*cacheBlock
-	mtx   sync.Mutex
-}
-
-const defaultMaxBlocks = 4
-
-// Sweep deletes the least recently used blocks from the cache until
-// there are no more than MaxBlocks left.
-func (c *BlockCache) Sweep() {
-	max := c.MaxBlocks
-	if max == 0 {
-		max = defaultMaxBlocks
-	}
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-	if len(c.cache) <= max {
-		return
-	}
-	lru := make([]time.Time, 0, len(c.cache))
-	for _, b := range c.cache {
-		lru = append(lru, b.lastUse)
-	}
-	sort.Sort(sort.Reverse(timeSlice(lru)))
-	threshold := lru[max]
-	for loc, b := range c.cache {
-		if !b.lastUse.After(threshold) {
-			delete(c.cache, loc)
-		}
-	}
-}
-
-// ReadAt returns data from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) ReadAt(upstream arvados.KeepGateway, locator string, p []byte, off int) (int, error) {
-	buf, err := c.get(upstream, locator)
-	if err != nil {
-		return 0, err
-	}
-	if off > len(buf) {
-		return 0, io.ErrUnexpectedEOF
-	}
-	return copy(p, buf[off:]), nil
-}
-
-// Get a block from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) get(upstream arvados.KeepGateway, locator string) ([]byte, error) {
-	cacheKey := locator[:32]
-	bufsize := BLOCKSIZE
-	if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
-		datasize, err := strconv.ParseInt(parts[1], 10, 32)
-		if err == nil && datasize >= 0 {
-			bufsize = int(datasize)
-		}
-	}
-	c.mtx.Lock()
-	if c.cache == nil {
-		c.cache = make(map[string]*cacheBlock)
-	}
-	b, ok := c.cache[cacheKey]
-	if !ok || b.err != nil {
-		b = &cacheBlock{
-			fetched: make(chan struct{}),
-			lastUse: time.Now(),
-		}
-		c.cache[cacheKey] = b
-		go func() {
-			buf := bytes.NewBuffer(make([]byte, 0, bufsize))
-			_, err := upstream.BlockRead(context.Background(), arvados.BlockReadOptions{Locator: locator, WriteTo: buf})
-			c.mtx.Lock()
-			b.data, b.err = buf.Bytes(), err
-			c.mtx.Unlock()
-			close(b.fetched)
-			go c.Sweep()
-		}()
-	}
-	c.mtx.Unlock()
-
-	// Wait (with mtx unlocked) for the fetch goroutine to finish,
-	// in case it hasn't already.
-	<-b.fetched
-
-	c.mtx.Lock()
-	b.lastUse = time.Now()
-	c.mtx.Unlock()
-	return b.data, b.err
-}
-
-func (c *BlockCache) Clear() {
-	c.mtx.Lock()
-	c.cache = nil
-	c.mtx.Unlock()
-}
-
-type timeSlice []time.Time
-
-func (ts timeSlice) Len() int { return len(ts) }
-
-func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
-
-func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
-
-type cacheBlock struct {
-	data    []byte
-	err     error
-	fetched chan struct{}
-	lastUse time.Time
-}
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index 65dcd9ac8a..c1bad8557d 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -237,14 +237,8 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
-	// Disable disk cache
-	defer func(home string) {
-		os.Setenv("HOME", home)
-	}(os.Getenv("HOME"))
-	os.Setenv("HOME", "")
-
-	// Disable memory cache
-	s.kc.BlockCache = &BlockCache{}
+	// Disable cache
+	s.kc.gatewayStack = &keepViaHTTP{s.kc}
 
 	s.kc.PutB([]byte("foo"))
 	s.kc.PutB([]byte("bar"))
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 35c191afe6..eeb187e107 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -67,28 +67,3 @@ func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
 	}
 	return loc, nil
 }
-
-// keepViaBlockCache implements arvados.KeepGateway by using the given
-// KeepClient's BlockCache with the wrapped KeepGateway.
-//
-// Note the whole KeepClient gets passed in instead of just its
-// cache. This ensures the new BlockCache gets used if it changes
-// after keepViaBlockCache is initialized.
-type keepViaBlockCache struct {
-	kc *KeepClient
-	arvados.KeepGateway
-}
-
-func (kvbc *keepViaBlockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
-	return kvbc.kc.cache().ReadAt(kvbc.KeepGateway, locator, dst, offset)
-}
-
-func (kvbc *keepViaBlockCache) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
-	rdr, _, _, _, err := kvbc.kc.getOrHead("GET", opts.Locator, nil)
-	if err != nil {
-		return 0, err
-	}
-	defer rdr.Close()
-	n, err := io.Copy(opts.WriteTo, rdr)
-	return int(n), err
-}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2712e9f3a5..4e935812be 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -110,7 +110,6 @@ type KeepClient struct {
 	lock                  sync.RWMutex
 	HTTPClient            HTTPClient
 	Retries               int
-	BlockCache            *BlockCache
 	RequestID             string
 	StorageClasses        []string
 	DefaultStorageClasses []string // Set by cluster's exported config
@@ -542,17 +541,6 @@ func (kc *KeepClient) getSortedRoots(locator string) []string {
 	return found
 }
 
-func (kc *KeepClient) cache() *BlockCache {
-	if kc.BlockCache != nil {
-		return kc.BlockCache
-	}
-	return DefaultBlockCache
-}
-
-func (kc *KeepClient) ClearBlockCache() {
-	kc.cache().Clear()
-}
-
 func (kc *KeepClient) SetStorageClasses(sc []string) {
 	// make a copy so the caller can't mess with it.
 	kc.StorageClasses = append([]string{}, sc...)

commit 18541c985f7f19d9c200a592287333fb3fdab38b
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 10:57:27 2023 -0500

    20318: Route KeepClient block writes through disk cache.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 262d612640..35c191afe6 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -6,7 +6,11 @@ package keepclient
 
 import (
 	"context"
+	"fmt"
 	"io"
+	"net/http"
+	"strings"
+	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 )
@@ -41,6 +45,29 @@ func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOpt
 	return int(n), err
 }
 
+func (kvh *keepViaHTTP) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+	return kvh.httpBlockWrite(ctx, req)
+}
+
+func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
+	if !strings.Contains(locator, "+R") {
+		// Either it has +A, or it's unsigned and we assume
+		// it's a local locator on a site with signatures
+		// disabled.
+		return locator, nil
+	}
+	sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
+	_, _, url, hdr, err := kvh.KeepClient.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
+	if err != nil {
+		return "", err
+	}
+	loc := hdr.Get("X-Keep-Locator")
+	if loc == "" {
+		return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
+	}
+	return loc, nil
+}
+
 // keepViaBlockCache implements arvados.KeepGateway by using the given
 // KeepClient's BlockCache with the wrapped KeepGateway.
 //
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index cae6ca1545..2712e9f3a5 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -376,22 +376,7 @@ func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
 // with a valid signature from the local cluster. If the given locator
 // already has a local signature, it is returned unchanged.
 func (kc *KeepClient) LocalLocator(locator string) (string, error) {
-	if !strings.Contains(locator, "+R") {
-		// Either it has +A, or it's unsigned and we assume
-		// it's a local locator on a site with signatures
-		// disabled.
-		return locator, nil
-	}
-	sighdr := fmt.Sprintf("local, time=%s", time.Now().UTC().Format(time.RFC3339))
-	_, _, url, hdr, err := kc.getOrHead("HEAD", locator, http.Header{"X-Keep-Signature": []string{sighdr}})
-	if err != nil {
-		return "", err
-	}
-	loc := hdr.Get("X-Keep-Locator")
-	if loc == "" {
-		return "", fmt.Errorf("missing X-Keep-Locator header in HEAD response from %s", url)
-	}
-	return loc, nil
+	return kc.upstreamGateway().LocalLocator(locator)
 }
 
 // Get retrieves a block, given a locator. Returns a reader, the
@@ -412,6 +397,12 @@ func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
 	return kc.upstreamGateway().ReadAt(locator, p, off)
 }
 
+// BlockWrite writes a full block to upstream servers and saves a copy
+// in the local cache.
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+	return kc.upstreamGateway().BlockWrite(ctx, req)
+}
+
 // Ask verifies that a block with the given hash is available and
 // readable, according to at least one Keep service. Unlike Get, it
 // does not retrieve the data or verify that the data content matches
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 8d299815b2..c4144bf871 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -127,7 +127,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
 	}
 }
 
-func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+func (kc *KeepClient) httpBlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
 	var resp arvados.BlockWriteResponse
 	var getReader func() io.Reader
 	if req.Data == nil && req.Reader == nil {

commit f68557c627b5a8472d33d973a2737448904c29bd
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 10:47:05 2023 -0500

    20318: Don't use memory-backed block cache for short reads.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 18614a77eb..cae6ca1545 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -42,6 +42,9 @@ var (
 	DefaultProxyConnectTimeout      = 30 * time.Second
 	DefaultProxyTLSHandshakeTimeout = 10 * time.Second
 	DefaultProxyKeepAlive           = 120 * time.Second
+
+	rootCacheDir = "/var/cache/arvados/keep"
+	userCacheDir = ".cache/arvados/keep" // relative to HOME
 )
 
 // Error interface with an error and boolean indicating whether the error is temporary
@@ -336,43 +339,37 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 	return nil, 0, "", nil, err
 }
 
+// attempt to create dir/subdir/ and its parents, up to but not
+// including dir itself, using mode 0700.
+func makedirs(dir, subdir string) {
+	for _, part := range strings.Split(subdir, string(os.PathSeparator)) {
+		dir = filepath.Join(dir, part)
+		os.Mkdir(dir, 0700)
+	}
+}
+
 // upstreamGateway creates/returns the KeepGateway stack used to read
-// and write data: a memory cache, a disk-backed cache if available,
-// and an http backend.
+// and write data: a disk-backed cache on top of an http backend.
 func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
 	kc.lock.Lock()
 	defer kc.lock.Unlock()
 	if kc.gatewayStack != nil {
 		return kc.gatewayStack
 	}
-	var stack arvados.KeepGateway = &keepViaHTTP{kc}
-
-	// Wrap with a disk cache, if cache dir is writable
-	home := os.Getenv("HOME")
-	if fi, err := os.Stat(home); home != "" && err == nil && fi.IsDir() {
-		var err error
-		dir := home
-		for _, part := range []string{".cache", "arvados", "keep"} {
-			dir = filepath.Join(dir, part)
-			err = os.Mkdir(dir, 0700)
-		}
-		if err == nil || os.IsExist(err) {
-			os.Mkdir(filepath.Join(dir, "tmp"), 0700)
-			err = os.WriteFile(filepath.Join(dir, "tmp", "check.tmp"), nil, 0600)
-			if err == nil {
-				stack = &arvados.DiskCache{
-					Dir:         dir,
-					KeepGateway: stack,
-				}
-			}
-		}
+	var cachedir string
+	if os.Geteuid() == 0 {
+		cachedir = rootCacheDir
+		makedirs("/", cachedir)
+	} else {
+		home := "/" + os.Getenv("HOME")
+		makedirs(home, userCacheDir)
+		cachedir = filepath.Join(home, userCacheDir)
 	}
-	stack = &keepViaBlockCache{
-		kc:          kc,
-		KeepGateway: stack,
+	kc.gatewayStack = &arvados.DiskCache{
+		Dir:         cachedir,
+		KeepGateway: &keepViaHTTP{kc},
 	}
-	kc.gatewayStack = stack
-	return stack
+	return kc.gatewayStack
 }
 
 // LocalLocator returns a locator equivalent to the one supplied, but

commit 4ef866c34bba8bd73c0b1de48fb8c62d4f7d0661
Author: Tom Clegg <tom at curii.com>
Date:   Tue Dec 19 10:07:36 2023 -0500

    20318: Fail instead of reading through if cache dir is unusable.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 6aed35a215..189332c866 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -213,8 +213,7 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 	}
 	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
 	if err != nil {
-		cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
-		return cache.KeepGateway.ReadAt(locator, dst, offset)
+		return 0, fmt.Errorf("ReadAt: open(%s) failed: %s", cachefilename, err)
 	}
 	defer f.Close()
 

commit 85a85da473af4b66dbc92a5f8882eea8c7ce8ffd
Author: Tom Clegg <tom at curii.com>
Date:   Mon Dec 18 20:59:31 2023 -0500

    20318: Close held-open filehandles when deleting cache files.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index eb9d4607bf..6aed35a215 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -260,12 +260,33 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 
 var quickReadAtLostRace = errors.New("quickReadAt: lost race")
 
-func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+// Remove the cache entry for the indicated cachefilename if it
+// matches expect (quickReadAt() usage), or if expect is nil (tidy()
+// usage).
+//
+// If expect is non-nil, close expect's filehandle.
+//
+// If expect is nil and an entry for cachefilename is found (and
+// therefore deleted), close that entry's filehandle.
+func (cache *DiskCache) deleteHeldopen(cachefilename string, expect *openFileEnt) {
+	needclose := expect
+
 	cache.heldopenLock.Lock()
-	if cache.heldopen[cachefilename] == heldopen {
+	found := cache.heldopen[cachefilename]
+	if found != nil && (expect == nil || expect == found) {
 		delete(cache.heldopen, cachefilename)
+		needclose = found
 	}
 	cache.heldopenLock.Unlock()
+
+	if needclose != nil {
+		needclose.Lock()
+		defer needclose.Unlock()
+		if needclose.f != nil {
+			needclose.f.Close()
+			needclose.f = nil
+		}
+	}
 }
 
 // quickReadAt attempts to use a cached-filehandle approach to read
@@ -522,6 +543,7 @@ func (cache *DiskCache) tidy() {
 	deleted := 0
 	for _, ent := range ents {
 		os.Remove(ent.path)
+		go cache.deleteHeldopen(ent.path, nil)
 		deleted++
 		totalsize -= ent.size
 		if totalsize <= maxsize {

commit f9a0922e50904365e40b99372ed66f3a6f992cd7
Author: Tom Clegg <tom at curii.com>
Date:   Mon Dec 18 20:19:35 2023 -0500

    20318: Test mangling cache files while reading.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 23e7dfbd9f..6d11410470 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math/rand"
 	"os"
 	"sync"
 	"time"
@@ -131,7 +132,13 @@ func (s *keepCacheSuite) TestMaxSize(c *check.C) {
 	c.Check(err, check.IsNil)
 }
 
-func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+func (s *keepCacheSuite) TestConcurrentReadersNoRefresh(c *check.C) {
+	s.testConcurrentReaders(c, true, false)
+}
+func (s *keepCacheSuite) TestConcurrentReadersMangleCache(c *check.C) {
+	s.testConcurrentReaders(c, false, true)
+}
+func (s *keepCacheSuite) testConcurrentReaders(c *check.C, cannotRefresh, mangleCache bool) {
 	blksize := 64000000
 	backend := &keepGatewayMemoryBacked{}
 	cache := DiskCache{
@@ -140,20 +147,61 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
 		Dir:         c.MkDir(),
 		Logger:      ctxlog.TestLogger(c),
 	}
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
 		Data: make([]byte, blksize),
 	})
 	c.Check(err, check.IsNil)
-	delete(backend.data, resp.Locator)
+	if cannotRefresh {
+		// Delete the block from the backing store, to ensure
+		// the cache doesn't rely on re-reading a block that
+		// it has just written.
+		delete(backend.data, resp.Locator)
+	}
+	if mangleCache {
+		// Replace cache files with truncated files (and
+		// delete them outright) while the ReadAt loop is
+		// running, to ensure the cache can re-fetch from the
+		// backend as needed.
+		var nRemove, nTrunc int
+		defer func() {
+			c.Logf("nRemove %d", nRemove)
+			c.Logf("nTrunc %d", nTrunc)
+		}()
+		go func() {
+			// Truncate/delete the cache file at various
+			// intervals. Readers should re-fetch/recover from
+			// this.
+			fnm := cache.cacheFile(resp.Locator)
+			for ctx.Err() == nil {
+				trunclen := rand.Int63() % int64(blksize*2)
+				if trunclen > int64(blksize) {
+					err := os.Remove(fnm)
+					if err == nil {
+						nRemove++
+					}
+				} else if os.WriteFile(fnm+"#", make([]byte, trunclen), 0700) == nil {
+					err := os.Rename(fnm+"#", fnm)
+					if err == nil {
+						nTrunc++
+					}
+				}
+			}
+		}()
+	}
 
 	failed := false
 	var wg sync.WaitGroup
-	for offset := 0; offset < blksize; offset += 123456 {
-		offset := offset
+	var slots = make(chan bool, 100) // limit concurrency / memory usage
+	for i := 0; i < 20000; i++ {
+		offset := (i * 123456) % blksize
+		slots <- true
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+			defer func() { <-slots }()
 			buf := make([]byte, 654321)
 			if offset+len(buf) > blksize {
 				buf = buf[:blksize-offset]

commit fcfb6de8652973045d7c188d11817ef2471e7335
Author: Tom Clegg <tom at curii.com>
Date:   Mon Dec 18 18:45:51 2023 -0500

    20318: Add concurrent and sequential read benchmarks.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 128e47faa2..23e7dfbd9f 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -172,8 +172,73 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
 	wg.Wait()
 }
 
+var _ = check.Suite(&keepCacheBenchSuite{})
+
+type keepCacheBenchSuite struct {
+	blksize  int
+	blkcount int
+	backend  *keepGatewayMemoryBacked
+	cache    *DiskCache
+	locators []string
+}
+
+func (s *keepCacheBenchSuite) SetUpTest(c *check.C) {
+	s.blksize = 64000000
+	s.blkcount = 8
+	s.backend = &keepGatewayMemoryBacked{}
+	s.cache = &DiskCache{
+		KeepGateway: s.backend,
+		MaxSize:     int64(s.blksize),
+		Dir:         c.MkDir(),
+		Logger:      ctxlog.TestLogger(c),
+	}
+	s.locators = make([]string, s.blkcount)
+	data := make([]byte, s.blksize)
+	for b := 0; b < s.blkcount; b++ {
+		for i := range data {
+			data[i] = byte(b)
+		}
+		resp, err := s.cache.BlockWrite(context.Background(), BlockWriteOptions{
+			Data: data,
+		})
+		c.Assert(err, check.IsNil)
+		s.locators[b] = resp.Locator
+	}
+}
+
+func (s *keepCacheBenchSuite) BenchmarkConcurrentReads(c *check.C) {
+	var wg sync.WaitGroup
+	for i := 0; i < c.N; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buf := make([]byte, benchReadSize)
+			_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+			if err != nil {
+				c.Fail()
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func (s *keepCacheBenchSuite) BenchmarkSequentialReads(c *check.C) {
+	buf := make([]byte, benchReadSize)
+	for i := 0; i < c.N; i++ {
+		_, err := s.cache.ReadAt(s.locators[i%s.blkcount], buf, int((int64(i)*1234)%int64(s.blksize-benchReadSize)))
+		if err != nil {
+			c.Fail()
+		}
+	}
+}
+
 const benchReadSize = 1000
 
+var _ = check.Suite(&fileOpsSuite{})
+
+type fileOpsSuite struct{}
+
 // BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
 // potential performance improvement of caching filehandles rather
 // than opening/closing the cache file for each read.
@@ -182,7 +247,7 @@ const benchReadSize = 1000
 // improvement: ~636 MB/s when opening/closing the file for each
 // 1000-byte read vs. ~2 GB/s when opening the file once and doing
 // concurrent reads using the same file descriptor.
-func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
+func (s *fileOpsSuite) BenchmarkOpenClose(c *check.C) {
 	fnm := c.MkDir() + "/testfile"
 	os.WriteFile(fnm, make([]byte, 64000000), 0700)
 	var wg sync.WaitGroup
@@ -206,7 +271,7 @@ func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
 	wg.Wait()
 }
 
-func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
+func (s *fileOpsSuite) BenchmarkKeepOpen(c *check.C) {
 	fnm := c.MkDir() + "/testfile"
 	os.WriteFile(fnm, make([]byte, 64000000), 0700)
 	f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)

commit 6a54de705f1e129566ee7f5101fe5cbe3dbba548
Author: Tom Clegg <tom at curii.com>
Date:   Thu Dec 14 20:17:04 2023 -0500

    20318: Try to keep cache files open for subsequent/concurrent reads.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 4dabbd3506..eb9d4607bf 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -17,6 +17,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -42,6 +43,19 @@ type DiskCache struct {
 	tidying        int32 // see tidy()
 	tidyHoldUntil  time.Time
 	defaultMaxSize int64
+
+	// The "heldopen" fields are used to open cache files for
+	// reading, and leave them open for future/concurrent ReadAt
+	// operations. See quickReadAt.
+	heldopen     map[string]*openFileEnt
+	heldopenMax  int
+	heldopenLock sync.Mutex
+}
+
+type openFileEnt struct {
+	sync.RWMutex
+	f   *os.File
+	err error // if err is non-nil, f should not be used.
 }
 
 const (
@@ -194,6 +208,9 @@ func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions)
 func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
 	cache.gotidy()
 	cachefilename := cache.cacheFile(locator)
+	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
+		return n, err
+	}
 	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
 	if err != nil {
 		cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
@@ -241,8 +258,114 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 	return f.ReadAt(dst, int64(offset))
 }
 
-// ReadAt reads the entire block from the wrapped KeepGateway into the
-// cache if needed, and writes it to the provided writer.
+var quickReadAtLostRace = errors.New("quickReadAt: lost race")
+
+func (cache *DiskCache) deleteHeldopen(cachefilename string, heldopen *openFileEnt) {
+	cache.heldopenLock.Lock()
+	if cache.heldopen[cachefilename] == heldopen {
+		delete(cache.heldopen, cachefilename)
+	}
+	cache.heldopenLock.Unlock()
+}
+
+// quickReadAt attempts to use a cached-filehandle approach to read
+// from the indicated file. The expectation is that the caller
+// (ReadAt) will try a more robust approach when this fails, so
+// quickReadAt doesn't try especially hard to ensure success in
+// races. In particular, when there are concurrent calls, and one
+// fails, that can cause others to fail too.
+func (cache *DiskCache) quickReadAt(cachefilename string, dst []byte, offset int) (int, error) {
+	isnew := false
+	cache.heldopenLock.Lock()
+	if cache.heldopenMax == 0 {
+		// Choose a reasonable limit on open cache files based
+		// on RLIMIT_NOFILE. Note Go automatically raises
+		// softlimit to hardlimit, so it's typically 1048576,
+		// not 1024.
+		lim := syscall.Rlimit{}
+		err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim)
+		if err != nil {
+			cache.heldopenMax = 256
+		} else if lim.Cur > 40000 {
+			cache.heldopenMax = 10000
+		} else {
+			cache.heldopenMax = int(lim.Cur / 4)
+		}
+	}
+	heldopen := cache.heldopen[cachefilename]
+	if heldopen == nil {
+		isnew = true
+		heldopen = &openFileEnt{}
+		if cache.heldopen == nil {
+			cache.heldopen = make(map[string]*openFileEnt, cache.heldopenMax)
+		} else if len(cache.heldopen) > cache.heldopenMax {
+			// Rather than go to the trouble of tracking
+			// last access time, just close all files, and
+			// open again as needed. Even in the worst
+			// pathological case, this causes one extra
+			// open+close per read, which is not
+			// especially bad (see benchmarks).
+			go func(m map[string]*openFileEnt) {
+				for _, heldopen := range m {
+					heldopen.Lock()
+					defer heldopen.Unlock()
+					if heldopen.f != nil {
+						heldopen.f.Close()
+						heldopen.f = nil
+					}
+				}
+			}(cache.heldopen)
+			cache.heldopen = nil
+		}
+		cache.heldopen[cachefilename] = heldopen
+		heldopen.Lock()
+	}
+	cache.heldopenLock.Unlock()
+
+	if isnew {
+		// Open and flock the file, then call heldopen.Unlock()
+		// to unblock any other goroutines that are waiting on
+		// heldopen.RLock() in the !isnew case below.
+		f, err := os.Open(cachefilename)
+		if err == nil {
+			err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+			if err == nil {
+				heldopen.f = f
+			} else {
+				f.Close()
+			}
+		}
+		if err != nil {
+			heldopen.err = err
+			go cache.deleteHeldopen(cachefilename, heldopen)
+		}
+		heldopen.Unlock()
+	}
+	// Acquire read lock to ensure (1) initialization is complete,
+	// if it's done by a different goroutine, and (2) any "delete
+	// old/unused entries" waits for our read to finish before
+	// closing the file.
+	heldopen.RLock()
+	defer heldopen.RUnlock()
+	if heldopen.err != nil {
+		// Other goroutine encountered an error during setup
+		return 0, heldopen.err
+	} else if heldopen.f == nil {
+		// Other goroutine closed the file before we got RLock
+		return 0, quickReadAtLostRace
+	}
+	n, err := heldopen.f.ReadAt(dst, int64(offset))
+	if err != nil {
+		// wait for any concurrent users to finish, then
+		// delete this cache entry in case reopening the
+		// backing file helps.
+		go cache.deleteHeldopen(cachefilename, heldopen)
+	}
+	return n, err
+}
+
+// BlockRead reads the entire block from the wrapped KeepGateway into
+// the cache if needed, and writes it to the provided writer.
 func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
 	cache.gotidy()
 	cachefilename := cache.cacheFile(opts.Locator)

commit cdb63c3e5b6f11bfcb8244614d8a6fd309fbafce
Author: Tom Clegg <tom at curii.com>
Date:   Thu Dec 14 20:12:41 2023 -0500

    20318: Use const for cache-tidy hold timer.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index b9c7fea4b0..4dabbd3506 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -44,9 +44,10 @@ type DiskCache struct {
 	defaultMaxSize int64
 }
 
-var (
-	cacheFileSuffix = ".keepcacheblock"
-	tmpFileSuffix   = ".tmp"
+const (
+	cacheFileSuffix  = ".keepcacheblock"
+	tmpFileSuffix    = ".tmp"
+	tidyHoldDuration = 10 * time.Second
 )
 
 func (cache *DiskCache) cacheFile(locator string) string {
@@ -306,7 +307,7 @@ func (cache *DiskCache) gotidy() {
 	}
 	go func() {
 		cache.tidy()
-		cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+		cache.tidyHoldUntil = time.Now().Add(tidyHoldDuration)
 		atomic.AddInt32(&cache.tidying, -1)
 	}()
 }

commit 01ecf2938246c47dda5cdf667c4dcf29f49693be
Author: Tom Clegg <tom at curii.com>
Date:   Wed Nov 29 15:06:36 2023 -0500

    20318: Benchmark open/close vs. shared fd.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
index 6cc5a2b8dd..128e47faa2 100644
--- a/sdk/go/arvados/keep_cache_test.go
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"os"
 	"sync"
 	"time"
 
@@ -170,3 +171,62 @@ func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
 	}
 	wg.Wait()
 }
+
+const benchReadSize = 1000
+
+// BenchmarkOpenClose and BenchmarkKeepOpen can be used to measure the
+// potential performance improvement of caching filehandles rather
+// than opening/closing the cache file for each read.
+//
+// Results from a development machine indicate a ~3x throughput
+// improvement: ~636 MB/s when opening/closing the file for each
+// 1000-byte read vs. ~2 GB/s when opening the file once and doing
+// concurrent reads using the same file descriptor.
+func (s *keepCacheSuite) BenchmarkOpenClose(c *check.C) {
+	fnm := c.MkDir() + "/testfile"
+	os.WriteFile(fnm, make([]byte, 64000000), 0700)
+	var wg sync.WaitGroup
+	for i := 0; i < c.N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+			if err != nil {
+				c.Fail()
+				return
+			}
+			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+			if err != nil {
+				c.Fail()
+				return
+			}
+			f.Close()
+		}()
+	}
+	wg.Wait()
+}
+
+func (s *keepCacheSuite) BenchmarkKeepOpen(c *check.C) {
+	fnm := c.MkDir() + "/testfile"
+	os.WriteFile(fnm, make([]byte, 64000000), 0700)
+	f, err := os.OpenFile(fnm, os.O_CREATE|os.O_RDWR, 0700)
+	if err != nil {
+		c.Fail()
+		return
+	}
+	var wg sync.WaitGroup
+	for i := 0; i < c.N; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err = f.ReadAt(make([]byte, benchReadSize), (int64(i)*1000000)%63123123)
+			if err != nil {
+				c.Fail()
+				return
+			}
+		}()
+	}
+	wg.Wait()
+	f.Close()
+}

commit ef1a56aa4c6aff593767fcaba693ebc042df5d4b
Author: Tom Clegg <tom at curii.com>
Date:   Mon Nov 27 16:25:56 2023 -0500

    20318: Add filesystem cache tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache_test.go b/sdk/go/arvados/keep_cache_test.go
new file mode 100644
index 0000000000..6cc5a2b8dd
--- /dev/null
+++ b/sdk/go/arvados/keep_cache_test.go
@@ -0,0 +1,172 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+	"bytes"
+	"context"
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&keepCacheSuite{})
+
+type keepCacheSuite struct {
+}
+
+type keepGatewayBlackHole struct {
+}
+
+func (*keepGatewayBlackHole) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	return 0, errors.New("block not found")
+}
+func (*keepGatewayBlackHole) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+	return 0, errors.New("block not found")
+}
+func (*keepGatewayBlackHole) LocalLocator(locator string) (string, error) {
+	return locator, nil
+}
+func (*keepGatewayBlackHole) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+	h := md5.New()
+	var size int64
+	if opts.Reader == nil {
+		size, _ = io.Copy(h, bytes.NewReader(opts.Data))
+	} else {
+		size, _ = io.Copy(h, opts.Reader)
+	}
+	return BlockWriteResponse{Locator: fmt.Sprintf("%x+%d", h.Sum(nil), size), Replicas: 1}, nil
+}
+
+type keepGatewayMemoryBacked struct {
+	mtx  sync.RWMutex
+	data map[string][]byte
+}
+
+func (k *keepGatewayMemoryBacked) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	k.mtx.RLock()
+	data := k.data[locator]
+	k.mtx.RUnlock()
+	if data == nil {
+		return 0, errors.New("block not found: " + locator)
+	}
+	var n int
+	if len(data) > offset {
+		n = copy(dst, data[offset:])
+	}
+	if n < len(dst) {
+		return n, io.EOF
+	}
+	return n, nil
+}
+func (k *keepGatewayMemoryBacked) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+	k.mtx.RLock()
+	data := k.data[opts.Locator]
+	k.mtx.RUnlock()
+	if data == nil {
+		return 0, errors.New("block not found: " + opts.Locator)
+	}
+	return opts.WriteTo.Write(data)
+}
+func (k *keepGatewayMemoryBacked) LocalLocator(locator string) (string, error) {
+	return locator, nil
+}
+func (k *keepGatewayMemoryBacked) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+	h := md5.New()
+	data := bytes.NewBuffer(nil)
+	if opts.Reader == nil {
+		data.Write(opts.Data)
+		h.Write(data.Bytes())
+	} else {
+		io.Copy(io.MultiWriter(h, data), opts.Reader)
+	}
+	locator := fmt.Sprintf("%x+%d", h.Sum(nil), data.Len())
+	k.mtx.Lock()
+	if k.data == nil {
+		k.data = map[string][]byte{}
+	}
+	k.data[locator] = data.Bytes()
+	k.mtx.Unlock()
+	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
+}
+
+func (s *keepCacheSuite) TestMaxSize(c *check.C) {
+	backend := &keepGatewayMemoryBacked{}
+	cache := DiskCache{
+		KeepGateway: backend,
+		MaxSize:     40000000,
+		Dir:         c.MkDir(),
+		Logger:      ctxlog.TestLogger(c),
+	}
+	ctx := context.Background()
+	resp1, err := cache.BlockWrite(ctx, BlockWriteOptions{
+		Data: make([]byte, 44000000),
+	})
+	c.Check(err, check.IsNil)
+	time.Sleep(time.Millisecond)
+	resp2, err := cache.BlockWrite(ctx, BlockWriteOptions{
+		Data: make([]byte, 32000000),
+	})
+	c.Check(err, check.IsNil)
+	delete(backend.data, resp1.Locator)
+	delete(backend.data, resp2.Locator)
+	cache.tidyHoldUntil = time.Time{}
+	cache.tidy()
+
+	n, err := cache.ReadAt(resp1.Locator, make([]byte, 2), 0)
+	c.Check(n, check.Equals, 0)
+	c.Check(err, check.ErrorMatches, `block not found: .*\+44000000`)
+
+	n, err = cache.ReadAt(resp2.Locator, make([]byte, 2), 0)
+	c.Check(n > 0, check.Equals, true)
+	c.Check(err, check.IsNil)
+}
+
+func (s *keepCacheSuite) TestConcurrentReaders(c *check.C) {
+	blksize := 64000000
+	backend := &keepGatewayMemoryBacked{}
+	cache := DiskCache{
+		KeepGateway: backend,
+		MaxSize:     int64(blksize),
+		Dir:         c.MkDir(),
+		Logger:      ctxlog.TestLogger(c),
+	}
+	ctx := context.Background()
+	resp, err := cache.BlockWrite(ctx, BlockWriteOptions{
+		Data: make([]byte, blksize),
+	})
+	c.Check(err, check.IsNil)
+	delete(backend.data, resp.Locator)
+
+	failed := false
+	var wg sync.WaitGroup
+	for offset := 0; offset < blksize; offset += 123456 {
+		offset := offset
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buf := make([]byte, 654321)
+			if offset+len(buf) > blksize {
+				buf = buf[:blksize-offset]
+			}
+			n, err := cache.ReadAt(resp.Locator, buf, offset)
+			if failed {
+				// don't fill logs with subsequent errors
+				return
+			}
+			if !c.Check(err, check.IsNil, check.Commentf("offset=%d", offset)) {
+				failed = true
+			}
+			c.Assert(n, check.Equals, len(buf))
+		}()
+	}
+	wg.Wait()
+}
diff --git a/sdk/go/arvadostest/api.go b/sdk/go/arvadostest/api.go
index 4e214414d7..706e49542f 100644
--- a/sdk/go/arvadostest/api.go
+++ b/sdk/go/arvadostest/api.go
@@ -356,6 +356,26 @@ func (as *APIStub) APIClientAuthorizationGet(ctx context.Context, options arvado
 	as.appendCall(ctx, as.APIClientAuthorizationGet, options)
 	return arvados.APIClientAuthorization{}, as.Error
 }
+func (as *APIStub) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	as.appendCall(context.TODO(), as.ReadAt, struct {
+		locator string
+		dst     []byte
+		offset  int
+	}{locator, dst, offset})
+	return 0, as.Error
+}
+func (as *APIStub) BlockRead(ctx context.Context, options arvados.BlockReadOptions) (int, error) {
+	as.appendCall(ctx, as.BlockRead, options)
+	return 0, as.Error
+}
+func (as *APIStub) BlockWrite(ctx context.Context, options arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+	as.appendCall(ctx, as.BlockWrite, options)
+	return arvados.BlockWriteResponse{}, as.Error
+}
+func (as *APIStub) LocalLocator(locator string) (int, error) {
+	as.appendCall(context.TODO(), as.LocalLocator, locator)
+	return 0, as.Error
+}
 
 func (as *APIStub) appendCall(ctx context.Context, method interface{}, options interface{}) {
 	as.mtx.Lock()
diff --git a/sdk/go/arvadostest/keep_stub.go b/sdk/go/arvadostest/keep_stub.go
new file mode 100644
index 0000000000..ddfa3909bb
--- /dev/null
+++ b/sdk/go/arvadostest/keep_stub.go
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+type KeepStub struct{}

commit 52fa01051f4633e3bbbfcdf8c55994e7cd91212a
Author: Tom Clegg <tom at curii.com>
Date:   Mon Nov 27 11:20:09 2023 -0500

    20318: Add disk-backed block cache.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index f4ac1ab3c4..2a527567e5 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -240,11 +240,16 @@ type LogoutOptions struct {
 	ReturnTo string `json:"return_to"` // Redirect to this URL after logging out
 }
 
+type BlockReadOptions struct {
+	Locator string
+	WriteTo io.Writer
+}
+
 type BlockWriteOptions struct {
 	Hash           string
 	Data           []byte
-	Reader         io.Reader
-	DataSize       int // Must be set if Data is nil.
+	Reader         io.Reader // Must be set if Data is nil.
+	DataSize       int       // Must be set if Data is nil.
 	RequestID      string
 	StorageClasses []string
 	Replicas       int
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
new file mode 100644
index 0000000000..b9c7fea4b0
--- /dev/null
+++ b/sdk/go/arvados/keep_cache.go
@@ -0,0 +1,414 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+	"bytes"
+	"context"
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sys/unix"
+)
+
+type KeepGateway interface {
+	ReadAt(locator string, dst []byte, offset int) (int, error)
+	BlockRead(ctx context.Context, opts BlockReadOptions) (int, error)
+	BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error)
+	LocalLocator(locator string) (string, error)
+}
+
+// DiskCache wraps KeepGateway, adding a disk-based cache layer.
+type DiskCache struct {
+	KeepGateway
+	Dir     string
+	MaxSize int64
+	Logger  logrus.FieldLogger
+
+	tidying        int32 // see gotidy()
+	tidyHoldUntil  time.Time
+	defaultMaxSize int64
+}
+
+var (
+	cacheFileSuffix = ".keepcacheblock"
+	tmpFileSuffix   = ".tmp"
+)
+
+func (cache *DiskCache) cacheFile(locator string) string {
+	hash := locator
+	if i := strings.Index(hash, "+"); i > 0 {
+		hash = hash[:i]
+	}
+	return filepath.Join(cache.Dir, hash[:3], hash+cacheFileSuffix)
+}
+
+// Open a cache file, creating the parent dir if necessary.
+func (cache *DiskCache) openFile(name string, flags int) (*os.File, error) {
+	f, err := os.OpenFile(name, flags, 0600)
+	if os.IsNotExist(err) {
+		// Create the parent dir and try again. (We could have
+		// checked/created the parent dir before, but that
+		// would be less efficient in the much more common
+		// situation where it already exists.)
+		parent, _ := filepath.Split(name)
+		os.Mkdir(parent, 0700)
+		f, err = os.OpenFile(name, flags, 0600)
+	}
+	return f, err
+}
+
+// Rename a file, creating the new path's parent dir if necessary.
+func (cache *DiskCache) rename(old, new string) error {
+	if nil == os.Rename(old, new) {
+		return nil
+	}
+	parent, _ := filepath.Split(new)
+	os.Mkdir(parent, 0700)
+	return os.Rename(old, new)
+}
+
+func (cache *DiskCache) debugf(format string, args ...interface{}) {
+	logger := cache.Logger
+	if logger == nil {
+		return
+	}
+	logger.Debugf(format, args...)
+}
+
+// BlockWrite writes through to the wrapped KeepGateway, and (if
+// possible) retains a copy of the written block in the cache.
+func (cache *DiskCache) BlockWrite(ctx context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+	cache.gotidy()
+	unique := fmt.Sprintf("%x.%p%s", os.Getpid(), &opts, tmpFileSuffix)
+	tmpfilename := filepath.Join(cache.Dir, "tmp", unique)
+	tmpfile, err := cache.openFile(tmpfilename, os.O_CREATE|os.O_EXCL|os.O_RDWR)
+	if err != nil {
+		cache.debugf("BlockWrite: open(%s) failed: %s", tmpfilename, err)
+		return cache.KeepGateway.BlockWrite(ctx, opts)
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	copyerr := make(chan error, 1)
+
+	// Start a goroutine to copy the caller's source data to
+	// tmpfile, a hash checker, and (via pipe) the wrapped
+	// KeepGateway.
+	pipereader, pipewriter := io.Pipe()
+	defer pipereader.Close()
+	go func() {
+		defer tmpfile.Close()
+		defer os.Remove(tmpfilename)
+		defer pipewriter.Close()
+
+		// Copy from opts.Data or opts.Reader, depending on
+		// which was provided.
+		var src io.Reader
+		if opts.Data != nil {
+			src = bytes.NewReader(opts.Data)
+		} else {
+			src = opts.Reader
+		}
+
+		hashcheck := md5.New()
+		n, err := io.Copy(io.MultiWriter(tmpfile, pipewriter, hashcheck), src)
+		if err != nil {
+			copyerr <- err
+			cancel()
+			return
+		} else if opts.DataSize > 0 && opts.DataSize != int(n) {
+			copyerr <- fmt.Errorf("block size %d did not match provided size %d", n, opts.DataSize)
+			cancel()
+			return
+		}
+		err = tmpfile.Close()
+		if err != nil {
+			// Don't rename tmpfile into place, but allow
+			// the BlockWrite call to succeed if nothing
+			// else goes wrong.
+			return
+		}
+		hash := fmt.Sprintf("%x", hashcheck.Sum(nil))
+		if opts.Hash != "" && opts.Hash != hash {
+			// Even if the wrapped KeepGateway doesn't
+			// notice a problem, this should count as an
+			// error.
+			copyerr <- fmt.Errorf("block hash %s did not match provided hash %s", hash, opts.Hash)
+			cancel()
+			return
+		}
+		cachefilename := cache.cacheFile(hash)
+		err = cache.rename(tmpfilename, cachefilename)
+		if err != nil {
+			cache.debugf("BlockWrite: rename(%s, %s) failed: %s", tmpfilename, cachefilename, err)
+		}
+	}()
+
+	// Write through to the wrapped KeepGateway from the pipe,
+	// instead of the original reader.
+	newopts := opts
+	if newopts.DataSize == 0 {
+		newopts.DataSize = len(newopts.Data)
+	}
+	newopts.Reader = pipereader
+	newopts.Data = nil
+
+	resp, err := cache.KeepGateway.BlockWrite(ctx, newopts)
+	if len(copyerr) > 0 {
+		// If the copy-to-pipe goroutine failed, that error
+		// will be more helpful than the resulting "context
+		// canceled" or "read [from pipereader] failed" error
+		// seen by the wrapped KeepGateway.
+		//
+		// If the wrapped KeepGateway encounters an error
+		// before all the data is copied into the pipe, it
+		// stops reading from the pipe, which causes the
+		// io.Copy() in the goroutine to block until our
+		// deferred pipereader.Close() call runs. In that case
+		// len(copyerr)==0 here, so the wrapped KeepGateway
+		// error is the one we return to our caller.
+		err = <-copyerr
+	}
+	return resp, err
+}
+
+// ReadAt reads the entire block from the wrapped KeepGateway into the
+// cache if needed, and copies the requested portion into the provided
+// slice.
+func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	cache.gotidy()
+	cachefilename := cache.cacheFile(locator)
+	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+	if err != nil {
+		cache.debugf("ReadAt: open(%s) failed: %s", cachefilename, err)
+		return cache.KeepGateway.ReadAt(locator, dst, offset)
+	}
+	defer f.Close()
+
+	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+	if err != nil {
+		return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+	}
+
+	size, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+	}
+	if size < int64(len(dst)+offset) {
+		// The cache file seems to be truncated or empty
+		// (possibly because we just created it). Wait for an
+		// exclusive lock, then check again (in case someone
+		// else is doing the same thing) before trying to
+		// retrieve the entire block.
+		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+		if err != nil {
+			return 0, fmt.Errorf("flock(%s, lock_ex) failed: %w", cachefilename, err)
+		}
+	}
+	size, err = f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, fmt.Errorf("seek(%s, seek_end) failed: %w", cachefilename, err)
+	}
+	if size < int64(len(dst)+offset) {
+		// The cache file is truncated or empty, and we own it
+		// now. Fill it.
+		_, err = f.Seek(0, io.SeekStart)
+		if err != nil {
+			return 0, fmt.Errorf("seek(%s, seek_start) failed: %w", cachefilename, err)
+		}
+		n, err := cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{Locator: locator, WriteTo: f})
+		if err != nil {
+			return 0, err
+		}
+		f.Truncate(int64(n))
+	}
+	return f.ReadAt(dst, int64(offset))
+}
+
+// BlockRead reads the entire block from the wrapped KeepGateway into
+// the cache if needed, and writes it to the provided writer.
+func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (int, error) {
+	cache.gotidy()
+	cachefilename := cache.cacheFile(opts.Locator)
+	f, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
+	if err != nil {
+		cache.debugf("BlockRead: open(%s) failed: %s", cachefilename, err)
+		return cache.KeepGateway.BlockRead(ctx, opts)
+	}
+	defer f.Close()
+
+	i := strings.Index(opts.Locator, "+")
+	if i < 0 || i >= len(opts.Locator) {
+		return 0, errors.New("invalid block locator: no size hint")
+	}
+	sizestr := opts.Locator[i+1:]
+	i = strings.Index(sizestr, "+")
+	if i > 0 {
+		sizestr = sizestr[:i]
+	}
+	blocksize, err := strconv.ParseInt(sizestr, 10, 32)
+	if err != nil || blocksize < 0 {
+		return 0, errors.New("invalid block locator: invalid size hint")
+	}
+
+	err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH)
+	if err != nil {
+		return 0, err
+	}
+	filesize, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, err
+	}
+	_, err = f.Seek(0, io.SeekStart)
+	if err != nil {
+		return 0, err
+	}
+	if filesize == blocksize {
+		n, err := io.Copy(opts.WriteTo, f)
+		return int(n), err
+	}
+	err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+	if err != nil {
+		return 0, err
+	}
+	opts.WriteTo = io.MultiWriter(f, opts.WriteTo)
+	n, err := cache.KeepGateway.BlockRead(ctx, opts)
+	if err != nil {
+		return int(n), err
+	}
+	f.Truncate(int64(n))
+	return n, nil
+}
+
+// Start a tidy() goroutine, unless one is already running / recently
+// finished.
+func (cache *DiskCache) gotidy() {
+	// Return quickly if another tidy goroutine is running in this process.
+	n := atomic.AddInt32(&cache.tidying, 1)
+	if n != 1 || time.Now().Before(cache.tidyHoldUntil) {
+		atomic.AddInt32(&cache.tidying, -1)
+		return
+	}
+	go func() {
+		cache.tidy()
+		cache.tidyHoldUntil = time.Now().Add(10 * time.Second)
+		atomic.AddInt32(&cache.tidying, -1)
+	}()
+}
+
+// Delete cache files as needed to control disk usage.
+func (cache *DiskCache) tidy() {
+	maxsize := cache.MaxSize
+	if maxsize < 1 {
+		if maxsize = atomic.LoadInt64(&cache.defaultMaxSize); maxsize == 0 {
+			var stat unix.Statfs_t
+			if nil == unix.Statfs(cache.Dir, &stat) {
+				maxsize = int64(stat.Bavail) * stat.Bsize / 10
+			}
+			atomic.StoreInt64(&cache.defaultMaxSize, maxsize)
+		}
+	}
+
+	// Bail if a tidy goroutine is running in a different process.
+	lockfile, err := cache.openFile(filepath.Join(cache.Dir, "tmp", "tidy.lock"), os.O_CREATE|os.O_WRONLY)
+	if err != nil {
+		return
+	}
+	defer lockfile.Close()
+	err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+	if err != nil {
+		return
+	}
+
+	type entT struct {
+		path  string
+		atime time.Time
+		size  int64
+	}
+	var ents []entT
+	var totalsize int64
+	filepath.Walk(cache.Dir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			cache.debugf("tidy: skipping dir %s: %s", path, err)
+			return nil
+		}
+		if info.IsDir() {
+			return nil
+		}
+		if !strings.HasSuffix(path, cacheFileSuffix) && !strings.HasSuffix(path, tmpFileSuffix) {
+			return nil
+		}
+		var atime time.Time
+		if stat, ok := info.Sys().(*syscall.Stat_t); ok {
+			// Access time is available (hopefully the
+			// filesystem is not mounted with noatime)
+			atime = time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
+		} else {
+			// If access time isn't available we fall back
+			// to sorting by modification time.
+			atime = info.ModTime()
+		}
+		ents = append(ents, entT{path, atime, info.Size()})
+		totalsize += info.Size()
+		return nil
+	})
+	if cache.Logger != nil {
+		cache.Logger.WithFields(logrus.Fields{
+			"totalsize": totalsize,
+			"maxsize":   maxsize,
+		}).Debugf("DiskCache: checked current cache usage")
+	}
+
+	// If MaxSize wasn't specified and we failed to come up with a
+	// defaultMaxSize above, use the larger of {current cache size,
+	// 1 GiB} as the defaultMaxSize for subsequent tidy() operations.
+	if maxsize == 0 {
+		if totalsize < 1<<30 {
+			atomic.StoreInt64(&cache.defaultMaxSize, 1<<30)
+		} else {
+			atomic.StoreInt64(&cache.defaultMaxSize, totalsize)
+		}
+		cache.debugf("found initial size %d, setting defaultMaxSize %d", totalsize, cache.defaultMaxSize)
+		return
+	}
+
+	if totalsize <= maxsize {
+		return
+	}
+
+	// Delete oldest entries until totalsize <= maxsize.
+	sort.Slice(ents, func(i, j int) bool {
+		return ents[i].atime.Before(ents[j].atime)
+	})
+	deleted := 0
+	for _, ent := range ents {
+		os.Remove(ent.path)
+		deleted++
+		totalsize -= ent.size
+		if totalsize <= maxsize {
+			break
+		}
+	}
+
+	if cache.Logger != nil {
+		cache.Logger.WithFields(logrus.Fields{
+			"deleted":   deleted,
+			"totalsize": totalsize,
+		}).Debugf("DiskCache: remaining cache usage after deleting")
+	}
+}
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 89eecc6e27..37eee4de20 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -5,13 +5,16 @@
 package keepclient
 
 import (
-	"fmt"
+	"bytes"
+	"context"
 	"io"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 )
 
 var DefaultBlockCache = &BlockCache{}
@@ -54,8 +57,8 @@ func (c *BlockCache) Sweep() {
 
 // ReadAt returns data from the cache, first retrieving it from Keep if
 // necessary.
-func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
-	buf, err := c.Get(kc, locator)
+func (c *BlockCache) ReadAt(upstream arvados.KeepGateway, locator string, p []byte, off int) (int, error) {
+	buf, err := c.get(upstream, locator)
 	if err != nil {
 		return 0, err
 	}
@@ -65,9 +68,9 @@ func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (
 	return copy(p, buf[off:]), nil
 }
 
-// Get returns data from the cache, first retrieving it from Keep if
+// Get a block from the cache, first retrieving it from Keep if
 // necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) get(upstream arvados.KeepGateway, locator string) ([]byte, error) {
 	cacheKey := locator[:32]
 	bufsize := BLOCKSIZE
 	if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
@@ -88,21 +91,10 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
 		}
 		c.cache[cacheKey] = b
 		go func() {
-			rdr, size, _, err := kc.Get(locator)
-			var data []byte
-			if err == nil {
-				data = make([]byte, size, bufsize)
-				_, err = io.ReadFull(rdr, data)
-				err2 := rdr.Close()
-				if err == nil && err2 != nil {
-					err = fmt.Errorf("close(): %w", err2)
-				}
-				if err != nil {
-					err = fmt.Errorf("Get %s: %w", locator, err)
-				}
-			}
+			buf := bytes.NewBuffer(make([]byte, 0, bufsize))
+			_, err := upstream.BlockRead(context.Background(), arvados.BlockReadOptions{Locator: locator, WriteTo: buf})
 			c.mtx.Lock()
-			b.data, b.err = data, err
+			b.data, b.err = buf.Bytes(), err
 			c.mtx.Unlock()
 			close(b.fetched)
 			go c.Sweep()
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index 75603f1baa..65dcd9ac8a 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -237,7 +237,15 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 }
 
 func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+	// Disable disk cache
+	defer func(home string) {
+		os.Setenv("HOME", home)
+	}(os.Getenv("HOME"))
+	os.Setenv("HOME", "")
+
+	// Disable memory cache
 	s.kc.BlockCache = &BlockCache{}
+
 	s.kc.PutB([]byte("foo"))
 	s.kc.PutB([]byte("bar"))
 	s.kc.PutB([]byte("baz"))
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
new file mode 100644
index 0000000000..262d612640
--- /dev/null
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -0,0 +1,67 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package keepclient
+
+import (
+	"context"
+	"io"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+)
+
+// keepViaHTTP implements arvados.KeepGateway by using a KeepClient to
+// do upstream requests to keepstore and keepproxy.
+type keepViaHTTP struct {
+	*KeepClient
+}
+
+func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	rdr, _, _, _, err := kvh.getOrHead("GET", locator, nil)
+	if err != nil {
+		return 0, err
+	}
+	defer rdr.Close()
+	_, err = io.CopyN(io.Discard, rdr, int64(offset))
+	if err != nil {
+		return 0, err
+	}
+	n, err := rdr.Read(dst)
+	return int(n), err
+}
+
+func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+	rdr, _, _, _, err := kvh.getOrHead("GET", opts.Locator, nil)
+	if err != nil {
+		return 0, err
+	}
+	defer rdr.Close()
+	n, err := io.Copy(opts.WriteTo, rdr)
+	return int(n), err
+}
+
+// keepViaBlockCache implements arvados.KeepGateway by using the given
+// KeepClient's BlockCache with the wrapped KeepGateway.
+//
+// Note the whole KeepClient gets passed in instead of just its
+// cache. This ensures the new BlockCache gets used if it changes
+// after keepViaBlockCache is initialized.
+type keepViaBlockCache struct {
+	kc *KeepClient
+	arvados.KeepGateway
+}
+
+func (kvbc *keepViaBlockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	return kvbc.kc.cache().ReadAt(kvbc.KeepGateway, locator, dst, offset)
+}
+
+func (kvbc *keepViaBlockCache) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+	rdr, _, _, _, err := kvbc.kc.getOrHead("GET", opts.Locator, nil)
+	if err != nil {
+		return 0, err
+	}
+	defer rdr.Close()
+	n, err := io.Copy(opts.WriteTo, rdr)
+	return int(n), err
+}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 68ac886ddd..18614a77eb 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -16,6 +16,8 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"os"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -118,6 +120,8 @@ type KeepClient struct {
 
 	// Disable automatic discovery of keep services
 	disableDiscovery bool
+
+	gatewayStack arvados.KeepGateway
 }
 
 func (kc *KeepClient) loadDefaultClasses() error {
@@ -332,6 +336,45 @@ func (kc *KeepClient) getOrHead(method string, locator string, header http.Heade
 	return nil, 0, "", nil, err
 }
 
+// upstreamGateway creates/returns the KeepGateway stack used to read
+// and write data: a memory cache, a disk-backed cache if available,
+// and an http backend.
+func (kc *KeepClient) upstreamGateway() arvados.KeepGateway {
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	if kc.gatewayStack != nil {
+		return kc.gatewayStack
+	}
+	var stack arvados.KeepGateway = &keepViaHTTP{kc}
+
+	// Wrap with a disk cache, if cache dir is writable
+	home := os.Getenv("HOME")
+	if fi, err := os.Stat(home); home != "" && err == nil && fi.IsDir() {
+		var err error
+		dir := home
+		for _, part := range []string{".cache", "arvados", "keep"} {
+			dir = filepath.Join(dir, part)
+			err = os.Mkdir(dir, 0700)
+		}
+		if err == nil || os.IsExist(err) {
+			os.Mkdir(filepath.Join(dir, "tmp"), 0700)
+			err = os.WriteFile(filepath.Join(dir, "tmp", "check.tmp"), nil, 0600)
+			if err == nil {
+				stack = &arvados.DiskCache{
+					Dir:         dir,
+					KeepGateway: stack,
+				}
+			}
+		}
+	}
+	stack = &keepViaBlockCache{
+		kc:          kc,
+		KeepGateway: stack,
+	}
+	kc.gatewayStack = stack
+	return stack
+}
+
 // LocalLocator returns a locator equivalent to the one supplied, but
 // with a valid signature from the local cluster. If the given locator
 // already has a local signature, it is returned unchanged.
@@ -369,7 +412,7 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
 // ReadAt retrieves a portion of block from the cache if it's
 // present, otherwise from the network.
 func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
-	return kc.cache().ReadAt(kc, locator, p, off)
+	return kc.upstreamGateway().ReadAt(locator, p, off)
 }
 
 // Ask verifies that a block with the given hash is available and

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list