[ARVADOS] updated: 2.1.0-2164-g32c9b81e2
Git user
git at public.arvados.org
Tue Mar 29 16:03:24 UTC 2022
Summary of changes:
sdk/python/arvados/arvfile.py | 15 +++++++++++----
sdk/python/arvados/keep.py | 13 ++++++++-----
2 files changed, 19 insertions(+), 9 deletions(-)
via 32c9b81e2c1bce19673c73cb14490f6e9dde0fc6 (commit)
from ff635ccb09b0b79663b0220e16b8a0ef00997f5d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 32c9b81e2c1bce19673c73cb14490f6e9dde0fc6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Mar 29 15:42:11 2022 +0000
18941: bugfixing prefetch
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 0fcdc1e63..b21ebd331 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -479,7 +479,7 @@ class _BlockManager(object):
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
+ DEFAULT_GET_THREADS = 4
def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
"""keep: KeepClient object to use"""
@@ -593,6 +593,9 @@ class _BlockManager(object):
b = self._prefetch_queue.get()
if b is None:
return
+ if self._keep.has_cache_slot(b):
+ continue
+ _logger.debug("prefetching %s", b)
self._keep.get(b)
except Exception:
_logger.exception("Exception doing block prefetch")
@@ -841,7 +844,7 @@ class _BlockManager(object):
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
+ if self._keep.has_cache_slot(locator):
return
with self.lock:
@@ -849,6 +852,7 @@ class _BlockManager(object):
return
self.start_get_threads()
+ # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
@@ -1099,7 +1103,7 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
locs = set()
data = []
@@ -1117,7 +1121,10 @@ class ArvadosFile(object):
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1a83eae94..df01c3a55 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -176,7 +176,7 @@ class Keep(object):
class KeepBlockCache(object):
# Default RAM cache is 256MiB
- def __init__(self, cache_max=(256 * 1024 * 1024)):
+ def __init__(self, cache_max=(1024 * 1024 * 1024)):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
@@ -1036,14 +1036,19 @@ class KeepClient(object):
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
return None
+ def has_cache_slot(self, loc_s):
+ locator = KeepLocator(loc_s)
+ return self.block_cache.get(locator.md5sum) is not None
+
def refresh_signature(self, loc):
"""Ask Keep to get the remote block and return its local signature"""
now = datetime.datetime.utcnow().isoformat("T") + 'Z'
@@ -1333,5 +1338,3 @@ class KeepClient(object):
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list