[arvados] updated: 2.7.2-5-ge5dad797b0
git repository hosting git at public.arvados.org
Wed May 22 18:18:12 UTC 2024
Summary of changes:
 sdk/go/arvados/collection.go           | 59 +++++++++++++++++------
 sdk/go/arvados/fs_collection.go        | 88 ++++++++++++++++++++--------------
 sdk/go/arvados/fs_collection_test.go   | 81 +++++++++++++++++++++++++------
 sdk/python/arvados/arvfile.py          | 62 +++++++++++++++++-------
 sdk/python/arvados/collection.py       |  3 +-
 services/fuse/arvados_fuse/fusefile.py |  2 +-
 6 files changed, 210 insertions(+), 85 deletions(-)
via e5dad797b0dc62ca2af798826d7a2fbb8b7a5ce7 (commit)
via 78c2cfac3a5de71aa5f83ae815f462990454214c (commit)
from 1c61a775680970a5ee8b5e8080fa96120113b7cf (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e5dad797b0dc62ca2af798826d7a2fbb8b7a5ce7
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed May 22 14:15:46 2024 -0400
Merge branch '21718-memoryview-readfrom-v2' refs #21718
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e0e972b5c1..33246e3fdd 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1048,18 +1048,27 @@ class ArvadosFile(object):
# size == self.size()
pass
- def readfrom(self, offset, size, num_retries, exact=False):
+ def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
"""Read up to `size` bytes from the file starting at `offset`.
- :exact:
- If False (default), return less data than requested if the read
- crosses a block boundary and the next block isn't cached. If True,
- only return less data than requested when hitting EOF.
+ Arguments:
+
+ * exact: bool --- If False (default), return less data than
+ requested if the read crosses a block boundary and the next
+ block isn't cached. If True, only return less data than
+ requested when hitting EOF.
+
+ * return_memoryview: bool --- If False (default) return a
+ `bytes` object, which may entail making a copy in some
+ situations. If True, return a `memoryview` object which may
+ avoid making a copy, but may be incompatible with code
+ expecting a `bytes` object.
+
"""
with self.lock:
if size == 0 or offset >= self.size():
- return b''
+ return memoryview(b'') if return_memoryview else b''
readsegs = locators_and_ranges(self._segments, offset, size)
prefetch = None
@@ -1099,9 +1108,10 @@ class ArvadosFile(object):
locs.add(lr.locator)
if len(data) == 1:
- return data[0]
+ return data[0] if return_memoryview else data[0].tobytes()
else:
- return b''.join(data)
+ return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
+
@must_be_writable
@synchronized
@@ -1266,33 +1276,49 @@ class ArvadosFileReader(ArvadosFileReaderBase):
@_FileLikeObjectBase._before_close
@retry_method
- def read(self, size=None, num_retries=None):
+ def read(self, size=-1, num_retries=None, return_memoryview=False):
"""Read up to `size` bytes from the file and return the result.
- Starts at the current file position. If `size` is None, read the
- entire remainder of the file.
+ Starts at the current file position. If `size` is negative or None,
+ read the entire remainder of the file.
+
+ Returns None if the file pointer is at the end of the file.
+
+ Returns a `bytes` object, unless `return_memoryview` is True,
+ in which case it returns a memory view, which may avoid an
+ unnecessary data copy in some situations.
+
"""
- if size is None:
+ if size is None or size < 0:
data = []
- rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+ #
+ # specify exact=False, return_memoryview=True here so that we
+ # only copy data once into the final buffer.
+ #
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
while rd:
data.append(rd)
self._filepos += len(rd)
- rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
- return b''.join(data)
+ rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
+ return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
else:
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
self._filepos += len(data)
return data
@_FileLikeObjectBase._before_close
@retry_method
- def readfrom(self, offset, size, num_retries=None):
+ def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
"""Read up to `size` bytes from the stream, starting at the specified file offset.
This method does not change the file position.
+
+ Returns a `bytes` object, unless `return_memoryview` is True,
+ in which case it returns a memory view, which may avoid an
+ unnecessary data copy in some situations.
+
"""
- return self.arvadosfile.readfrom(offset, size, num_retries)
+ return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
def flush(self):
pass
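
For illustration, a minimal sketch (not part of the commit) of how a caller
might use the new flag. The file name and manifest variable are hypothetical;
the read()/readfrom() signatures are the ones shown in the diff above, and
Collection.open() in binary mode returns a reader with those methods:

    import arvados.collection

    coll = arvados.collection.Collection(manifest_text)  # hypothetical input
    with coll.open('somefile.bin', 'rb') as f:           # hypothetical file
        # Default behavior: a bytes object, which may involve an extra copy.
        chunk = f.readfrom(0, 65536)
        assert isinstance(chunk, bytes)
        # With return_memoryview=True, a zero-copy view may come back;
        # call .tobytes() if downstream code needs real bytes.
        view = f.readfrom(0, 65536, return_memoryview=True)
        assert isinstance(view, memoryview)
        data = view.tobytes()
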
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 9e6bd06071..cf8dbee1ae 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -341,7 +341,7 @@ class RichCollectionBase(CollectionBase):
self,
path: str,
mode: str="r",
- encoding: Optional[str]=None,
+ encoding: Optional[str]=None
) -> IO:
"""Open a file-like object within the collection
@@ -361,6 +361,7 @@ class RichCollectionBase(CollectionBase):
* encoding: str | None --- The text encoding of the file. Only used
when the file is opened in text mode. The default is
platform-dependent.
+
"""
if not re.search(r'^[rwa][bt]?\+?$', mode):
raise errors.ArgumentError("Invalid mode {!r}".format(mode))
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 9279f7d99d..deb210d405 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -63,7 +63,7 @@ class FuseArvadosFile(File):
def readfrom(self, off, size, num_retries=0):
with llfuse.lock_released:
- return self.arvfile.readfrom(off, size, num_retries, exact=True)
+ return self.arvfile.readfrom(off, size, num_retries, exact=True, return_memoryview=True)
def writeto(self, off, buf, num_retries=0):
with llfuse.lock_released:
commit 78c2cfac3a5de71aa5f83ae815f462990454214c
Author: Tom Clegg <tom at curii.com>
Date: Tue Apr 23 11:36:26 2024 -0400
Merge branch '21696-slow-propfind'
fixes #21696
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index 389fe4e484..1e9616c428 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -104,28 +104,57 @@ type CollectionList struct {
Limit int `json:"limit"`
}
-var (
- blkRe = regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
- tokRe = regexp.MustCompile(` ?[^ ]*`)
-)
-
// PortableDataHash computes the portable data hash of the given
// manifest.
func PortableDataHash(mt string) string {
+ // To calculate the PDH, we write the manifest to an md5 hash
+ // func, except we skip the "extra" part of block tokens that
+ // look like "abcdef0123456789abcdef0123456789+12345+extra".
+ //
+ // This code is simplified by the facts that (A) all block
+ // tokens -- even the first and last in a stream -- are
+ // preceded and followed by a space character; and (B) all
+ // non-block tokens either start with '.' or contain ':'.
+ //
+ // A regexp-based approach (like the one this replaced) would
+ // be more readable, but very slow.
h := md5.New()
size := 0
- _ = tokRe.ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
- if m := blkRe.Find(tok); m != nil {
- // write hash+size, ignore remaining block hints
- tok = m
+ todo := []byte(mt)
+ for len(todo) > 0 {
+ // sp is the end of the current token (note that if
+ // the current token is the last file token in a
+ // stream, we'll also include the \n and the dirname
+ // token on the next line, which is perfectly fine for
+ // our purposes).
+ sp := bytes.IndexByte(todo, ' ')
+ if sp < 0 {
+ // Last token of the manifest, which is never
+ // a block token.
+ n, _ := h.Write(todo)
+ size += n
+ break
}
- n, err := h.Write(tok)
- if err != nil {
- panic(err)
+ if sp >= 34 && todo[32] == '+' && bytes.IndexByte(todo[:32], ':') == -1 && todo[0] != '.' {
+ // todo[:sp] is a block token.
+ sizeend := bytes.IndexByte(todo[33:sp], '+')
+ if sizeend < 0 {
+ // "hash+size"
+ sizeend = sp
+ } else {
+ // "hash+size+extra"
+ sizeend += 33
+ }
+ n, _ := h.Write(todo[:sizeend])
+ h.Write([]byte{' '})
+ size += n + 1
+ } else {
+ // todo[:sp] is not a block token.
+ n, _ := h.Write(todo[:sp+1])
+ size += n
}
- size += n
- return nil
- })
+ todo = todo[sp+1:]
+ }
return fmt.Sprintf("%x+%d", h.Sum(nil), size)
}
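
The comment above describes the PDH rule: hash the manifest text with each
block token truncated to "hash+size", then append the number of bytes hashed.
A minimal Python sketch of that rule, mirroring the slower regexp-based
approach the commit replaces rather than the new Go code (an illustration,
not the SDK's implementation):

    import hashlib
    import re

    def portable_data_hash(manifest_text):
        # Block tokens are always preceded by a space and look like
        # "hash+size" optionally followed by "+hint" sections; strip
        # everything after "hash+size" before hashing.
        stripped = re.sub(r'( [0-9a-f]{32}\+\d+)\+\S+', r'\1',
                          manifest_text).encode()
        return '%s+%d' % (hashlib.md5(stripped).hexdigest(), len(stripped))

    # Example: one stream, one empty file (hint abbreviated for brevity).
    mt = ". d41d8cd98f00b204e9800998ecf8427e+0+A0@0 0:0:file.txt\n"
    print(portable_data_hash(mt))
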
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 052cc1aa37..101fade74b 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -1358,6 +1358,10 @@ func (dn *dirnode) loadManifest(txt string) error {
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // streamoffset[n] is the position in the stream of the nth
+ // block, i.e., ∑ segments[j].size ∀ 0≤j<n. We ensure
+ // len(streamoffset) == len(segments) + 1.
+ streamoffset := []int64{0}
// To reduce allocs, we reuse a single "pathparts" slice
// (pre-split on "/" separators) for the duration of this
// func.
@@ -1385,10 +1389,11 @@ func (dn *dirnode) loadManifest(txt string) error {
}
for i, stream := range streams {
lineno := i + 1
+ fnodeCache := make(map[string]*filenode)
var anyFileTokens bool
- var pos int64
var segIdx int
segments = segments[:0]
+ streamoffset = streamoffset[:1]
pathparts = nil
streamparts := 0
for i, token := range bytes.Split(stream, []byte{' '}) {
@@ -1408,6 +1413,7 @@ func (dn *dirnode) loadManifest(txt string) error {
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
+ streamoffset = append(streamoffset, streamoffset[len(segments)]+int64(length))
segments = append(segments, storedSegment{
locator: string(token),
size: int(length),
@@ -1431,49 +1437,64 @@ func (dn *dirnode) loadManifest(txt string) error {
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- if !bytes.ContainsAny(toks[2], `\/`) {
- // optimization for a common case
- pathparts = append(pathparts[:streamparts], string(toks[2]))
- } else {
- pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ fnode, cached := fnodeCache[string(toks[2])]
+ if !cached {
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for a common case
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err = dn.createFileAndParents(pathparts)
+ if err != nil {
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
+ }
+ fnodeCache[string(toks[2])] = fnode
}
- fnode, err := dn.createFileAndParents(pathparts)
- if fnode == nil && err == nil && length == 0 {
+ if fnode == nil {
+ // name matches an existing directory
+ if length != 0 {
+ return fmt.Errorf("line %d: cannot use name %q with length %d: is a directory", lineno, toks[2], length)
+ }
// Special case: an empty file used as
// a marker to preserve an otherwise
// empty directory in a manifest.
continue
}
- if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
- }
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
// corresponding storedSegments to the filenode
- if pos > offset {
- // Can't continue where we left off.
- // TODO: binary search instead of
- // rewinding all the way (but this
- // situation might be rare anyway)
- segIdx, pos = 0, 0
+ if segIdx < len(segments) && streamoffset[segIdx] <= offset && streamoffset[segIdx+1] > offset {
+ // common case with an easy
+ // optimization: start where the
+ // previous segment ended
+ } else if guess := int(offset >> 26); guess >= 0 && guess < len(segments) && streamoffset[guess] <= offset && streamoffset[guess+1] > offset {
+ // another common case with an easy
+ // optimization: all blocks are 64 MiB
+ // (or close enough)
+ segIdx = guess
+ } else {
+ // general case
+ segIdx = sort.Search(len(segments), func(i int) bool {
+ return streamoffset[i+1] > offset
+ })
}
for ; segIdx < len(segments); segIdx++ {
- seg := segments[segIdx]
- next := pos + int64(seg.Len())
- if next <= offset || seg.Len() == 0 {
- pos = next
- continue
- }
- if pos >= offset+length {
+ blkStart := streamoffset[segIdx]
+ if blkStart >= offset+length {
break
}
+ seg := &segments[segIdx]
+ if seg.size == 0 {
+ continue
+ }
var blkOff int
- if pos < offset {
- blkOff = int(offset - pos)
+ if blkStart < offset {
+ blkOff = int(offset - blkStart)
}
- blkLen := seg.Len() - blkOff
- if pos+int64(blkOff+blkLen) > offset+length {
- blkLen = int(offset + length - pos - int64(blkOff))
+ blkLen := seg.size - blkOff
+ if blkStart+int64(seg.size) > offset+length {
+ blkLen = int(offset + length - blkStart - int64(blkOff))
}
fnode.appendSegment(storedSegment{
kc: dn.fs,
@@ -1482,14 +1503,9 @@ func (dn *dirnode) loadManifest(txt string) error {
offset: blkOff,
length: blkLen,
})
- if next > offset+length {
- break
- } else {
- pos = next
- }
}
- if segIdx == len(segments) && pos < offset+length {
- return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+ if segIdx == len(segments) && streamoffset[segIdx] < offset+length {
+ return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, streamoffset[segIdx], token)
}
}
if !anyFileTokens {
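
The loadManifest change above replaces the old "rewind and scan" loop with a
prefix-sum table (streamoffset) and a three-step lookup: reuse the index where
the previous file segment ended, then guess assuming 64 MiB blocks
(offset >> 26), then fall back to binary search. A minimal Python sketch of
the same lookup strategy (an illustration, not the SDK's code):

    import bisect

    def stream_offsets(block_sizes):
        # offsets[n] is the stream position of block n, so
        # len(offsets) == len(block_sizes) + 1, as in the Go code.
        offsets = [0]
        for size in block_sizes:
            offsets.append(offsets[-1] + size)
        return offsets

    def find_block(offsets, offset, prev_idx=0):
        n = len(offsets) - 1
        # (1) common case: continue where the previous segment ended
        if prev_idx < n and offsets[prev_idx] <= offset < offsets[prev_idx + 1]:
            return prev_idx
        # (2) common case: all blocks are 64 MiB, so index directly
        guess = offset >> 26
        if guess < n and offsets[guess] <= offset < offsets[guess + 1]:
            return guess
        # (3) general case: binary search over the prefix sums
        return bisect.bisect_right(offsets, offset) - 1

    offsets = stream_offsets([1 << 26, 1024, 1 << 26])
    assert find_block(offsets, (1 << 26) + 512) == 1
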
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index a29371b76c..b57f9aa30f 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1639,29 +1639,71 @@ type CollectionFSUnitSuite struct{}
var _ = check.Suite(&CollectionFSUnitSuite{})
// expect ~2 seconds to load a manifest with 256K files
-func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+func (s *CollectionFSUnitSuite) TestLargeManifest_ManyFiles(c *check.C) {
if testing.Short() {
c.Skip("slow")
}
+ s.testLargeManifest(c, 512, 512, 1, 0)
+}
- const (
- dirCount = 512
- fileCount = 512
- )
+func (s *CollectionFSUnitSuite) TestLargeManifest_LargeFiles(c *check.C) {
+ if testing.Short() {
+ c.Skip("slow")
+ }
+ s.testLargeManifest(c, 1, 800, 1000, 0)
+}
+func (s *CollectionFSUnitSuite) TestLargeManifest_InterleavedFiles(c *check.C) {
+ if testing.Short() {
+ c.Skip("slow")
+ }
+ // Timing figures here are from a dev host, (0)->(1)->(2)->(3)
+ // (0) no optimizations (main branch commit ea697fb1e8)
+ // (1) resolve streampos->blkidx with binary search
+ // (2) ...and rewrite PortableDataHash() without regexp
+ // (3) ...and use fnodeCache in loadManifest
+ s.testLargeManifest(c, 1, 800, 100, 4<<20) // 127s -> 12s -> 2.5s -> 1.5s
+ s.testLargeManifest(c, 1, 50, 1000, 4<<20) // 44s -> 10s -> 1.5s -> 0.8s
+ s.testLargeManifest(c, 1, 200, 100, 4<<20) // 13s -> 4s -> 0.6s -> 0.3s
+ s.testLargeManifest(c, 1, 200, 150, 4<<20) // 26s -> 4s -> 1s -> 0.5s
+ s.testLargeManifest(c, 1, 200, 200, 4<<20) // 38s -> 6s -> 1.3s -> 0.7s
+ s.testLargeManifest(c, 1, 200, 225, 4<<20) // 46s -> 7s -> 1.5s -> 1s
+ s.testLargeManifest(c, 1, 400, 400, 4<<20) // 477s -> 24s -> 5s -> 3s
+ // s.testLargeManifest(c, 1, 800, 1000, 4<<20) // timeout -> 186s -> 28s -> 17s
+}
+
+func (s *CollectionFSUnitSuite) testLargeManifest(c *check.C, dirCount, filesPerDir, blocksPerFile, interleaveChunk int) {
+ t0 := time.Now()
+ const blksize = 1 << 26
+ c.Logf("%s building manifest with dirCount=%d filesPerDir=%d blocksPerFile=%d", time.Now(), dirCount, filesPerDir, blocksPerFile)
mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+ blkid := 0
for i := 0; i < dirCount; i++ {
fmt.Fprintf(mb, "./dir%d", i)
- for j := 0; j <= fileCount; j++ {
- fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+ for j := 0; j < filesPerDir; j++ {
+ for k := 0; k < blocksPerFile; k++ {
+ blkid++
+ fmt.Fprintf(mb, " %032x+%d+A%040x@%08x", blkid, blksize, blkid, blkid)
+ }
}
- for j := 0; j < fileCount; j++ {
- fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+ for j := 0; j < filesPerDir; j++ {
+ if interleaveChunk == 0 {
+ fmt.Fprintf(mb, " %d:%d:dir%d/file%d", (filesPerDir-j-1)*blocksPerFile*blksize, blocksPerFile*blksize, j, j)
+ continue
+ }
+ for todo := int64(blocksPerFile) * int64(blksize); todo > 0; todo -= int64(interleaveChunk) {
+ size := int64(interleaveChunk)
+ if size > todo {
+ size = todo
+ }
+ offset := rand.Int63n(int64(blocksPerFile)*int64(blksize)*int64(filesPerDir) - size)
+ fmt.Fprintf(mb, " %d:%d:dir%d/file%d", offset, size, j, j)
+ }
}
mb.Write([]byte{'\n'})
}
coll := Collection{ManifestText: mb.String()}
- c.Logf("%s built", time.Now())
+ c.Logf("%s built manifest size=%d", time.Now(), mb.Len())
var memstats runtime.MemStats
runtime.ReadMemStats(&memstats)
@@ -1670,17 +1712,28 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
c.Check(err, check.IsNil)
c.Logf("%s loaded", time.Now())
- c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
+ c.Check(f.Size(), check.Equals, int64(dirCount*filesPerDir*blocksPerFile*blksize))
+ // Stat() and OpenFile() each file. This mimics the behavior
+ // of webdav propfind, which opens each file even when just
+ // listing directory entries.
for i := 0; i < dirCount; i++ {
- for j := 0; j < fileCount; j++ {
- f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+ for j := 0; j < filesPerDir; j++ {
+ fnm := fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)
+ fi, err := f.Stat(fnm)
+ c.Assert(err, check.IsNil)
+ c.Check(fi.IsDir(), check.Equals, false)
+ f, err := f.OpenFile(fnm, os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f.Close()
}
}
- c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+ c.Logf("%s OpenFile() x %d", time.Now(), dirCount*filesPerDir)
runtime.ReadMemStats(&memstats)
c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+ c.Logf("%s MemorySize=%d", time.Now(), f.MemorySize())
+ c.Logf("%s ... test duration %s", time.Now(), time.Now().Sub(t0))
}
// Gocheck boilerplate
-----------------------------------------------------------------------
hooks/post-receive
--