[arvados] created: 2.1.0-3178-g9f7c39451

git repository hosting git at public.arvados.org
Tue Dec 13 22:43:54 UTC 2022


        at  9f7c39451c16003c6c6e0fb8de5a990781cb300f (commit)


commit 9f7c39451c16003c6c6e0fb8de5a990781cb300f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Tue Dec 13 22:35:44 2022 +0000

    19872: Set max_slots lower because mmap uses another FD
    
    - Reduce max slots to 3/8 of max fds instead of 1/2, because mmap() uses
    a second file descriptor and we keep the original file descriptor open
    for flock().
    
    - Rework how cache slots are allocated to try evicting things _before_
    allocating a new cache slot, so the cache should be somewhat better
    behaved about staying within its configured limits.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 15afa23a8..f8fca5780 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -12,13 +12,14 @@ import fcntl
 import time
 import errno
 import logging
+import weakref
 
 _logger = logging.getLogger('arvados.keep')
 
 cacheblock_suffix = ".keepcacheblock"
 
 class DiskCacheSlot(object):
-    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle")
+    __slots__ = ("locator", "ready", "content", "cachedir", "filehandle", "linger")
 
     def __init__(self, locator, cachedir):
         self.locator = locator
@@ -26,6 +27,7 @@ class DiskCacheSlot(object):
         self.content = None
         self.cachedir = cachedir
         self.filehandle = None
+        self.linger = None
 
     def get(self):
         self.ready.wait()
@@ -81,6 +83,13 @@ class DiskCacheSlot(object):
 
     def size(self):
         if self.content is None:
+            if self.linger is not None:
+                # If it is still lingering (object is still accessible
+                # through the weak reference) it is still taking up
+                # space.
+                content = self.linger()
+                if content is not None:
+                    return len(content)
             return 0
         else:
             return len(self.content)
@@ -138,8 +147,13 @@ class DiskCacheSlot(object):
                 pass
             finally:
                 self.filehandle = None
+                self.linger = weakref.ref(self.content)
                 self.content = None
-            return False
+        return False
+
+    def gone(self):
+        # Test if an evicted object is fully gone (i.e. no longer lingering)
+        return self.content is None and (self.linger is None or self.linger() is None)
 
     @staticmethod
     def get_from_disk(locator, cachedir):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b9e22748c..afb9180e6 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -31,6 +31,7 @@ import resource
 from . import timer
 import urllib.parse
 import traceback
+import weakref
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -185,6 +186,7 @@ class KeepBlockCache(object):
         self._max_slots = max_slots
         self._disk_cache = disk_cache
         self._disk_cache_dir = disk_cache_dir
+        self._cache_updating = threading.Condition(self._cache_lock)
 
         if self._disk_cache and self._disk_cache_dir is None:
             self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
@@ -192,10 +194,18 @@ class KeepBlockCache(object):
 
         if self._max_slots == 0:
             if self._disk_cache:
-                # default max slots to half of maximum file handles
+                # Each block uses two file descriptors, one used to
+                # open it initially and hold the flock(), and a second
+                # hidden one used by mmap().
+                #
+                # Set max slots to 3/8 of maximum file handles.  This
+                # means we'll use at most 3/4 of total file handles.
+                #
                 # NOFILE typically defaults to 1024 on Linux so this
-                # will be 512 slots.
-                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+                # is 384 slots (768 file handles), which means we can
+                # cache up to 24 GiB of 64 MiB blocks.  This leaves
+                # 256 file handles for sockets and other stuff.
+                self._max_slots = int((resource.getrlimit(resource.RLIMIT_NOFILE)[0] * 3) / 8)
             else:
                 # RAM cache slots
                 self._max_slots = 512
@@ -246,36 +256,50 @@ class KeepBlockCache(object):
                 return len(self.content)
 
         def evict(self):
-            return True
-
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        with self._cache_lock:
-            # Select all slots except those where ready.is_set() and content is
-            # None (that means there was an error reading the block).
-            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-            sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
-                for i in range(len(self._cache)-1, -1, -1):
-                    # start from the back, find a slot that is a candidate to evict
-                    if self._cache[i].ready.is_set():
-                        sz = self._cache[i].size()
-
-                        # If evict returns false it means the
-                        # underlying disk cache couldn't lock the file
-                        # for deletion because another process was using
-                        # it. Don't count it as reducing the amount
-                        # of data in the cache, find something else to
-                        # throw out.
-                        if self._cache[i].evict():
-                            sm -= sz
-
+            self.content = None
+            return self.gone()
+
+        def gone(self):
+            return (self.content is None)
+
+    def _resize_cache(self, cache_max, max_slots):
+        # Try to make sure the contents of the cache do not exceed
+        # the supplied maximums.
+
+        # Select all slots except those where ready.is_set() and content is
+        # None (that means there was an error reading the block).
+        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+        sm = sum([slot.size() for slot in self._cache])
+        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+            for i in range(len(self._cache)-1, -1, -1):
+                # start from the back, find a slot that is a candidate to evict
+                if self._cache[i].ready.is_set():
+                    sz = self._cache[i].size()
+
+                    # If evict returns false it means the
+                    # underlying disk cache couldn't lock the file
+                    # for deletion because another process was using
+                    # it. Don't count it as reducing the amount
+                    # of data in the cache, find something else to
+                    # throw out.
+                    if self._cache[i].evict():
+                        sm -= sz
+
+                    # check to make sure the underlying data is gone
+                    if self._cache[i].gone():
                         # either way we forget about it.  either the
                         # other process will delete it, or if we need
                         # it again and it is still there, we'll find
                         # it on disk.
                         del self._cache[i]
-                        break
+                    break
+
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        with self._cache_updating:
+            self._resize_cache(self.cache_max, self._max_slots)
+            self._cache_updating.notify_all()
 
     def _get(self, locator):
         # Test if the locator is already in the cache
@@ -302,12 +326,19 @@ class KeepBlockCache(object):
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        with self._cache_lock:
+        with self._cache_updating:
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
+                self._resize_cache(self.cache_max, self._max_slots-1)
+                while len(self._cache) >= self._max_slots:
+                    # If there isn't a slot available, need to wait
+                    # until some other cache action happens.
+                    self._cache_updating.wait()
+                    self._resize_cache(self.cache_max, self._max_slots-1)
+
                 if self._disk_cache:
                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                 else:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list