[arvados] updated: 2.7.2-5-ge5dad797b0

git repository hosting git at public.arvados.org
Wed May 22 18:18:12 UTC 2024


Summary of changes:
 sdk/go/arvados/collection.go           | 59 +++++++++++++++++------
 sdk/go/arvados/fs_collection.go        | 88 ++++++++++++++++++++--------------
 sdk/go/arvados/fs_collection_test.go   | 81 +++++++++++++++++++++++++------
 sdk/python/arvados/arvfile.py          | 62 +++++++++++++++++-------
 sdk/python/arvados/collection.py       |  3 +-
 services/fuse/arvados_fuse/fusefile.py |  2 +-
 6 files changed, 210 insertions(+), 85 deletions(-)

       via  e5dad797b0dc62ca2af798826d7a2fbb8b7a5ce7 (commit)
       via  78c2cfac3a5de71aa5f83ae815f462990454214c (commit)
      from  1c61a775680970a5ee8b5e8080fa96120113b7cf (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit e5dad797b0dc62ca2af798826d7a2fbb8b7a5ce7
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Wed May 22 14:15:46 2024 -0400

    Merge branch '21718-memoryview-readfrom-v2'  refs #21718
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e0e972b5c1..33246e3fdd 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1048,18 +1048,27 @@ class ArvadosFile(object):
             # size == self.size()
             pass
 
-    def readfrom(self, offset, size, num_retries, exact=False):
+    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
         """Read up to `size` bytes from the file starting at `offset`.
 
-        :exact:
-         If False (default), return less data than requested if the read
-         crosses a block boundary and the next block isn't cached.  If True,
-         only return less data than requested when hitting EOF.
+        Arguments:
+
+        * exact: bool --- If False (default), return less data than
+         requested if the read crosses a block boundary and the next
+         block isn't cached.  If True, only return less data than
+         requested when hitting EOF.
+
+        * return_memoryview: bool --- If False (default) return a
+          `bytes` object, which may entail making a copy in some
+          situations.  If True, return a `memoryview` object which may
+          avoid making a copy, but may be incompatible with code
+          expecting a `bytes` object.
+
         """
 
         with self.lock:
             if size == 0 or offset >= self.size():
-                return b''
+                return memoryview(b'') if return_memoryview else b''
             readsegs = locators_and_ranges(self._segments, offset, size)
 
             prefetch = None
@@ -1099,9 +1108,10 @@ class ArvadosFile(object):
                     locs.add(lr.locator)
 
         if len(data) == 1:
-            return data[0]
+            return data[0] if return_memoryview else data[0].tobytes()
         else:
-            return b''.join(data)
+            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
+
 
     @must_be_writable
     @synchronized
@@ -1266,33 +1276,49 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     @_FileLikeObjectBase._before_close
     @retry_method
-    def read(self, size=None, num_retries=None):
+    def read(self, size=-1, num_retries=None, return_memoryview=False):
         """Read up to `size` bytes from the file and return the result.
 
-        Starts at the current file position.  If `size` is None, read the
-        entire remainder of the file.
+        Starts at the current file position.  If `size` is negative or None,
+        read the entire remainder of the file.
+
+        Returns None if the file pointer is at the end of the file.
+
+        Returns a `bytes` object, unless `return_memoryview` is True,
+        in which case it returns a memory view, which may avoid an
+        unnecessary data copy in some situations.
+
         """
-        if size is None:
+        if size < 0 or size is None:
             data = []
-            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+            #
+            # specify exact=False, return_memoryview=True here so that we
+            # only copy data once into the final buffer.
+            #
+            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
             while rd:
                 data.append(rd)
                 self._filepos += len(rd)
-                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
-            return b''.join(data)
+                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
+            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
         else:
-            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
             self._filepos += len(data)
             return data
 
     @_FileLikeObjectBase._before_close
     @retry_method
-    def readfrom(self, offset, size, num_retries=None):
+    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
         """Read up to `size` bytes from the stream, starting at the specified file offset.
 
         This method does not change the file position.
+
+        Returns a `bytes` object, unless `return_memoryview` is True,
+        in which case it returns a memory view, which may avoid an
+        unnecessary data copy in some situations.
+
         """
-        return self.arvadosfile.readfrom(offset, size, num_retries)
+        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
 
     def flush(self):
         pass
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 9e6bd06071..cf8dbee1ae 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -341,7 +341,7 @@ class RichCollectionBase(CollectionBase):
             self,
             path: str,
             mode: str="r",
-            encoding: Optional[str]=None,
+            encoding: Optional[str]=None
     ) -> IO:
         """Open a file-like object within the collection
 
@@ -361,6 +361,7 @@ class RichCollectionBase(CollectionBase):
         * encoding: str | None --- The text encoding of the file. Only used
           when the file is opened in text mode. The default is
           platform-dependent.
+
         """
         if not re.search(r'^[rwa][bt]?\+?$', mode):
             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 9279f7d99d..deb210d405 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -63,7 +63,7 @@ class FuseArvadosFile(File):
 
     def readfrom(self, off, size, num_retries=0):
         with llfuse.lock_released:
-            return self.arvfile.readfrom(off, size, num_retries, exact=True)
+            return self.arvfile.readfrom(off, size, num_retries, exact=True, return_memoryview=True)
 
     def writeto(self, off, buf, num_retries=0):
         with llfuse.lock_released:

commit 78c2cfac3a5de71aa5f83ae815f462990454214c
Author: Tom Clegg <tom at curii.com>
Date:   Tue Apr 23 11:36:26 2024 -0400

    Merge branch '21696-slow-propfind'
    
    fixes #21696
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index 389fe4e484..1e9616c428 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -104,28 +104,57 @@ type CollectionList struct {
 	Limit          int          `json:"limit"`
 }
 
-var (
-	blkRe = regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
-	tokRe = regexp.MustCompile(` ?[^ ]*`)
-)
-
 // PortableDataHash computes the portable data hash of the given
 // manifest.
 func PortableDataHash(mt string) string {
+	// To calculate the PDH, we write the manifest to an md5 hash
+	// func, except we skip the "extra" part of block tokens that
+	// look like "abcdef0123456789abcdef0123456789+12345+extra".
+	//
+	// This code is simplified by the facts that (A) all block
+	// tokens -- even the first and last in a stream -- are
+	// preceded and followed by a space character; and (B) all
+	// non-block tokens either start with '.'  or contain ':'.
+	//
+	// A regexp-based approach (like the one this replaced) would
+	// be more readable, but very slow.
 	h := md5.New()
 	size := 0
-	_ = tokRe.ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
-		if m := blkRe.Find(tok); m != nil {
-			// write hash+size, ignore remaining block hints
-			tok = m
+	todo := []byte(mt)
+	for len(todo) > 0 {
+		// sp is the end of the current token (note that if
+		// the current token is the last file token in a
+		// stream, we'll also include the \n and the dirname
+		// token on the next line, which is perfectly fine for
+		// our purposes).
+		sp := bytes.IndexByte(todo, ' ')
+		if sp < 0 {
+			// Last token of the manifest, which is never
+			// a block token.
+			n, _ := h.Write(todo)
+			size += n
+			break
 		}
-		n, err := h.Write(tok)
-		if err != nil {
-			panic(err)
+		if sp >= 34 && todo[32] == '+' && bytes.IndexByte(todo[:32], ':') == -1 && todo[0] != '.' {
+			// todo[:sp] is a block token.
+			sizeend := bytes.IndexByte(todo[33:sp], '+')
+			if sizeend < 0 {
+				// "hash+size"
+				sizeend = sp
+			} else {
+				// "hash+size+extra"
+				sizeend += 33
+			}
+			n, _ := h.Write(todo[:sizeend])
+			h.Write([]byte{' '})
+			size += n + 1
+		} else {
+			// todo[:sp] is not a block token.
+			n, _ := h.Write(todo[:sp+1])
+			size += n
 		}
-		size += n
-		return nil
-	})
+		todo = todo[sp+1:]
+	}
 	return fmt.Sprintf("%x+%d", h.Sum(nil), size)
 }
 
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 052cc1aa37..101fade74b 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -1358,6 +1358,10 @@ func (dn *dirnode) loadManifest(txt string) error {
 	}
 	streams = streams[:len(streams)-1]
 	segments := []storedSegment{}
+	// streamoffset[n] is the position in the stream of the nth
+	// block, i.e., ∑ segments[j].size ∀ 0≤j<n. We ensure
+	// len(streamoffset) == len(segments) + 1.
+	streamoffset := []int64{0}
 	// To reduce allocs, we reuse a single "pathparts" slice
 	// (pre-split on "/" separators) for the duration of this
 	// func.
@@ -1385,10 +1389,11 @@ func (dn *dirnode) loadManifest(txt string) error {
 	}
 	for i, stream := range streams {
 		lineno := i + 1
+		fnodeCache := make(map[string]*filenode)
 		var anyFileTokens bool
-		var pos int64
 		var segIdx int
 		segments = segments[:0]
+		streamoffset = streamoffset[:1]
 		pathparts = nil
 		streamparts := 0
 		for i, token := range bytes.Split(stream, []byte{' '}) {
@@ -1408,6 +1413,7 @@ func (dn *dirnode) loadManifest(txt string) error {
 				if err != nil || length < 0 {
 					return fmt.Errorf("line %d: bad locator %q", lineno, token)
 				}
+				streamoffset = append(streamoffset, streamoffset[len(segments)]+int64(length))
 				segments = append(segments, storedSegment{
 					locator: string(token),
 					size:    int(length),
@@ -1431,49 +1437,64 @@ func (dn *dirnode) loadManifest(txt string) error {
 			if err != nil || length < 0 {
 				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
 			}
-			if !bytes.ContainsAny(toks[2], `\/`) {
-				// optimization for a common case
-				pathparts = append(pathparts[:streamparts], string(toks[2]))
-			} else {
-				pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+			fnode, cached := fnodeCache[string(toks[2])]
+			if !cached {
+				if !bytes.ContainsAny(toks[2], `\/`) {
+					// optimization for a common case
+					pathparts = append(pathparts[:streamparts], string(toks[2]))
+				} else {
+					pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+				}
+				fnode, err = dn.createFileAndParents(pathparts)
+				if err != nil {
+					return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
+				}
+				fnodeCache[string(toks[2])] = fnode
 			}
-			fnode, err := dn.createFileAndParents(pathparts)
-			if fnode == nil && err == nil && length == 0 {
+			if fnode == nil {
+				// name matches an existing directory
+				if length != 0 {
+					return fmt.Errorf("line %d: cannot use name %q with length %d: is a directory", lineno, toks[2], length)
+				}
 				// Special case: an empty file used as
 				// a marker to preserve an otherwise
 				// empty directory in a manifest.
 				continue
 			}
-			if err != nil || (fnode == nil && length != 0) {
-				return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
-			}
 			// Map the stream offset/range coordinates to
 			// block/offset/range coordinates and add
 			// corresponding storedSegments to the filenode
-			if pos > offset {
-				// Can't continue where we left off.
-				// TODO: binary search instead of
-				// rewinding all the way (but this
-				// situation might be rare anyway)
-				segIdx, pos = 0, 0
+			if segIdx < len(segments) && streamoffset[segIdx] <= offset && streamoffset[segIdx+1] > offset {
+				// common case with an easy
+				// optimization: start where the
+				// previous segment ended
+			} else if guess := int(offset >> 26); guess >= 0 && guess < len(segments) && streamoffset[guess] <= offset && streamoffset[guess+1] > offset {
+				// another common case with an easy
+				// optimization: all blocks are 64 MiB
+				// (or close enough)
+				segIdx = guess
+			} else {
+				// general case
+				segIdx = sort.Search(len(segments), func(i int) bool {
+					return streamoffset[i+1] > offset
+				})
 			}
 			for ; segIdx < len(segments); segIdx++ {
-				seg := segments[segIdx]
-				next := pos + int64(seg.Len())
-				if next <= offset || seg.Len() == 0 {
-					pos = next
-					continue
-				}
-				if pos >= offset+length {
+				blkStart := streamoffset[segIdx]
+				if blkStart >= offset+length {
 					break
 				}
+				seg := &segments[segIdx]
+				if seg.size == 0 {
+					continue
+				}
 				var blkOff int
-				if pos < offset {
-					blkOff = int(offset - pos)
+				if blkStart < offset {
+					blkOff = int(offset - blkStart)
 				}
-				blkLen := seg.Len() - blkOff
-				if pos+int64(blkOff+blkLen) > offset+length {
-					blkLen = int(offset + length - pos - int64(blkOff))
+				blkLen := seg.size - blkOff
+				if blkStart+int64(seg.size) > offset+length {
+					blkLen = int(offset + length - blkStart - int64(blkOff))
 				}
 				fnode.appendSegment(storedSegment{
 					kc:      dn.fs,
@@ -1482,14 +1503,9 @@ func (dn *dirnode) loadManifest(txt string) error {
 					offset:  blkOff,
 					length:  blkLen,
 				})
-				if next > offset+length {
-					break
-				} else {
-					pos = next
-				}
 			}
-			if segIdx == len(segments) && pos < offset+length {
-				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+			if segIdx == len(segments) && streamoffset[segIdx] < offset+length {
+				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, streamoffset[segIdx], token)
 			}
 		}
 		if !anyFileTokens {
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index a29371b76c..b57f9aa30f 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -1639,29 +1639,71 @@ type CollectionFSUnitSuite struct{}
 var _ = check.Suite(&CollectionFSUnitSuite{})
 
 // expect ~2 seconds to load a manifest with 256K files
-func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+func (s *CollectionFSUnitSuite) TestLargeManifest_ManyFiles(c *check.C) {
 	if testing.Short() {
 		c.Skip("slow")
 	}
+	s.testLargeManifest(c, 512, 512, 1, 0)
+}
 
-	const (
-		dirCount  = 512
-		fileCount = 512
-	)
+func (s *CollectionFSUnitSuite) TestLargeManifest_LargeFiles(c *check.C) {
+	if testing.Short() {
+		c.Skip("slow")
+	}
+	s.testLargeManifest(c, 1, 800, 1000, 0)
+}
 
+func (s *CollectionFSUnitSuite) TestLargeManifest_InterleavedFiles(c *check.C) {
+	if testing.Short() {
+		c.Skip("slow")
+	}
+	// Timing figures here are from a dev host, (0)->(1)->(2)->(3)
+	// (0) no optimizations (main branch commit ea697fb1e8)
+	// (1) resolve streampos->blkidx with binary search
+	// (2) ...and rewrite PortableDataHash() without regexp
+	// (3) ...and use fnodeCache in loadManifest
+	s.testLargeManifest(c, 1, 800, 100, 4<<20) // 127s    -> 12s  -> 2.5s -> 1.5s
+	s.testLargeManifest(c, 1, 50, 1000, 4<<20) // 44s     -> 10s  -> 1.5s -> 0.8s
+	s.testLargeManifest(c, 1, 200, 100, 4<<20) // 13s     -> 4s   -> 0.6s -> 0.3s
+	s.testLargeManifest(c, 1, 200, 150, 4<<20) // 26s     -> 4s   -> 1s   -> 0.5s
+	s.testLargeManifest(c, 1, 200, 200, 4<<20) // 38s     -> 6s   -> 1.3s -> 0.7s
+	s.testLargeManifest(c, 1, 200, 225, 4<<20) // 46s     -> 7s   -> 1.5s -> 1s
+	s.testLargeManifest(c, 1, 400, 400, 4<<20) // 477s    -> 24s  -> 5s   -> 3s
+	// s.testLargeManifest(c, 1, 800, 1000, 4<<20) // timeout -> 186s -> 28s  -> 17s
+}
+
+func (s *CollectionFSUnitSuite) testLargeManifest(c *check.C, dirCount, filesPerDir, blocksPerFile, interleaveChunk int) {
+	t0 := time.Now()
+	const blksize = 1 << 26
+	c.Logf("%s building manifest with dirCount=%d filesPerDir=%d blocksPerFile=%d", time.Now(), dirCount, filesPerDir, blocksPerFile)
 	mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+	blkid := 0
 	for i := 0; i < dirCount; i++ {
 		fmt.Fprintf(mb, "./dir%d", i)
-		for j := 0; j <= fileCount; j++ {
-			fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+		for j := 0; j < filesPerDir; j++ {
+			for k := 0; k < blocksPerFile; k++ {
+				blkid++
+				fmt.Fprintf(mb, " %032x+%d+A%040x@%08x", blkid, blksize, blkid, blkid)
+			}
 		}
-		for j := 0; j < fileCount; j++ {
-			fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+		for j := 0; j < filesPerDir; j++ {
+			if interleaveChunk == 0 {
+				fmt.Fprintf(mb, " %d:%d:dir%d/file%d", (filesPerDir-j-1)*blocksPerFile*blksize, blocksPerFile*blksize, j, j)
+				continue
+			}
+			for todo := int64(blocksPerFile) * int64(blksize); todo > 0; todo -= int64(interleaveChunk) {
+				size := int64(interleaveChunk)
+				if size > todo {
+					size = todo
+				}
+				offset := rand.Int63n(int64(blocksPerFile)*int64(blksize)*int64(filesPerDir) - size)
+				fmt.Fprintf(mb, " %d:%d:dir%d/file%d", offset, size, j, j)
+			}
 		}
 		mb.Write([]byte{'\n'})
 	}
 	coll := Collection{ManifestText: mb.String()}
-	c.Logf("%s built", time.Now())
+	c.Logf("%s built manifest size=%d", time.Now(), mb.Len())
 
 	var memstats runtime.MemStats
 	runtime.ReadMemStats(&memstats)
@@ -1670,17 +1712,28 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
 	f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
 	c.Check(err, check.IsNil)
 	c.Logf("%s loaded", time.Now())
-	c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
+	c.Check(f.Size(), check.Equals, int64(dirCount*filesPerDir*blocksPerFile*blksize))
 
+	// Stat() and OpenFile() each file. This mimics the behavior
+	// of webdav propfind, which opens each file even when just
+	// listing directory entries.
 	for i := 0; i < dirCount; i++ {
-		for j := 0; j < fileCount; j++ {
-			f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+		for j := 0; j < filesPerDir; j++ {
+			fnm := fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)
+			fi, err := f.Stat(fnm)
+			c.Assert(err, check.IsNil)
+			c.Check(fi.IsDir(), check.Equals, false)
+			f, err := f.OpenFile(fnm, os.O_RDONLY, 0)
+			c.Assert(err, check.IsNil)
+			f.Close()
 		}
 	}
-	c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+	c.Logf("%s OpenFile() x %d", time.Now(), dirCount*filesPerDir)
 
 	runtime.ReadMemStats(&memstats)
 	c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+	c.Logf("%s MemorySize=%d", time.Now(), f.MemorySize())
+	c.Logf("%s ... test duration %s", time.Now(), time.Now().Sub(t0))
 }
 
 // Gocheck boilerplate

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list