[arvados] created: 2.7.0-6011-g1dcde0921d

git repository hosting git at public.arvados.org
Mon Feb 26 17:18:34 UTC 2024


        at  1dcde0921ddb62ae1a4ea01bc9d5179c6a994882 (commit)


commit 1dcde0921ddb62ae1a4ea01bc9d5179c6a994882
Author: Tom Clegg <tom at curii.com>
Date:   Mon Feb 26 12:18:13 2024 -0500

    18961: Add more prefetch tests, fix end-of-directory optimizations.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 9ecdc582db..c3a210faa3 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -1213,6 +1213,22 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 	return cg.Wait()
 }
 
+var (
+	// These prefetch* counters are used by tests to check that
+	// the short-circuit optimizations in prefetch() are working
+	// as expected.
+	prefetchCall        atomic.Int64
+	prefetchWalkCurrent atomic.Int64
+	prefetchSearchNext  atomic.Int64
+	prefetchWalkNext    atomic.Int64
+	prefetchReadCurrent atomic.Int64
+	prefetchReadNext    atomic.Int64
+
+	// At runtime this is a no-op. Test cases replace this func
+	// with a call to Add(1).
+	profAdd1 = func(*atomic.Int64) {}
+)
+
 // Prefetch file data based on expected future usage.
 //
 // After a read from this dirnode's child fn with the given name that
@@ -1227,15 +1243,17 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 //     maxBlockSize bytes that will be needed (regardless of actual
 //     block size)
 //
-//   - when reading many small files in lexical order, pre-fetch the
-//     next maxBlockSize bytes that will be needed (regardless of
-//     actual block size)
+//   - when reading many consecutive small files in lexical order,
+//     pre-fetch the next maxBlockSize bytes that will be needed
+//     (regardless of actual block size)
 //
 //   - minimize the overhead cost for typical sequences of read
 //     operations (e.g., when a caller reads a file sequentially 1024
 //     bytes at a time, and prefetch is called on each read, most of
 //     the calls should be nearly free)
 func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
+	profAdd1(&prefetchCall)
+
 	// pre-fetch following blocks until we're this many bytes ahead
 	todo := maxBlockSize
 
@@ -1255,6 +1273,7 @@ func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
 	// last locator prefetched, if any
 	var lastlocator string
 
+	profAdd1(&prefetchWalkCurrent)
 	fn.Lock()
 	for inext := ptr.segmentIdx; inext < len(fn.segments) && todo > 0; inext++ {
 		if inext == ptr.segmentIdx {
@@ -1269,6 +1288,7 @@ func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
 			todo -= next.size
 			lastlocator = next.locator
 		} else {
+			profAdd1(&prefetchReadCurrent)
 			next.didRead = true
 			fn.segments[inext] = next
 			go next.ReadAt([]byte{}, 0)
@@ -1298,9 +1318,12 @@ func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
 		}
 		sort.Strings(dn.prefetchNames)
 	}
-	for iname := sort.Search(len(dn.prefetchNames), func(x int) bool {
+	profAdd1(&prefetchSearchNext)
+	iname := sort.Search(len(dn.prefetchNames), func(x int) bool {
 		return dn.prefetchNames[x] > name
-	}); iname < len(dn.prefetchNames) && todo > 0; iname++ {
+	})
+	for ; iname < len(dn.prefetchNames) && todo > 0; iname++ {
+		profAdd1(&prefetchWalkNext)
 		fn, ok := dn.inodes[dn.prefetchNames[iname]].(*filenode)
 		if !ok {
 			continue
@@ -1318,9 +1341,11 @@ func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
 				// we already subtracted this block's
 				// size from todo
 			} else if next.didRead {
-				// we already prefetched this block
+				// someone already fetched this
+				// segment
 				todo -= next.size
 			} else {
+				profAdd1(&prefetchReadNext)
 				next.didRead = true
 				fn.segments[inext] = next
 				go next.ReadAt([]byte{}, 0)
@@ -1330,19 +1355,22 @@ func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
 		}
 		fn.Unlock()
 	}
-	// Typically we exceed our target and a future call to
-	// prefetch(), referencing the same file with a slightly
-	// larger offset, will be a no-op.  Here we record the highest
-	// ptr.off for this file that we expect to be a no-op based on
-	// the work we've just done.
-	//
-	// Note this means reading from the end of a file, then
-	// reading sequentially from the beginning, will effectively
-	// prevent prefetching data for that file.  This is not ideal,
-	// but it is preferable to the performance hit of checking
-	// prefetch on every single read.
-	if done := ptr.off - int64(todo); done > dn.prefetchDone[name] {
-		dn.prefetchDone[name] = done
+	// Typically we overshoot our target by enough margin that a
+	// future call to prefetch(), referencing the same file with a
+	// slightly larger offset, will be a no-op.  Here we record
+	// the maximum file offset for which prefetch will be a no-op
+	// based on the work we've just done.
+	if iname == len(dn.prefetchNames) && todo > 0 {
+		// If we reached the end of the last file without
+		// reaching our target, "done" as computed above would
+		// suggest that we might be able to prefetch more
+		// blocks after a future read ending at ptr.off+1.  In
+		// fact, that would be futile, so we set done to be an
+		// imaginary offset past the end of the file, so
+		// subsequent prefetch calls can return early.
+		dn.prefetchDone[name] = ptr.off + int64(maxBlockSize)
+	} else {
+		dn.prefetchDone[name] = ptr.off - int64(todo)
 	}
 }
 
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index a54929d0fd..2a339e4e39 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1501,11 +1501,8 @@ func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
 	c.Check(string(buf), check.Equals, filedata1)
 }
 
-func (s *CollectionFSSuite) TestPrefetchLargeFile(c *check.C) {
-	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
-	maxBlockSize = 1_000_000
+func (s *CollectionFSSuite) makeManifest_BigFile(c *check.C, nblocks int) string {
 	txt := "."
-	nblocks := 10
 	locator := make([]string, nblocks)
 	data := make([]byte, maxBlockSize)
 	for i := 0; i < nblocks; i++ {
@@ -1516,7 +1513,34 @@ func (s *CollectionFSSuite) TestPrefetchLargeFile(c *check.C) {
 		txt += " " + resp.Locator
 	}
 	txt += fmt.Sprintf(" 0:%d:bigfile\n", nblocks*maxBlockSize)
-	fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+	return txt
+}
+
+func (s *CollectionFSSuite) makeManifest_SmallFiles(c *check.C, nblocks, nfiles int) string {
+	txt := "."
+	locator := make([]string, nblocks)
+	data := make([]byte, maxBlockSize)
+	for i := 0; i < nblocks; i++ {
+		data[0] = byte(i)
+		resp, err := s.kc.BlockWrite(context.Background(), BlockWriteOptions{Data: data})
+		c.Assert(err, check.IsNil)
+		locator[i] = resp.Locator
+		txt += " " + resp.Locator
+	}
+	filesize := int64(maxBlockSize) * int64(nblocks) / int64(nfiles)
+	pos := int64(0)
+	for i := 0; i < nfiles; i++ {
+		txt += fmt.Sprintf(" %d:%d:smallfile%d", pos, filesize, i)
+		pos += filesize
+	}
+	txt += "\n"
+	return txt
+}
+
+func (s *CollectionFSSuite) TestPrefetchBigFile(c *check.C) {
+	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
+	maxBlockSize = 1_000_000
+	fs, err := (&Collection{ManifestText: s.makeManifest_BigFile(c, 10)}).FileSystem(s.client, s.kc)
 	c.Assert(err, check.IsNil)
 	c.Assert(s.kc.reads, check.HasLen, 0)
 
@@ -1534,6 +1558,74 @@ func (s *CollectionFSSuite) TestPrefetchLargeFile(c *check.C) {
 	}
 }
 
+func (s *CollectionFSSuite) startPrefetchCounters() {
+	prefetchCall.Store(0)
+	prefetchWalkCurrent.Store(0)
+	prefetchSearchNext.Store(0)
+	prefetchWalkNext.Store(0)
+	prefetchReadCurrent.Store(0)
+	prefetchReadNext.Store(0)
+	profAdd1 = func(counter *atomic.Int64) { counter.Add(1) }
+}
+
+func (s *CollectionFSSuite) logPrefetchCounters(c *check.C) {
+	c.Logf("prefetch counters: call %d walkCurrent %d searchNext %d walkNext %d readCurrent %d readNext %d",
+		prefetchCall.Load(),
+		prefetchWalkCurrent.Load(),
+		prefetchSearchNext.Load(),
+		prefetchWalkNext.Load(),
+		prefetchReadCurrent.Load(),
+		prefetchReadNext.Load())
+}
+
+func (s *CollectionFSSuite) TestPrefetchOptimizations_BigFile(c *check.C) {
+	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
+	maxBlockSize = 1_000_000
+	s.startPrefetchCounters()
+	fs, err := (&Collection{ManifestText: s.makeManifest_BigFile(c, 30)}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+
+	f, err := fs.Open("bigfile")
+	c.Assert(err, check.IsNil)
+	for {
+		_, err = f.Read(make([]byte, 1024))
+		if err == io.EOF {
+			break
+		}
+		c.Check(err, check.IsNil)
+	}
+	s.logPrefetchCounters(c)
+	c.Check(prefetchCall.Load()/prefetchWalkCurrent.Load() > 50, check.Equals, true)
+}
+
+func (s *CollectionFSSuite) TestPrefetchOptimizations_SmallFiles(c *check.C) {
+	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
+	maxBlockSize = 1_000_000
+	nblocks := 30
+	nfiles := 1000
+	s.startPrefetchCounters()
+	fs, err := (&Collection{ManifestText: s.makeManifest_SmallFiles(c, nblocks, nfiles)}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+
+	for i := 0; i < nfiles; i++ {
+		f, err := fs.Open(fmt.Sprintf("smallfile%d", i))
+		c.Assert(err, check.IsNil)
+		for {
+			_, err = f.Read(make([]byte, 128))
+			if err == io.EOF {
+				break
+			}
+			c.Check(err, check.IsNil)
+		}
+	}
+	s.logPrefetchCounters(c)
+	c.Check(prefetchCall.Load()/prefetchWalkCurrent.Load() > 20, check.Equals, true)
+	c.Check(prefetchCall.Load()/prefetchSearchNext.Load() > 20, check.Equals, true)
+	c.Check(prefetchCall.Load()/prefetchWalkNext.Load() > 20, check.Equals, true)
+	c.Check(prefetchReadCurrent.Load() <= int64(nblocks), check.Equals, true)
+	c.Check(prefetchReadNext.Load() <= int64(nfiles), check.Equals, true)
+}
+
 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
 	filedata1 := "hello refresh signatures world\n"
 	fs, err := (&Collection{}).FileSystem(s.client, s.kc)

commit 4548e551e46bc15bb35596043d9326c07b1cdacb
Author: Tom Clegg <tom at curii.com>
Date:   Thu Feb 22 14:05:21 2024 -0500

    18961: Pre-fetch next data block(s) when reading many small files.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 5f17f1ca7a..9ecdc582db 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -649,10 +649,14 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 		fn.segments[ptr.segmentIdx] = ss
 	}
 
-	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+	current := fn.segments[ptr.segmentIdx]
+	n, err = current.ReadAt(p, int64(ptr.segmentOff))
 	if n > 0 {
 		ptr.off += int64(n)
 		ptr.segmentOff += n
+		if dn, ok := fn.parent.(*dirnode); ok {
+			go dn.prefetch(fn, fn.fileinfo.name, ptr)
+		}
 		if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
 			ptr.segmentIdx++
 			ptr.segmentOff = 0
@@ -661,24 +665,6 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 			}
 		}
 	}
-
-	prefetch := maxBlockSize
-	for inext := ptr.segmentIdx; inext < len(fn.segments) && prefetch > 0; inext++ {
-		if inext == ptr.segmentIdx && ptr.segmentOff > 0 {
-			// We already implicitly pre-fetched the
-			// remainder of the current segment by calling
-			// ReadAt above.
-			prefetch -= fn.segments[inext].Len() - ptr.segmentOff
-			continue
-		}
-		if next, ok := fn.segments[inext].(storedSegment); ok && !next.didRead {
-			next.didRead = true
-			fn.segments[inext] = next
-			go next.ReadAt([]byte{}, 0)
-		}
-		prefetch -= fn.segments[inext].Len()
-	}
-
 	return
 }
 
@@ -1003,6 +989,9 @@ func (fn *filenode) Splice(repl inode) error {
 type dirnode struct {
 	fs *collectionFileSystem
 	treenode
+
+	prefetchNames []string
+	prefetchDone  map[string]int64
 }
 
 func (dn *dirnode) FS() FileSystem {
@@ -1224,6 +1213,139 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
 	return cg.Wait()
 }
 
+// Prefetch file data based on expected future usage.
+//
+// After a read from this dirnode's child fn with the given name that
+// leaves the file pointer at ptr (which may be at the end of a
+// segment), guess the next maxBlockSize bytes most likely to be read
+// soon, and prod the cache layer to start loading the needed blocks.
+//
+// This implementation prioritizes efficiency over completeness. The
+// main requirements are:
+//
+//   - when reading a large file sequentially, pre-fetch the next
+//     maxBlockSize bytes that will be needed (regardless of actual
+//     block size)
+//
+//   - when reading many small files in lexical order, pre-fetch the
+//     next maxBlockSize bytes that will be needed (regardless of
+//     actual block size)
+//
+//   - minimize the overhead cost for typical sequences of read
+//     operations (e.g., when a caller reads a file sequentially 1024
+//     bytes at a time, and prefetch is called on each read, most of
+//     the calls should be nearly free)
+func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
+	// pre-fetch following blocks until we're this many bytes ahead
+	todo := maxBlockSize
+
+	// Check the common case where there was a recent read from a
+	// slightly earlier offset in the same file, and as a result
+	// we have already prefetched enough data to cover
+	// ptr.off+todo.
+	{
+		dn.Lock()
+		done, ok := dn.prefetchDone[name]
+		dn.Unlock()
+		if ok && done >= ptr.off && done < ptr.off+int64(todo) {
+			return
+		}
+	}
+
+	// last locator prefetched, if any
+	var lastlocator string
+
+	fn.Lock()
+	for inext := ptr.segmentIdx; inext < len(fn.segments) && todo > 0; inext++ {
+		if inext == ptr.segmentIdx {
+			// Caller (i.e., (*filenode)Read()) has
+			// already fetched the current segment.
+			todo -= fn.segments[inext].Len() - ptr.segmentOff
+			continue
+		}
+		if next, ok := fn.segments[inext].(storedSegment); !ok {
+			todo -= fn.segments[inext].Len()
+		} else if next.didRead {
+			todo -= next.size
+			lastlocator = next.locator
+		} else {
+			next.didRead = true
+			fn.segments[inext] = next
+			go next.ReadAt([]byte{}, 0)
+			todo -= next.size
+			lastlocator = next.locator
+		}
+	}
+	fn.Unlock()
+
+	dn.Lock()
+	defer dn.Unlock()
+	if dn.prefetchDone == nil {
+		dn.prefetchDone = make(map[string]int64)
+	}
+	if todo < 0 {
+		if done := ptr.off - int64(todo); done > dn.prefetchDone[name] {
+			dn.prefetchDone[name] = done
+		}
+		return
+	}
+	if dn.prefetchNames == nil {
+		dn.prefetchNames = make([]string, 0, len(dn.inodes))
+		for name, node := range dn.inodes {
+			if _, ok := node.(*filenode); ok {
+				dn.prefetchNames = append(dn.prefetchNames, name)
+			}
+		}
+		sort.Strings(dn.prefetchNames)
+	}
+	for iname := sort.Search(len(dn.prefetchNames), func(x int) bool {
+		return dn.prefetchNames[x] > name
+	}); iname < len(dn.prefetchNames) && todo > 0; iname++ {
+		fn, ok := dn.inodes[dn.prefetchNames[iname]].(*filenode)
+		if !ok {
+			continue
+		}
+		fn.Lock()
+		for inext := 0; inext < len(fn.segments) && todo > 0; inext++ {
+			next, ok := fn.segments[inext].(storedSegment)
+			if !ok {
+				// count in-memory data as already
+				// prefetched
+				todo -= fn.segments[inext].Len()
+				continue
+			}
+			if next.locator == lastlocator {
+				// we already subtracted this block's
+				// size from todo
+			} else if next.didRead {
+				// we already prefetched this block
+				todo -= next.size
+			} else {
+				next.didRead = true
+				fn.segments[inext] = next
+				go next.ReadAt([]byte{}, 0)
+				todo -= next.size
+			}
+			lastlocator = next.locator
+		}
+		fn.Unlock()
+	}
+	// Typically we exceed our target and a future call to
+	// prefetch(), referencing the same file with a slightly
+	// larger offset, will be a no-op.  Here we record the highest
+	// ptr.off for this file that we expect to be a no-op based on
+	// the work we've just done.
+	//
+	// Note this means reading from the end of a file, then
+	// reading sequentially from the beginning, will effectively
+	// prevent prefetching data for that file.  This is not ideal,
+	// but it is preferable to the performance hit of checking
+	// prefetch on every single read.
+	if done := ptr.off - int64(todo); done > dn.prefetchDone[name] {
+		dn.prefetchDone[name] = done
+	}
+}
+
 func (dn *dirnode) MemorySize() (size int64) {
 	dn.RLock()
 	todo := make([]inode, 0, len(dn.inodes))
diff --git a/sdk/go/arvados/fs_filehandle.go b/sdk/go/arvados/fs_filehandle.go
index f50dd4612b..f08629c033 100644
--- a/sdk/go/arvados/fs_filehandle.go
+++ b/sdk/go/arvados/fs_filehandle.go
@@ -23,8 +23,8 @@ func (f *filehandle) Read(p []byte) (n int, err error) {
 	if !f.readable {
 		return 0, ErrWriteOnlyMode
 	}
-	f.inode.RLock()
-	defer f.inode.RUnlock()
+	f.inode.Lock()
+	defer f.inode.Unlock()
 	n, f.ptr, err = f.inode.Read(p, f.ptr)
 	return
 }

commit b1bd2898c1c763054ec0fa45844b80ea3790be18
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 21 15:45:59 2024 -0500

    18961: Pre-fetch next data block(s) when reading collectionFS.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 052cc1aa37..5f17f1ca7a 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -632,7 +632,7 @@ func (fn *filenode) MemorySize() (size int64) {
 
 // Read reads file data from a single segment, starting at startPtr,
 // into p. startPtr is assumed not to be up-to-date. Caller must have
-// RLock or Lock.
+// lock.
 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
 	ptr = fn.seek(startPtr)
 	if ptr.off < 0 {
@@ -645,8 +645,10 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 	}
 	if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
 		ss.locator = fn.fs.refreshSignature(ss.locator)
+		ss.didRead = true
 		fn.segments[ptr.segmentIdx] = ss
 	}
+
 	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
 	if n > 0 {
 		ptr.off += int64(n)
@@ -659,6 +661,24 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 			}
 		}
 	}
+
+	prefetch := maxBlockSize
+	for inext := ptr.segmentIdx; inext < len(fn.segments) && prefetch > 0; inext++ {
+		if inext == ptr.segmentIdx && ptr.segmentOff > 0 {
+			// We already implicitly pre-fetched the
+			// remainder of the current segment by calling
+			// ReadAt above.
+			prefetch -= fn.segments[inext].Len() - ptr.segmentOff
+			continue
+		}
+		if next, ok := fn.segments[inext].(storedSegment); ok && !next.didRead {
+			next.didRead = true
+			fn.segments[inext] = next
+			go next.ReadAt([]byte{}, 0)
+		}
+		prefetch -= fn.segments[inext].Len()
+	}
+
 	return
 }
 
@@ -1766,6 +1786,11 @@ type storedSegment struct {
 	size    int // size of stored block (also encoded in locator)
 	offset  int // position of segment within the stored block
 	length  int // bytes in this segment (offset + length <= size)
+
+	// set when we first try to read from this segment, and
+	// checked before pre-fetch, to avoid unnecessary cache
+	// thrashing
+	didRead bool
 }
 
 func (se storedSegment) Len() int {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index a29371b76c..a54929d0fd 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1501,6 +1501,39 @@ func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
 	c.Check(string(buf), check.Equals, filedata1)
 }
 
+func (s *CollectionFSSuite) TestPrefetchLargeFile(c *check.C) {
+	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
+	maxBlockSize = 1_000_000
+	txt := "."
+	nblocks := 10
+	locator := make([]string, nblocks)
+	data := make([]byte, maxBlockSize)
+	for i := 0; i < nblocks; i++ {
+		data[0] = byte(i)
+		resp, err := s.kc.BlockWrite(context.Background(), BlockWriteOptions{Data: data})
+		c.Assert(err, check.IsNil)
+		locator[i] = resp.Locator
+		txt += " " + resp.Locator
+	}
+	txt += fmt.Sprintf(" 0:%d:bigfile\n", nblocks*maxBlockSize)
+	fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	c.Assert(s.kc.reads, check.HasLen, 0)
+
+	// Reading the first few bytes of the file requires reading
+	// the first block, and should also trigger pre-fetch of the
+	// second block.
+	f, err := fs.Open("bigfile")
+	c.Assert(err, check.IsNil)
+	_, err = f.Read(make([]byte, 8192))
+	c.Assert(err, check.IsNil)
+	for deadline := time.Now().Add(time.Second); len(s.kc.reads) < 2 && c.Check(time.Now().Before(deadline), check.Equals, true); time.Sleep(time.Millisecond) {
+	}
+	if c.Check(s.kc.reads, check.HasLen, 2) {
+		c.Check(s.kc.reads[1], check.Not(check.Equals), s.kc.reads[0])
+	}
+}
+
 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
 	filedata1 := "hello refresh signatures world\n"
 	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 260824453d..dd66c8c81f 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -28,6 +28,15 @@ type keepViaHTTP struct {
 }
 
 func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, error) {
+	if len(dst) == 0 {
+		// arvados.collectionFileSystem uses a zero-length
+		// read to trigger pre-fetching a block into the cache
+		// before it's actually needed.  If a pre-fetch
+		// request gets this far, it means there's no cache
+		// above us in the stack, so the pre-fetch signal is a
+		// no-op.
+		return 0, nil
+	}
 	rdr, _, _, _, err := kvh.getOrHead("GET", locator, nil)
 	if err != nil {
 		return 0, err

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list