[arvados] created: 2.1.0-3178-g9f7c39451
git repository hosting
git at public.arvados.org
Tue Dec 13 22:43:54 UTC 2022
at 9f7c39451c16003c6c6e0fb8de5a990781cb300f (commit)
commit 9f7c39451c16003c6c6e0fb8de5a990781cb300f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Dec 13 22:35:44 2022 +0000
19872: Set max_slots lower because mmap uses another FD
- Reduce max slots to 3/8 of max fds instead of 1/2, because mmap() uses a
second file descriptor and we keep the original file descriptor open
for flock()
- Rework how cache slots are allocated to try evicting things _before_
allocating a new cache slot, so the cache should be somewhat better
behaved about staying within its configured limits.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 15afa23a8..f8fca5780 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -12,13 +12,14 @@ import fcntl
import time
import errno
import logging
+import weakref
_logger = logging.getLogger('arvados.keep')
cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
- __slots__ = ("locator", "ready", "content", "cachedir", "filehandle")
+ __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")
def __init__(self, locator, cachedir):
self.locator = locator
@@ -26,6 +27,7 @@ class DiskCacheSlot(object):
self.content = None
self.cachedir = cachedir
self.filehandle = None
+ self.linger = None
def get(self):
self.ready.wait()
@@ -81,6 +83,13 @@ class DiskCacheSlot(object):
def size(self):
if self.content is None:
+ if self.linger is not None:
+ # If it is still lingering (object is still accessible
+ # through the weak reference) it is still taking up
+ # space.
+ content = self.linger()
+ if content is not None:
+ return len(content)
return 0
else:
return len(self.content)
@@ -138,8 +147,13 @@ class DiskCacheSlot(object):
pass
finally:
self.filehandle = None
+ self.linger = weakref.ref(self.content)
self.content = None
- return False
+ return False
+
+ def gone(self):
+ # Test if an evicted object is lingering
+ return self.content is None and (self.linger is None or self.linger() is None)
@staticmethod
def get_from_disk(locator, cachedir):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b9e22748c..afb9180e6 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -31,6 +31,7 @@ import resource
from . import timer
import urllib.parse
import traceback
+import weakref
if sys.version_info >= (3, 0):
from io import BytesIO
@@ -185,6 +186,7 @@ class KeepBlockCache(object):
self._max_slots = max_slots
self._disk_cache = disk_cache
self._disk_cache_dir = disk_cache_dir
+ self._cache_updating = threading.Condition(self._cache_lock)
if self._disk_cache and self._disk_cache_dir is None:
self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
@@ -192,10 +194,18 @@ class KeepBlockCache(object):
if self._max_slots == 0:
if self._disk_cache:
- # default max slots to half of maximum file handles
+ # Each block uses two file descriptors, one used to
+ # open it initially and hold the flock(), and a second
+ # hidden one used by mmap().
+ #
+ # Set max slots to 3/8 of maximum file handles. This
+ # means we'll use at most 3/4 of total file handles.
+ #
# NOFILE typically defaults to 1024 on Linux so this
- # will be 512 slots.
- self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+ # is 384 slots (768 file handles), which means we can
+ # cache up to 24 GiB of 64 MiB blocks. This leaves
+ # 256 file handles for sockets and other stuff.
+ self._max_slots = int((resource.getrlimit(resource.RLIMIT_NOFILE)[0] * 3) / 8)
else:
# RAM cache slots
self._max_slots = 512
@@ -246,36 +256,50 @@ class KeepBlockCache(object):
return len(self.content)
def evict(self):
- return True
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- with self._cache_lock:
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
+ self.content = None
+ return self.gone()
+
+ def gone(self):
+ return (self.content is None)
+
+ def _resize_cache(self, cache_max, max_slots):
+ # Try and make sure the contents of the cache do not exceed
+ # the supplied maximums.
+
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+ for i in range(len(self._cache)-1, -1, -1):
+ # start from the back, find a slot that is a candidate to evict
+ if self._cache[i].ready.is_set():
+ sz = self._cache[i].size()
+
+ # If evict returns false it means the
+ # underlying disk cache couldn't lock the file
+ # for deletion because another process was using
+ # it. Don't count it as reducing the amount
+ # of data in the cache, find something else to
+ # throw out.
+ if self._cache[i].evict():
+ sm -= sz
+
+ # check to make sure the underlying data is gone
+ if self._cache[i].gone():
# either way we forget about it. either the
# other process will delete it, or if we need
# it again and it is still there, we'll find
# it on disk.
del self._cache[i]
- break
+ break
+
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ with self._cache_updating:
+ self._resize_cache(self.cache_max, self._max_slots)
+ self._cache_updating.notify_all()
def _get(self, locator):
# Test if the locator is already in the cache
@@ -302,12 +326,19 @@ class KeepBlockCache(object):
def reserve_cache(self, locator):
'''Reserve a cache slot for the specified locator,
or return the existing slot.'''
- with self._cache_lock:
+ with self._cache_updating:
n = self._get(locator)
if n:
return n, False
else:
# Add a new cache slot for the locator
+ self._resize_cache(self.cache_max, self._max_slots-1)
+ while len(self._cache) >= self._max_slots:
+ # If there isn't a slot available, need to wait
+ # until some other cache action happens.
+ self._cache_updating.wait()
+ self._resize_cache(self.cache_max, self._max_slots-1)
+
if self._disk_cache:
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list