[arvados] created: 2.7.0-6265-gcc3f89292c

git repository hosting git at public.arvados.org
Mon Apr 1 15:11:14 UTC 2024


        at  cc3f89292c0136ce5d9e56506f82ea743c59fff8 (commit)


commit cc3f89292c0136ce5d9e56506f82ea743c59fff8
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Apr 1 11:10:52 2024 -0400

    21639: Don't return broken cache slots
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 23c0a80cbf..4949574f2b 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -13,6 +13,7 @@ import time
 import errno
 import logging
 import weakref
+import collections
 
 _logger = logging.getLogger('arvados.keep')
 
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 51d40dbece..1d0fc5f159 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -273,7 +273,7 @@ class KeepBlockCache(object):
 
         _evict_candidates = collections.deque(self._cache.values())
         sm = sum([slot.size() for slot in _evict_candidates])
-        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+        while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
             slot = _evict_candidates.popleft()
             if not slot.ready.is_set():
                 continue
@@ -313,7 +313,10 @@ class KeepBlockCache(object):
         # Test if the locator is already in the cache
         if locator in self._cache:
             n = self._cache[locator]
-            self._cache.move_to_back(locator)
+            if n.ready.is_set() and n.content is None:
+                del self._cache[n.locator]
+                return None
+            self._cache.move_to_end(locator)
             return n
         if self._disk_cache:
             # see if it exists on disk

commit 062542e47eb3bbb1ad911f2bcb6e51967f80db86
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Apr 1 09:52:53 2024 -0400

    21639: Use a more efficient data structure for the keep block cache
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index f8fca57803..23c0a80cbf 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -237,13 +237,13 @@ class DiskCacheSlot(object):
 
         # Map in all the files we found, up to maxslots, if we exceed
         # maxslots, start throwing things out.
-        cachelist = []
+        cachelist = collections.OrderedDict()
         for b in blocks:
             got = DiskCacheSlot.get_from_disk(b[0], cachedir)
             if got is None:
                 continue
             if len(cachelist) < maxslots:
-                cachelist.append(got)
+                cachelist[got.locator] = got
             else:
                 # we found more blocks than maxslots, try to
                 # throw it out of the cache.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4b00f7df8b..51d40dbece 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -182,7 +182,7 @@ class Keep(object):
 class KeepBlockCache(object):
     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
-        self._cache = []
+        self._cache = collections.OrderedDict()
         self._cache_lock = threading.Lock()
         self._max_slots = max_slots
         self._disk_cache = disk_cache
@@ -271,33 +271,36 @@ class KeepBlockCache(object):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
-        # Select all slots except those where ready.is_set() and content is
-        # None (that means there was an error reading the block).
-        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-        sm = sum([slot.size() for slot in self._cache])
+        _evict_candidates = collections.deque(self._cache.values())
+        sm = sum([slot.size() for slot in _evict_candidates])
         while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
-            for i in range(len(self._cache)-1, -1, -1):
-                # start from the back, find a slot that is a candidate to evict
-                if self._cache[i].ready.is_set():
-                    sz = self._cache[i].size()
-
-                    # If evict returns false it means the
-                    # underlying disk cache couldn't lock the file
-                    # for deletion because another process was using
-                    # it. Don't count it as reducing the amount
-                    # of data in the cache, find something else to
-                    # throw out.
-                    if self._cache[i].evict():
-                        sm -= sz
-
-                    # check to make sure the underlying data is gone
-                    if self._cache[i].gone():
-                        # either way we forget about it.  either the
-                        # other process will delete it, or if we need
-                        # it again and it is still there, we'll find
-                        # it on disk.
-                        del self._cache[i]
-                    break
+            slot = _evict_candidates.popleft()
+            if not slot.ready.is_set():
+                continue
+
+            if slot.content is None:
+                # error
+                del self._cache[slot.locator]
+                continue
+
+            sz = slot.size()
+
+            # If evict returns false it means the
+            # underlying disk cache couldn't lock the file
+            # for deletion because another process was using
+            # it. Don't count it as reducing the amount
+            # of data in the cache, find something else to
+            # throw out.
+            if slot.evict():
+                sm -= sz
+
+            # check to make sure the underlying data is gone
+            if slot.gone():
+                # either way we forget about it.  either the
+                # other process will delete it, or if we need
+                # it again and it is still there, we'll find
+                # it on disk.
+                del self._cache[slot.locator]
 
 
     def cap_cache(self):
@@ -308,19 +311,15 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in range(0, len(self._cache)):
-            if self._cache[i].locator == locator:
-                n = self._cache[i]
-                if i != 0:
-                    # move it to the front
-                    del self._cache[i]
-                    self._cache.insert(0, n)
-                return n
+        if locator in self._cache:
+            n = self._cache[locator]
+            self._cache.move_to_back(locator)
+            return n
         if self._disk_cache:
             # see if it exists on disk
             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
             if n is not None:
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
                 return n
         return None
 
@@ -350,7 +349,7 @@ class KeepBlockCache(object):
                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                 else:
                     n = KeepBlockCache.CacheSlot(locator)
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
                 return n, True
 
     def set(self, slot, blob):
@@ -365,7 +364,7 @@ class KeepBlockCache(object):
             elif e.errno == errno.ENOSPC:
                 # Reduce disk max space to current - 256 MiB, cap cache and retry
                 with self._cache_lock:
-                    sm = sum([st.size() for st in self._cache])
+                    sm = sum([st.size() for st in self._cache.values()])
                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
             elif e.errno == errno.ENODEV:
                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
@@ -1426,6 +1425,9 @@ class KeepClient(object):
         does not block.
         """
 
+        if self.block_cache.get(locator) is not None:
+            return
+
         self._start_prefetch_threads()
         self._prefetch_queue.put(locator)
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list