[arvados] created: 2.7.0-6010-gc6c7eb9990
From: git repository hosting <git at public.arvados.org>
Date: Wed Feb 21 22:08:28 UTC 2024

        at  c6c7eb999044a2d54ab75e4a25d2104fb56544ae (commit)
commit c6c7eb999044a2d54ab75e4a25d2104fb56544ae
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 21 17:07:37 2024 -0500
18961: Pre-fetch next data block(s) when reading many small files.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 5f17f1ca7a..4c0c866570 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -649,10 +649,14 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
fn.segments[ptr.segmentIdx] = ss
}
- n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+ current := fn.segments[ptr.segmentIdx]
+ n, err = current.ReadAt(p, int64(ptr.segmentOff))
if n > 0 {
ptr.off += int64(n)
ptr.segmentOff += n
+ if dn, ok := fn.parent.(*dirnode); ok {
+ go dn.prefetch(fn, fn.fileinfo.name, ptr)
+ }
if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
ptr.segmentIdx++
ptr.segmentOff = 0
@@ -661,24 +665,6 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
}
}
}
-
- prefetch := maxBlockSize
- for inext := ptr.segmentIdx; inext < len(fn.segments) && prefetch > 0; inext++ {
- if inext == ptr.segmentIdx && ptr.segmentOff > 0 {
- // We already implicitly pre-fetched the
- // remainder of the current segment by calling
- // ReadAt above.
- prefetch -= fn.segments[inext].Len() - ptr.segmentOff
- continue
- }
- if next, ok := fn.segments[inext].(storedSegment); ok && !next.didRead {
- next.didRead = true
- fn.segments[inext] = next
- go next.ReadAt([]byte{}, 0)
- }
- prefetch -= fn.segments[inext].Len()
- }
-
return
}
@@ -1003,6 +989,9 @@ func (fn *filenode) Splice(repl inode) error {
type dirnode struct {
fs *collectionFileSystem
treenode
+
+ prefetchNames []string
+ prefetchDone map[string]int64
}
func (dn *dirnode) FS() FileSystem {
@@ -1224,6 +1213,143 @@ func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) er
return cg.Wait()
}
+// Prefetch file data based on expected future usage.
+//
+// After a read from this dirnode's child fn with the given name that
+// leaves the file pointer at ptr (which may be at the end of a
+// segment), guess the next maxBlockSize bytes most likely to be read
+// soon, and prod the cache layer to start loading the needed blocks.
+//
+// This implementation prioritizes efficiency over completeness. The
+// main requirements are:
+//
+// - when reading a large file sequentially, pre-fetch the next
+// maxBlockSize bytes that will be needed (regardless of actual
+// block size)
+//
+// - when reading many small files in lexical order, pre-fetch the
+// next maxBlockSize bytes that will be needed (regardless of
+// actual block size)
+//
+// - minimize the overhead cost for typical sequences of read
+// operations (e.g., when a caller reads a file sequentially 1024
+// bytes at a time, and prefetch is called on each read, most of
+// the calls should be nearly free)
+func (dn *dirnode) prefetch(fn *filenode, name string, ptr filenodePtr) {
+ // pre-fetch following blocks until we're this many bytes ahead
+ todo := maxBlockSize
+
+ // Check the common case where there was a recent read from a
+ // slightly earlier offset in the same file, and as a result
+ // we have already prefetched enough data to cover
+ // ptr.off+todo.
+ {
+ dn.Lock()
+ done, ok := dn.prefetchDone[name]
+ dn.Unlock()
+ if ok && done >= ptr.off && done < ptr.off+int64(todo) {
+ return
+ }
+ }
+
+ // last locator prefetched, if any
+ var lastlocator string
+
+ fn.Lock()
+ for inext := ptr.segmentIdx; inext < len(fn.segments) && todo > 0; inext++ {
+ if inext == ptr.segmentIdx {
+ // Caller (i.e., (*filenode)Read()) has
+ // already fetched the current segment.
+ todo -= fn.segments[inext].Len() - ptr.segmentOff
+ continue
+ }
+ if next, ok := fn.segments[inext].(storedSegment); !ok {
+ todo -= fn.segments[inext].Len()
+ } else if next.didRead {
+ todo -= next.size
+ lastlocator = next.locator
+ } else {
+ next.didRead = true
+ fn.segments[inext] = next
+ go next.ReadAt([]byte{}, 0)
+ todo -= next.size
+ lastlocator = next.locator
+ }
+ }
+ fn.Unlock()
+
+ if todo < 0 {
+ return
+ }
+
+ dn.Lock()
+ defer dn.Unlock()
+ if dn.prefetchDone == nil {
+ dn.prefetchDone = make(map[string]int64)
+ }
+ if todo < 0 {
+ if done := ptr.off - int64(todo); done > dn.prefetchDone[name] {
+ dn.prefetchDone[name] = done
+ }
+ return
+ }
+ if dn.prefetchNames == nil {
+ dn.prefetchNames = make([]string, 0, len(dn.inodes))
+ for name, node := range dn.inodes {
+ if _, ok := node.(*filenode); ok {
+ dn.prefetchNames = append(dn.prefetchNames, name)
+ }
+ }
+ sort.Strings(dn.prefetchNames)
+ }
+ for iname := sort.Search(len(dn.prefetchNames), func(x int) bool {
+ return dn.prefetchNames[x] > name
+ }); iname < len(dn.prefetchNames) && todo > 0; iname++ {
+ fn, ok := dn.inodes[dn.prefetchNames[iname]].(*filenode)
+ if !ok {
+ continue
+ }
+ fn.Lock()
+ for inext := 0; inext < len(fn.segments) && todo > 0; inext++ {
+ next, ok := fn.segments[inext].(storedSegment)
+ if !ok {
+ // count in-memory data as already
+ // prefetched
+ todo -= fn.segments[inext].Len()
+ continue
+ }
+ if next.locator == lastlocator {
+ // we already subtracted this block's
+ // size from todo
+ } else if next.didRead {
+ // we already prefetched this block
+ todo -= next.size
+ } else {
+ next.didRead = true
+ fn.segments[inext] = next
+ go next.ReadAt([]byte{}, 0)
+ todo -= next.size
+ }
+ lastlocator = next.locator
+ }
+ fn.Unlock()
+ }
+ // Typically we exceed our target and a future call to
+ // prefetch(), referencing the same file with a slightly
+ // larger offset, will be a no-op. Here we record the highest
+ // ptr.off for this file that we expect to be a no-op based on
+ // the work we've just done.
+ //
+ // Note this means reading from the end of a file, then
+ // reading sequentially from the beginning, will effectively
+ // prevent prefetching data for that file. This is not ideal,
+ // but it is preferable to the performance hit of checking
+ // prefetch on every single read.
+ if done := ptr.off - int64(todo); done > dn.prefetchDone[name] {
+ dn.prefetchDone[name] = done
+ }
+}
+
func (dn *dirnode) MemorySize() (size int64) {
dn.RLock()
todo := make([]inode, 0, len(dn.inodes))
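To put rough numbers on the "nearly free" goal stated in the prefetch() comments
(assuming, for illustration only, 64 MiB Keep blocks, i.e. maxBlockSize = 1<<26,
and a caller reading a large file sequentially 1,024 bytes at a time): the first
read starts a background fetch of the following block and, per the comment at the
end of prefetch(), records a prefetchDone offset near the next block boundary.
The next ~65,000 reads stay at or below that recorded offset, so prefetch() can
return after the initial map lookup; the segment scan only runs again once the
reader crosses into the next block and pushes the horizon forward.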
diff --git a/sdk/go/arvados/fs_filehandle.go b/sdk/go/arvados/fs_filehandle.go
index f50dd4612b..f08629c033 100644
--- a/sdk/go/arvados/fs_filehandle.go
+++ b/sdk/go/arvados/fs_filehandle.go
@@ -23,8 +23,8 @@ func (f *filehandle) Read(p []byte) (n int, err error) {
if !f.readable {
return 0, ErrWriteOnlyMode
}
- f.inode.RLock()
- defer f.inode.RUnlock()
+ f.inode.Lock()
+ defer f.inode.Unlock()
n, f.ptr, err = f.inode.Read(p, f.ptr)
return
}
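The new behavior matters most for clients that read many small files in lexical
order, a little at a time. Below is a minimal usage sketch, not part of the
commit: the import paths, client setup, and the readSmallFiles name are
assumptions, while the FileSystem/Open/Read calls mirror the API exercised by
TestPrefetchLargeFile further down.

package main

import (
	"io"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/keepclient"
)

// readSmallFiles reads the named files in lexical order, 1 KiB at a
// time. With the change above, each Read prods the parent dirnode to
// prefetch up to maxBlockSize bytes ahead, across file boundaries, so
// the underlying Keep blocks are usually in cache before they are
// needed.
func readSmallFiles(ac *arvados.Client, kc *keepclient.KeepClient, coll arvados.Collection, names []string) error {
	cfs, err := coll.FileSystem(ac, kc)
	if err != nil {
		return err
	}
	buf := make([]byte, 1024)
	for _, name := range names {
		f, err := cfs.Open(name)
		if err != nil {
			return err
		}
		for {
			_, err := f.Read(buf)
			if err == io.EOF {
				break
			} else if err != nil {
				f.Close()
				return err
			}
		}
		f.Close()
	}
	return nil
}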
commit b1bd2898c1c763054ec0fa45844b80ea3790be18
Author: Tom Clegg <tom at curii.com>
Date: Wed Feb 21 15:45:59 2024 -0500
18961: Pre-fetch next data block(s) when reading collectionFS.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 052cc1aa37..5f17f1ca7a 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -632,7 +632,7 @@ func (fn *filenode) MemorySize() (size int64) {
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
-// RLock or Lock.
+// lock.
func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
ptr = fn.seek(startPtr)
if ptr.off < 0 {
@@ -645,8 +645,10 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
}
if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
ss.locator = fn.fs.refreshSignature(ss.locator)
+ ss.didRead = true
fn.segments[ptr.segmentIdx] = ss
}
+
n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
if n > 0 {
ptr.off += int64(n)
@@ -659,6 +661,24 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
}
}
}
+
+ prefetch := maxBlockSize
+ for inext := ptr.segmentIdx; inext < len(fn.segments) && prefetch > 0; inext++ {
+ if inext == ptr.segmentIdx && ptr.segmentOff > 0 {
+ // We already implicitly pre-fetched the
+ // remainder of the current segment by calling
+ // ReadAt above.
+ prefetch -= fn.segments[inext].Len() - ptr.segmentOff
+ continue
+ }
+ if next, ok := fn.segments[inext].(storedSegment); ok && !next.didRead {
+ next.didRead = true
+ fn.segments[inext] = next
+ go next.ReadAt([]byte{}, 0)
+ }
+ prefetch -= fn.segments[inext].Len()
+ }
+
return
}
@@ -1766,6 +1786,11 @@ type storedSegment struct {
size int // size of stored block (also encoded in locator)
offset int // position of segment within the stored block
length int // bytes in this segment (offset + length <= size)
+
+ // set when we first try to read from this segment, and
+ // checked before pre-fetch, to avoid unnecessary cache
+ // thrashing
+ didRead bool
}
func (se storedSegment) Len() int {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index a29371b76c..a54929d0fd 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1501,6 +1501,39 @@ func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
c.Check(string(buf), check.Equals, filedata1)
}
+func (s *CollectionFSSuite) TestPrefetchLargeFile(c *check.C) {
+ defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
+ maxBlockSize = 1_000_000
+ txt := "."
+ nblocks := 10
+ locator := make([]string, nblocks)
+ data := make([]byte, maxBlockSize)
+ for i := 0; i < nblocks; i++ {
+ data[0] = byte(i)
+ resp, err := s.kc.BlockWrite(context.Background(), BlockWriteOptions{Data: data})
+ c.Assert(err, check.IsNil)
+ locator[i] = resp.Locator
+ txt += " " + resp.Locator
+ }
+ txt += fmt.Sprintf(" 0:%d:bigfile\n", nblocks*maxBlockSize)
+ fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ c.Assert(s.kc.reads, check.HasLen, 0)
+
+ // Reading the first few bytes of the file requires reading
+ // the first block, and should also trigger pre-fetch of the
+ // second block.
+ f, err := fs.Open("bigfile")
+ c.Assert(err, check.IsNil)
+ _, err = f.Read(make([]byte, 8192))
+ c.Assert(err, check.IsNil)
+ for deadline := time.Now().Add(time.Second); len(s.kc.reads) < 2 && c.Check(time.Now().Before(deadline), check.Equals, true); time.Sleep(time.Millisecond) {
+ }
+ if c.Check(s.kc.reads, check.HasLen, 2) {
+ c.Check(s.kc.reads[1], check.Not(check.Equals), s.kc.reads[0])
+ }
+}
+
func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
filedata1 := "hello refresh signatures world\n"
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 260824453d..dd66c8c81f 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -28,6 +28,15 @@ type keepViaHTTP struct {
}
func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, error) {
+ if len(dst) == 0 {
+ // arvados.collectionFileSystem uses a zero-length
+ // read to trigger pre-fetching a block into the cache
+ // before it's actually needed. If a pre-fetch
+ // request gets this far, it means there's no cache
+ // above us in the stack, so the pre-fetch signal is a
+ // no-op.
+ return 0, nil
+ }
rdr, _, _, _, err := kvh.getOrHead("GET", locator, nil)
if err != nil {
return 0, err
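The zero-length read handled above is only a signal; keepViaHTTP itself has no
cache, so it ignores it. Below is a minimal sketch of how a hypothetical block
cache wrapping such a backend might honor that signal. All names here
(blockCache, fetch, assumedMaxBlockSize) are invented for illustration; this is
not arvados code.

package main

import (
	"io"
	"sync"
)

// backend is the minimal interface this sketch needs from the layer
// below (e.g. something like keepViaHTTP).
type backend interface {
	ReadAt(locator string, dst []byte, offset int) (int, error)
}

const assumedMaxBlockSize = 64 << 20 // Keep blocks are at most 64 MiB

type blockCache struct {
	kc     backend
	mtx    sync.Mutex
	blocks map[string][]byte // whole cached blocks, keyed by locator
}

// fetch returns the whole block, reading it from the backend and
// caching it if needed. (A real cache would also deduplicate
// concurrent fetches of the same locator and bound its memory use.)
func (c *blockCache) fetch(locator string) ([]byte, error) {
	c.mtx.Lock()
	buf, ok := c.blocks[locator]
	c.mtx.Unlock()
	if ok {
		return buf, nil
	}
	buf = make([]byte, assumedMaxBlockSize)
	n, err := c.kc.ReadAt(locator, buf, 0)
	if n == 0 && err != nil {
		return nil, err
	}
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if c.blocks == nil {
		c.blocks = map[string][]byte{}
	}
	c.blocks[locator] = buf[:n]
	return buf[:n], nil
}

func (c *blockCache) ReadAt(locator string, dst []byte, offset int) (int, error) {
	if len(dst) == 0 {
		// Prefetch signal from collectionFS: warm the cache in
		// the background, never block the reading goroutine.
		go c.fetch(locator)
		return 0, nil
	}
	buf, err := c.fetch(locator)
	if err != nil {
		return 0, err
	}
	if offset >= len(buf) {
		return 0, io.EOF
	}
	n := copy(dst, buf[offset:])
	if n < len(dst) {
		return n, io.EOF
	}
	return n, nil
}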
-----------------------------------------------------------------------
hooks/post-receive
--