[arvados] updated: 2.7.0-6268-gbd04725444
git repository hosting git at public.arvados.org
Tue Apr 2 01:05:08 UTC 2024
Summary of changes:
sdk/python/arvados/arvfile.py         |  19 ++++--
sdk/python/arvados/diskcache.py       | 116 +++++++++++++++++-----------------
sdk/python/arvados/keep.py            |  53 +++++++---------
services/fuse/arvados_fuse/command.py |  12 ++--
4 files changed, 97 insertions(+), 103 deletions(-)
via bd04725444b7ab15ea81647c519a580ca50a94de (commit)
via 743b9941ce35d8877365742700c9b4c1eded0968 (commit)
via b91d06bf3ede4b9afa5a74070a4f8ca95d16f629 (commit)
from cc3f89292c0136ce5d9e56506f82ea743c59fff8 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit bd04725444b7ab15ea81647c519a580ca50a94de
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Apr 1 20:32:49 2024 -0400
21639: Note about 0 prefetch threads
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
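For callers outside FUSE, the prefetch thread count is still a KeepClient parameter (see the keep.py hunk in the third commit below), so prefetching can be re-enabled explicitly when a workload benefits from it. A minimal usage sketch, assuming the num_prefetch_threads keyword shown in that hunk and standard SDK construction (not taken from the commit itself):

    import arvados
    import arvados.keep

    # Hypothetical usage sketch: the FUSE mount now passes 0, but an SDK
    # caller can still opt into prefetch worker threads explicitly.
    # num_prefetch_threads=0 disables the prefetch queue entirely.
    api_client = arvados.api('v1')
    keep_client = arvados.keep.KeepClient(api_client=api_client,
                                          num_prefetch_threads=2)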
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 29ace2e52e..4c8ae899e1 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -482,12 +482,11 @@ class Mount(object):
disk_cache=self.args.disk_cache,
disk_cache_dir=self.args.disk_cache_dir)
- # If there's too many prefetch threads and you
- # max out the CPU, delivering data to the FUSE
- # layer actually ends up being slower.
- # Experimentally, capping 7 threads seems to
- # be a sweet spot.
- #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+ # Profiling indicates that prefetching has more of a
+ # negative impact on the read() fast path (by requiring it
+ # to do more work and take additional locks) than benefit.
+ # Also, the kernel does some readahead itself, which has a
+ # similar effect.
prefetch_threads = 0
self.api = arvados.safeapi.ThreadSafeApiCache(
commit 743b9941ce35d8877365742700c9b4c1eded0968
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Apr 1 20:17:22 2024 -0400
21639: Keep a running total instead of recomputing total
From profiling, it turns out that just summing the block sizes to get
the cache usage was a significant drag on the read() fast path.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
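The pattern this commit adopts is easy to show in isolation: treat the aggregate cache size as state that is adjusted on every insert and eviction instead of re-summing all slots on the hot path. A simplified, standalone sketch (illustrative names, not the actual KeepBlockCache):

    import collections

    class RunningTotalCache:
        # Sketch of running-total bookkeeping: cache_total is updated
        # incrementally rather than recomputed with a pass over all slots.
        def __init__(self, cache_max):
            self.cache_max = cache_max
            self.cache_total = 0
            self._slots = collections.OrderedDict()

        def put(self, key, value):
            if key in self._slots:
                return
            self._slots[key] = value
            self.cache_total += len(value)   # add once, at insert time
            self._cap()

        def _cap(self):
            # Evict oldest entries until the cache fits; no O(n) sum here.
            while self._slots and self.cache_total > self.cache_max:
                _, evicted = self._slots.popitem(last=False)
                self.cache_total -= len(evicted)

With the running total, the size check on each read becomes O(1) instead of O(number of cached slots).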
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 4949574f2b..b0ae364c88 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -40,18 +40,18 @@ class DiskCacheSlot(object):
if value is None:
self.content = None
self.ready.set()
- return
+ return False
if len(value) == 0:
# Can't mmap a 0 length file
self.content = b''
self.ready.set()
- return
+ return True
if self.content is not None:
# Has been set already
self.ready.set()
- return
+ return False
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
@@ -74,6 +74,7 @@ class DiskCacheSlot(object):
self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
# only set the event when mmap is successful
self.ready.set()
+ return True
finally:
if tmpfile is not None:
# If the tempfile hasn't been renamed on disk yet, try to delete it.
@@ -81,6 +82,7 @@ class DiskCacheSlot(object):
os.remove(tmpfile)
except:
pass
+ return False
def size(self):
if self.content is None:
@@ -96,65 +98,61 @@ class DiskCacheSlot(object):
return len(self.content)
def evict(self):
- if self.content is not None and len(self.content) > 0:
- # The mmap region might be in use when we decided to evict
- # it. This can happen if the cache is too small.
- #
- # If we call close() now, it'll throw an error if
- # something tries to access it.
- #
- # However, we don't need to explicitly call mmap.close()
- #
- # I confirmed in mmapmodule.c that that both close
- # and deallocate do the same thing:
+ if self.content is None or len(self.content) == 0:
+ return
+
+ # The mmap region might be in use when we decided to evict
+ # it. This can happen if the cache is too small.
+ #
+ # If we call close() now, it'll throw an error if
+ # something tries to access it.
+ #
+ # However, we don't need to explicitly call mmap.close()
+ #
+ # I confirmed in mmapmodule.c that both close
+ # and deallocate do the same thing:
+ #
+ # a) close the file descriptor
+ # b) unmap the memory range
+ #
+ # So we can forget it in the cache and delete the file on
+ # disk, and it will tear it down after any other
+ # lingering Python references to the mapped memory are
+ # gone.
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+ try:
+ fcntl.flock(self.filehandle, fcntl.LOCK_UN)
+
+ # try to get an exclusive lock, this ensures other
+ # processes are not using the block. It is
+ # nonblocking and will throw an exception if we
+ # can't get it, which is fine because that means
+ # we just won't try to delete it.
#
- # a) close the file descriptor
- # b) unmap the memory range
+ # I should note here, the file locking is not
+ # strictly necessary, we could just remove it and
+ # the kernel would ensure that the underlying
+ # inode remains available as long as other
+ # processes still have the file open. However, if
+ # you have multiple processes sharing the cache
+ # and deleting each other's files, you'll end up
+ # with a bunch of ghost files that don't show up
+ # in the file system but are still taking up
+ # space, which isn't particularly user friendly.
+ # The locking strategy ensures that cache blocks
+ # in use remain visible.
#
- # So we can forget it in the cache and delete the file on
- # disk, and it will tear it down after any other
- # lingering Python references to the mapped memory are
- # gone.
+ fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
- blockdir = os.path.join(self.cachedir, self.locator[0:3])
- final = os.path.join(blockdir, self.locator) + cacheblock_suffix
- try:
- fcntl.flock(self.filehandle, fcntl.LOCK_UN)
-
- # try to get an exclusive lock, this ensures other
- # processes are not using the block. It is
- # nonblocking and will throw an exception if we
- # can't get it, which is fine because that means
- # we just won't try to delete it.
- #
- # I should note here, the file locking is not
- # strictly necessary, we could just remove it and
- # the kernel would ensure that the underlying
- # inode remains available as long as other
- # processes still have the file open. However, if
- # you have multiple processes sharing the cache
- # and deleting each other's files, you'll end up
- # with a bunch of ghost files that don't show up
- # in the file system but are still taking up
- # space, which isn't particularly user friendly.
- # The locking strategy ensures that cache blocks
- # in use remain visible.
- #
- fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
- os.remove(final)
- return True
- except OSError:
- pass
- finally:
- self.filehandle = None
- self.linger = weakref.ref(self.content)
- self.content = None
- return False
-
- def gone(self):
- # Test if an evicted object is lingering
- return self.content is None and (self.linger is None or self.linger() is None)
+ os.remove(final)
+ return True
+ except OSError:
+ pass
+ finally:
+ self.filehandle = None
+ self.content = None
@staticmethod
def get_from_disk(locator, cachedir):
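The locking strategy spelled out in the comment above is a general POSIX pattern: readers hold a lock on the cache file while it is mapped, and eviction only unlinks the file if a non-blocking exclusive flock succeeds, which proves no other process still has it open. A standalone sketch of that pattern, with the DiskCacheSlot bookkeeping omitted:

    import fcntl
    import os

    def try_unlink_if_unused(path, filehandle):
        # Sketch of the flock-based eviction check in the diff above;
        # the real code also clears the mmap and filehandle references.
        try:
            # Release our own lock first so it cannot conflict with the
            # exclusive lock request below.
            fcntl.flock(filehandle, fcntl.LOCK_UN)
            # Non-blocking exclusive lock: succeeds only if no other
            # process holds a lock on the file; otherwise it raises
            # OSError and the delete is simply skipped.
            fcntl.flock(filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
            os.remove(path)
            return True
        except OSError:
            # Another process is still using the block; keep it visible.
            return False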
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6b34a1f933..94f8abc73f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -233,11 +233,13 @@ class KeepBlockCache(object):
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+ self.cache_total = 0
if self._disk_cache:
self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ for slot in self._cache.values():
+ self.cache_total += slot.size()
self.cap_cache()
-
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
@@ -251,8 +253,11 @@ class KeepBlockCache(object):
return self.content
def set(self, value):
+ if self.content is not None:
+ return False
self.content = value
self.ready.set()
+ return True
def size(self):
if self.content is None:
@@ -262,51 +267,25 @@ class KeepBlockCache(object):
def evict(self):
self.content = None
- return self.gone()
- def gone(self):
- return (self.content is None)
def _resize_cache(self, cache_max, max_slots):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
- sm = 0
- for slot in self._cache.values():
- sm += slot.size()
-
- if sm <= cache_max and len(self._cache) <= max_slots:
+ if self.cache_total <= cache_max and len(self._cache) <= max_slots:
return
_evict_candidates = collections.deque(self._cache.values())
- while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+ while len(_evict_candidates) > 0 and (self.cache_total > cache_max or len(self._cache) > max_slots):
slot = _evict_candidates.popleft()
if not slot.ready.is_set():
continue
- if slot.content is None:
- # error
- del self._cache[slot.locator]
- continue
-
sz = slot.size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if slot.evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if slot.gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[slot.locator]
+ slot.evict()
+ self.cache_total -= sz
+ del self._cache[slot.locator]
def cap_cache(self):
@@ -329,6 +308,7 @@ class KeepBlockCache(object):
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
self._cache[n.locator] = n
+ self.cache_total += n.size()
return n
return None
@@ -363,7 +343,8 @@ class KeepBlockCache(object):
def set(self, slot, blob):
try:
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
return
except OSError as e:
if e.errno == errno.ENOMEM:
@@ -391,7 +372,8 @@ class KeepBlockCache(object):
# exception handler adjusts limits downward in some cases
# to free up resources, which would make the operation
# succeed.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
except Exception as e:
# It failed again. Give up.
slot.set(None)
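The reason set() now returns a boolean is so the cache can keep cache_total accurate: only a call that actually stores new content should increase the running total. A reduced sketch of that contract (simplified names, not the real classes):

    import threading

    class Slot:
        # Sketch of the set()-returns-bool contract introduced here.
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def set(self, value):
            if self.content is not None:
                # Already populated (and readiness already signaled);
                # report that nothing new was stored so the caller
                # does not double-count it.
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            return len(self.content) if self.content is not None else 0

    class Cache:
        def __init__(self):
            self.cache_total = 0

        def store(self, slot, blob):
            # Only grow the running total when set() reports new content;
            # a rejected set (or a None value, whose size() is 0) leaves
            # the total unchanged.
            if slot.set(blob):
                self.cache_total += slot.size()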
commit b91d06bf3ede4b9afa5a74070a4f8ca95d16f629
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Apr 1 15:58:06 2024 -0400
21639: Improve critical path of read() from cache
* Don't use tobytes(), it makes a copy, and it should be zero-copy.
* Prefetching adds a lot of overhead. Don't do it.
* Don't use a list comprehension to calculate cache size
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
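The tobytes() point is straightforward to demonstrate: slicing a memoryview shares the underlying buffer, while tobytes() allocates and copies. A small standalone illustration (not Arvados code):

    # Pretend this is a cached Keep block.
    block = bytes(1024 * 1024)
    view = memoryview(block)

    segment = view[1024:4096]            # zero-copy: a new view, same buffer
    copied = view[1024:4096].tobytes()   # allocates and copies 3072 bytes

    assert bytes(segment) == copied      # same data either way

    # b''.join() still copies once when concatenating several segments,
    # which is why the arvfile.py hunk below also returns data[0]
    # directly when a read is satisfied by a single segment.
    data = [segment]
    result = data[0] if len(data) == 1 else b''.join(data)

b''.join() accepts memoryview objects because they support the buffer protocol, so keeping the segments as views only costs one copy at the final join, and no copy at all for single-segment reads.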
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4b95835aac..0cc7d25a33 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1060,7 +1060,8 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []
@@ -1068,17 +1069,21 @@ class ArvadosFile(object):
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
- data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
locs.add(lr.locator)
else:
break
- for lr in prefetch:
- if lr.locator not in locs:
- self.parent._my_block_manager().block_prefetch(lr.locator)
- locs.add(lr.locator)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1d0fc5f159..6b34a1f933 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -271,8 +271,14 @@ class KeepBlockCache(object):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
+ sm = 0
+ for slot in self._cache.values():
+ sm += slot.size()
+
+ if sm <= cache_max and len(self._cache) <= max_slots:
+ return
+
_evict_candidates = collections.deque(self._cache.values())
- sm = sum([slot.size() for slot in _evict_candidates])
while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
slot = _evict_candidates.popleft()
if not slot.ready.is_set():
@@ -926,7 +932,10 @@ class KeepClient(object):
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
- self.num_prefetch_threads = num_prefetch_threads or 2
+ if num_prefetch_threads is not None:
+ self.num_prefetch_threads = num_prefetch_threads
+ else:
+ self.num_prefetch_threads = 2
self._prefetch_queue = None
self._prefetch_threads = None
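The change from "num_prefetch_threads or 2" to an explicit None check matters because Python's "or" treats 0 as falsy: a caller explicitly asking for zero prefetch threads would silently get two. A minimal standalone illustration of the pitfall:

    def with_or(num_prefetch_threads=None):
        # Old style: 0 is falsy, so an explicit 0 is overridden to 2.
        return num_prefetch_threads or 2

    def with_is_none(num_prefetch_threads=None):
        # New style: only an unspecified value falls back to the default.
        return num_prefetch_threads if num_prefetch_threads is not None else 2

    assert with_or(0) == 2            # explicit request for 0 is ignored
    assert with_is_none(0) == 0       # explicit request for 0 is honored
    assert with_or(None) == 2 and with_is_none(None) == 2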
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 719ec7ee95..29ace2e52e 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -487,7 +487,8 @@ class Mount(object):
# layer actually ends up being slower.
# Experimentally, capping 7 threads seems to
# be a sweet spot.
- prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+ #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+ prefetch_threads = 0
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
-----------------------------------------------------------------------
hooks/post-receive
--