[arvados] updated: 2.7.0-6268-gbd04725444

git repository hosting git at public.arvados.org
Tue Apr 2 01:05:08 UTC 2024


Summary of changes:
 sdk/python/arvados/arvfile.py         |  19 ++++--
 sdk/python/arvados/diskcache.py       | 116 +++++++++++++++++-----------------
 sdk/python/arvados/keep.py            |  53 +++++++---------
 services/fuse/arvados_fuse/command.py |  12 ++--
 4 files changed, 97 insertions(+), 103 deletions(-)

       via  bd04725444b7ab15ea81647c519a580ca50a94de (commit)
       via  743b9941ce35d8877365742700c9b4c1eded0968 (commit)
       via  b91d06bf3ede4b9afa5a74070a4f8ca95d16f629 (commit)
      from  cc3f89292c0136ce5d9e56506f82ea743c59fff8 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit bd04725444b7ab15ea81647c519a580ca50a94de
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Apr 1 20:32:49 2024 -0400

    21639: Note about 0 prefetch threads
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 29ace2e52e..4c8ae899e1 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -482,12 +482,11 @@ class Mount(object):
                                                       disk_cache=self.args.disk_cache,
                                                       disk_cache_dir=self.args.disk_cache_dir)
 
-            # If there's too many prefetch threads and you
-            # max out the CPU, delivering data to the FUSE
-            # layer actually ends up being slower.
-            # Experimentally, capping 7 threads seems to
-            # be a sweet spot.
-            #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            # Profiling indicates that prefetching has more of a
+            # negative impact on the read() fast path (by requiring it
+            # to do more work and take additional locks) than benefit.
+            # Also, the kernel does some readahead itself, which has a
+            # similar effect.
             prefetch_threads = 0
 
             self.api = arvados.safeapi.ThreadSafeApiCache(

commit 743b9941ce35d8877365742700c9b4c1eded0968
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Apr 1 20:17:22 2024 -0400

    21639: Keep a running total instead of recomputing total
    
    From profiling, it turns out just summing the block sizes to get the
    cache usage was a significant drag on the read() fast path.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 4949574f2b..b0ae364c88 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -40,18 +40,18 @@ class DiskCacheSlot(object):
             if value is None:
                 self.content = None
                 self.ready.set()
-                return
+                return False
 
             if len(value) == 0:
                 # Can't mmap a 0 length file
                 self.content = b''
                 self.ready.set()
-                return
+                return True
 
             if self.content is not None:
                 # Has been set already
                 self.ready.set()
-                return
+                return False
 
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             os.makedirs(blockdir, mode=0o700, exist_ok=True)
@@ -74,6 +74,7 @@ class DiskCacheSlot(object):
             self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
             # only set the event when mmap is successful
             self.ready.set()
+            return True
         finally:
             if tmpfile is not None:
                 # If the tempfile hasn't been renamed on disk yet, try to delete it.
@@ -81,6 +82,7 @@ class DiskCacheSlot(object):
                     os.remove(tmpfile)
                 except:
                     pass
+        return False
 
     def size(self):
         if self.content is None:
@@ -96,65 +98,61 @@ class DiskCacheSlot(object):
             return len(self.content)
 
     def evict(self):
-        if self.content is not None and len(self.content) > 0:
-            # The mmap region might be in use when we decided to evict
-            # it.  This can happen if the cache is too small.
-            #
-            # If we call close() now, it'll throw an error if
-            # something tries to access it.
-            #
-            # However, we don't need to explicitly call mmap.close()
-            #
-            # I confirmed in mmapmodule.c that that both close
-            # and deallocate do the same thing:
+        if self.content is None or len(self.content) == 0:
+            return
+
+        # The mmap region might be in use when we decided to evict
+        # it.  This can happen if the cache is too small.
+        #
+        # If we call close() now, it'll throw an error if
+        # something tries to access it.
+        #
+        # However, we don't need to explicitly call mmap.close()
+        #
+        # I confirmed in mmapmodule.c that that both close
+        # and deallocate do the same thing:
+        #
+        # a) close the file descriptor
+        # b) unmap the memory range
+        #
+        # So we can forget it in the cache and delete the file on
+        # disk, and it will tear it down after any other
+        # lingering Python references to the mapped memory are
+        # gone.
+
+        blockdir = os.path.join(self.cachedir, self.locator[0:3])
+        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+        try:
+            fcntl.flock(self.filehandle, fcntl.LOCK_UN)
+
+            # try to get an exclusive lock, this ensures other
+            # processes are not using the block.  It is
+            # nonblocking and will throw an exception if we
+            # can't get it, which is fine because that means
+            # we just won't try to delete it.
             #
-            # a) close the file descriptor
-            # b) unmap the memory range
+            # I should note here, the file locking is not
+            # strictly necessary, we could just remove it and
+            # the kernel would ensure that the underlying
+            # inode remains available as long as other
+            # processes still have the file open.  However, if
+            # you have multiple processes sharing the cache
+            # and deleting each other's files, you'll end up
+            # with a bunch of ghost files that don't show up
+            # in the file system but are still taking up
+            # space, which isn't particularly user friendly.
+            # The locking strategy ensures that cache blocks
+            # in use remain visible.
             #
-            # So we can forget it in the cache and delete the file on
-            # disk, and it will tear it down after any other
-            # lingering Python references to the mapped memory are
-            # gone.
+            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
 
-            blockdir = os.path.join(self.cachedir, self.locator[0:3])
-            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
-            try:
-                fcntl.flock(self.filehandle, fcntl.LOCK_UN)
-
-                # try to get an exclusive lock, this ensures other
-                # processes are not using the block.  It is
-                # nonblocking and will throw an exception if we
-                # can't get it, which is fine because that means
-                # we just won't try to delete it.
-                #
-                # I should note here, the file locking is not
-                # strictly necessary, we could just remove it and
-                # the kernel would ensure that the underlying
-                # inode remains available as long as other
-                # processes still have the file open.  However, if
-                # you have multiple processes sharing the cache
-                # and deleting each other's files, you'll end up
-                # with a bunch of ghost files that don't show up
-                # in the file system but are still taking up
-                # space, which isn't particularly user friendly.
-                # The locking strategy ensures that cache blocks
-                # in use remain visible.
-                #
-                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
-                os.remove(final)
-                return True
-            except OSError:
-                pass
-            finally:
-                self.filehandle = None
-                self.linger = weakref.ref(self.content)
-                self.content = None
-        return False
-
-    def gone(self):
-        # Test if an evicted object is lingering
-        return self.content is None and (self.linger is None or self.linger() is None)
+            os.remove(final)
+            return True
+        except OSError:
+            pass
+        finally:
+            self.filehandle = None
+            self.content = None
 
     @staticmethod
     def get_from_disk(locator, cachedir):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6b34a1f933..94f8abc73f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -233,11 +233,13 @@ class KeepBlockCache(object):
 
         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 
+        self.cache_total = 0
         if self._disk_cache:
             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            for slot in self._cache.values():
+                self.cache_total += slot.size()
             self.cap_cache()
 
-
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
@@ -251,8 +253,11 @@ class KeepBlockCache(object):
             return self.content
 
         def set(self, value):
+            if self.content is not None:
+                return False
             self.content = value
             self.ready.set()
+            return True
 
         def size(self):
             if self.content is None:
@@ -262,51 +267,25 @@ class KeepBlockCache(object):
 
         def evict(self):
             self.content = None
-            return self.gone()
 
-        def gone(self):
-            return (self.content is None)
 
     def _resize_cache(self, cache_max, max_slots):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
-        sm = 0
-        for slot in self._cache.values():
-            sm += slot.size()
-
-        if sm <= cache_max and len(self._cache) <= max_slots:
+        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
             return
 
         _evict_candidates = collections.deque(self._cache.values())
-        while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+        while len(_evict_candidates) > 0 and (self.cache_total > cache_max or len(self._cache) > max_slots):
             slot = _evict_candidates.popleft()
             if not slot.ready.is_set():
                 continue
 
-            if slot.content is None:
-                # error
-                del self._cache[slot.locator]
-                continue
-
             sz = slot.size()
-
-            # If evict returns false it means the
-            # underlying disk cache couldn't lock the file
-            # for deletion because another process was using
-            # it. Don't count it as reducing the amount
-            # of data in the cache, find something else to
-            # throw out.
-            if slot.evict():
-                sm -= sz
-
-            # check to make sure the underlying data is gone
-            if slot.gone():
-                # either way we forget about it.  either the
-                # other process will delete it, or if we need
-                # it again and it is still there, we'll find
-                # it on disk.
-                del self._cache[slot.locator]
+            slot.evict()
+            self.cache_total -= sz
+            del self._cache[slot.locator]
 
 
     def cap_cache(self):
@@ -329,6 +308,7 @@ class KeepBlockCache(object):
             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
             if n is not None:
                 self._cache[n.locator] = n
+                self.cache_total += n.size()
                 return n
         return None
 
@@ -363,7 +343,8 @@ class KeepBlockCache(object):
 
     def set(self, slot, blob):
         try:
-            slot.set(blob)
+            if slot.set(blob):
+                self.cache_total += slot.size()
             return
         except OSError as e:
             if e.errno == errno.ENOMEM:
@@ -391,7 +372,8 @@ class KeepBlockCache(object):
             # exception handler adjusts limits downward in some cases
             # to free up resources, which would make the operation
             # succeed.
-            slot.set(blob)
+            if slot.set(blob):
+                self.cache_total += slot.size()
         except Exception as e:
             # It failed again.  Give up.
             slot.set(None)

commit b91d06bf3ede4b9afa5a74070a4f8ca95d16f629
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Apr 1 15:58:06 2024 -0400

    21639: Improve critical path of read() from cache
    
    * Don't use tobytes(), it makes a copy, and it should be zero-copy.
    * Prefetching adds a lot of overhead.  Don't do it.
    * Don't use a list comprehension to calculate cache size
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4b95835aac..0cc7d25a33 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1060,7 +1060,8 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+            if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+                prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []
@@ -1068,17 +1069,21 @@ class ArvadosFile(object):
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
-                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                 locs.add(lr.locator)
             else:
                 break
 
-        for lr in prefetch:
-            if lr.locator not in locs:
-                self.parent._my_block_manager().block_prefetch(lr.locator)
-                locs.add(lr.locator)
+        if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+            for lr in prefetch:
+                if lr.locator not in locs:
+                    self.parent._my_block_manager().block_prefetch(lr.locator)
+                    locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1d0fc5f159..6b34a1f933 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -271,8 +271,14 @@ class KeepBlockCache(object):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
+        sm = 0
+        for slot in self._cache.values():
+            sm += slot.size()
+
+        if sm <= cache_max and len(self._cache) <= max_slots:
+            return
+
         _evict_candidates = collections.deque(self._cache.values())
-        sm = sum([slot.size() for slot in _evict_candidates])
         while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
             slot = _evict_candidates.popleft()
             if not slot.ready.is_set():
@@ -926,7 +932,10 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
-        self.num_prefetch_threads = num_prefetch_threads or 2
+        if num_prefetch_threads is not None:
+            self.num_prefetch_threads = num_prefetch_threads
+        else:
+            self.num_prefetch_threads = 2
         self._prefetch_queue = None
         self._prefetch_threads = None
 
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 719ec7ee95..29ace2e52e 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -487,7 +487,8 @@ class Mount(object):
             # layer actually ends up being slower.
             # Experimentally, capping 7 threads seems to
             # be a sweet spot.
-            prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            prefetch_threads = 0
 
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list