[arvados] created: 2.7.0-6265-gcc3f89292c
git repository hosting
git at public.arvados.org
Mon Apr 1 15:11:14 UTC 2024
at cc3f89292c0136ce5d9e56506f82ea743c59fff8 (commit)
commit cc3f89292c0136ce5d9e56506f82ea743c59fff8
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Apr 1 11:10:52 2024 -0400
21639: Don't return broken cache slots
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 23c0a80cbf..4949574f2b 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -13,6 +13,7 @@ import time
import errno
import logging
import weakref
+import collections
_logger = logging.getLogger('arvados.keep')
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 51d40dbece..1d0fc5f159 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -273,7 +273,7 @@ class KeepBlockCache(object):
_evict_candidates = collections.deque(self._cache.values())
sm = sum([slot.size() for slot in _evict_candidates])
- while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+ while len(_evict_candidates) > 0 and (sm > cache_max or len(self._cache) > max_slots):
slot = _evict_candidates.popleft()
if not slot.ready.is_set():
continue
@@ -313,7 +313,10 @@ class KeepBlockCache(object):
# Test if the locator is already in the cache
if locator in self._cache:
n = self._cache[locator]
- self._cache.move_to_back(locator)
+ if n.ready.is_set() and n.content is None:
+ del self._cache[n.locator]
+ return None
+ self._cache.move_to_end(locator)
return n
if self._disk_cache:
# see if it exists on disk
commit 062542e47eb3bbb1ad911f2bcb6e51967f80db86
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Apr 1 09:52:53 2024 -0400
21639: Use a more efficient data structure for the keep block cache
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index f8fca57803..23c0a80cbf 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -237,13 +237,13 @@ class DiskCacheSlot(object):
# Map in all the files we found, up to maxslots, if we exceed
# maxslots, start throwing things out.
- cachelist = []
+ cachelist = collections.OrderedDict()
for b in blocks:
got = DiskCacheSlot.get_from_disk(b[0], cachedir)
if got is None:
continue
if len(cachelist) < maxslots:
- cachelist.append(got)
+ cachelist[got.locator] = got
else:
# we found more blocks than maxslots, try to
# throw it out of the cache.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4b00f7df8b..51d40dbece 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -182,7 +182,7 @@ class Keep(object):
class KeepBlockCache(object):
def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
- self._cache = []
+ self._cache = collections.OrderedDict()
self._cache_lock = threading.Lock()
self._max_slots = max_slots
self._disk_cache = disk_cache
@@ -271,33 +271,36 @@ class KeepBlockCache(object):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
+ _evict_candidates = collections.deque(self._cache.values())
+ sm = sum([slot.size() for slot in _evict_candidates])
while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if self._cache[i].gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[i]
- break
+ slot = _evict_candidates.popleft()
+ if not slot.ready.is_set():
+ continue
+
+ if slot.content is None:
+ # error
+ del self._cache[slot.locator]
+ continue
+
+ sz = slot.size()
+
+ # If evict returns false it means the
+ # underlying disk cache couldn't lock the file
+ # for deletion because another process was using
+ # it. Don't count it as reducing the amount
+ # of data in the cache, find something else to
+ # throw out.
+ if slot.evict():
+ sm -= sz
+
+ # check to make sure the underlying data is gone
+ if slot.gone():
+ # either way we forget about it. either the
+ # other process will delete it, or if we need
+ # it again and it is still there, we'll find
+ # it on disk.
+ del self._cache[slot.locator]
def cap_cache(self):
@@ -308,19 +311,15 @@ class KeepBlockCache(object):
def _get(self, locator):
# Test if the locator is already in the cache
- for i in range(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n
+ if locator in self._cache:
+ n = self._cache[locator]
+ self._cache.move_to_back(locator)
+ return n
if self._disk_cache:
# see if it exists on disk
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n
return None
@@ -350,7 +349,7 @@ class KeepBlockCache(object):
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n, True
def set(self, slot, blob):
@@ -365,7 +364,7 @@ class KeepBlockCache(object):
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache])
+ sm = sum([st.size() for st in self._cache.values()])
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
@@ -1426,6 +1425,9 @@ class KeepClient(object):
does not block.
"""
+ if self.block_cache.get(locator) is not None:
+ return
+
self._start_prefetch_threads()
self._prefetch_queue.put(locator)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list