[arvados] created: 2.6.0-281-g4e4c935d6
git repository hosting
git at public.arvados.org
Fri Jun 16 19:53:47 UTC 2023
at 4e4c935d6fddb68997a50a382bff01c223dd00df (commit)
commit 4e4c935d6fddb68997a50a382bff01c223dd00df
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Jun 16 15:46:48 2023 -0400
20637: Fix tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 691716b3f..cf6dec1a5 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -27,6 +27,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
+ self.num_prefetch_threads = 1
def get(self, locator, num_retries=0, prefetch=False):
self.requests.append(locator)
return self.blocks.get(locator)
@@ -37,6 +38,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = bytes(data)
return pdh
+ def block_prefetch(self, loc):
+ self.requests.append(loc)
class MockApi(object):
def __init__(self, b, r):
@@ -627,7 +630,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def __init__(self, blocks, nocache):
self.blocks = blocks
self.nocache = nocache
- self.num_get_threads = 1
+ self._keep = ArvadosFileWriterTestCase.MockKeep({})
def block_prefetch(self, loc):
pass
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index c79607fca..56c2352ad 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -323,6 +323,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
class MockKeep(object):
def __init__(self, content, num_retries=0):
self.content = content
+ self.num_prefetch_threads = 1
def get(self, locator, num_retries=0, prefetch=False):
return self.content[locator]
commit ea1d18899dc2d6518e53f508ffec1e14ebb8af51
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Jun 16 14:53:49 2023 -0400
20637: Fix arv-get
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
index 89b333808..c4db072cc 100755
--- a/sdk/python/arvados/commands/get.py
+++ b/sdk/python/arvados/commands/get.py
@@ -197,8 +197,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
try:
reader = arvados.CollectionReader(
col_loc, api_client=api_client, num_retries=args.retries,
- keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
- get_threads=args.threads)
+ keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
except Exception as error:
logger.error("failed to read collection: {}".format(error))
return 1
commit cdf63c1c50a044fe66f224b5cf36b632ecff12a5
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Jun 16 14:35:20 2023 -0400
20637: Move the prefetch thread pool to KeepClient
This avoids the problem of every Collection's own BlockManager
creating its own prefetch thread pool, which becomes a resource leak
when reading files from thousands of separate Collection objects.
The 'put' thread pool remains with the BlockManager but it now stops
the put threads on 'BlockManager.commit_all'. This is because this
method always flushes pending blocks anyway, and is called before the
collection record is written to the API server -- so we can assume
we've just finished a batch of writes to that collection, and might
not need the put thread pool any more, and if we do, we can just make
a new one.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2ce0e46b3..a1b2241b3 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -479,20 +479,20 @@ class _BlockManager(object):
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
+ def __init__(self, keep,
+ copies=None,
+ put_threads=None,
+ num_retries=None,
+ storage_classes_func=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
- self._prefetch_queue = None
- self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
@@ -586,29 +586,6 @@ class _BlockManager(object):
thread.daemon = True
thread.start()
- def _block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b, prefetch=True)
- except Exception:
- _logger.exception("Exception doing block prefetch")
-
- @synchronized
- def start_get_threads(self):
- if self._prefetch_threads is None:
- self._prefetch_queue = queue.Queue()
- self._prefetch_threads = []
- for i in range(0, self.num_get_threads):
- thread = threading.Thread(target=self._block_prefetch_worker)
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
-
-
@synchronized
def stop_threads(self):
"""Shut down and wait for background upload and download threads to finish."""
@@ -621,14 +598,6 @@ class _BlockManager(object):
self._put_threads = None
self._put_queue = None
- if self._prefetch_threads is not None:
- for t in self._prefetch_threads:
- self._prefetch_queue.put(None)
- for t in self._prefetch_threads:
- t.join()
- self._prefetch_threads = None
- self._prefetch_queue = None
-
def __enter__(self):
return self
@@ -828,14 +797,10 @@ class _BlockManager(object):
owner.flush(sync=True)
self.delete_bufferblock(k)
+ self.stop_threads()
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
-
- This assumes that the underlying KeepClient implements a block cache,
- so repeated requests for the same block will not result in repeated
- downloads (unless the block is evicted from the cache.) This method
- does not block.
-
"""
if not self.prefetch_enabled:
@@ -845,8 +810,7 @@ class _BlockManager(object):
if locator in self._bufferblocks:
return
- self.start_get_threads()
- self._prefetch_queue.put(locator)
+ self._keep.block_prefetch(locator)
class ArvadosFile(object):
@@ -1096,7 +1060,7 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 23b4393a9..bd1775db2 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1262,8 +1262,7 @@ class Collection(RichCollectionBase):
block_manager=None,
replication_desired=None,
storage_classes_desired=None,
- put_threads=None,
- get_threads=None):
+ put_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
@@ -1317,7 +1316,6 @@ class Collection(RichCollectionBase):
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
- self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
@@ -1435,8 +1433,7 @@ class Collection(RichCollectionBase):
copies=copies,
put_threads=self.put_threads,
num_retries=self.num_retries,
- storage_classes_func=self.storage_classes_desired,
- get_threads=self.get_threads,)
+ storage_classes_func=self.storage_classes_desired)
return self._block_manager
def _remember_api_response(self, response):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index a2c8fd249..4b00f7df8 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -835,7 +835,7 @@ class KeepClient(object):
def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
- num_retries=10, session=None):
+ num_retries=10, session=None, num_prefetch_threads=None):
"""Initialize a new KeepClient.
Arguments:
@@ -924,6 +924,9 @@ class KeepClient(object):
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
+ self.num_prefetch_threads = num_prefetch_threads or 2
+ self._prefetch_queue = None
+ self._prefetch_threads = None
if local_store:
self.local_store = local_store
@@ -1391,6 +1394,51 @@ class KeepClient(object):
"[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+ def _block_prefetch_worker(self):
+ """The background downloader thread."""
+ while True:
+ try:
+ b = self._prefetch_queue.get()
+ if b is None:
+ return
+ self.get(b, prefetch=True)
+ except Exception:
+ _logger.exception("Exception doing block prefetch")
+
+ def _start_prefetch_threads(self):
+ if self._prefetch_threads is None:
+ with self.lock:
+ if self._prefetch_threads is not None:
+ return
+ self._prefetch_queue = queue.Queue()
+ self._prefetch_threads = []
+ for i in range(0, self.num_prefetch_threads):
+ thread = threading.Thread(target=self._block_prefetch_worker)
+ self._prefetch_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ def block_prefetch(self, locator):
+ """
+ This relies on the fact that KeepClient implements a block cache,
+ so repeated requests for the same block will not result in repeated
+ downloads (unless the block is evicted from the cache.) This method
+ does not block.
+ """
+
+ self._start_prefetch_threads()
+ self._prefetch_queue.put(locator)
+
+ def stop_prefetch_threads(self):
+ with self.lock:
+ if self._prefetch_threads is not None:
+ for t in self._prefetch_threads:
+ self._prefetch_queue.put(None)
+ for t in self._prefetch_threads:
+ t.join()
+ self._prefetch_threads = None
+ self._prefetch_queue = None
+
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 95b9a9773..c2242eb2b 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -228,17 +228,28 @@ class Mount(object):
def _setup_api(self):
try:
+ # default value of file_cache is 0, this tells KeepBlockCache to
+ # choose a default based on whether disk_cache is enabled or not.
+
+ block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+ disk_cache=self.args.disk_cache,
+ disk_cache_dir=self.args.disk_cache_dir)
+
+ # If there's too many prefetch threads and you
+ # max out the CPU, delivering data to the FUSE
+ # layer actually ends up being slower.
+ # Experimentally, capping 7 threads seems to
+ # be a sweet spot.
+ prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
api_params={
'num_retries': self.args.retries,
},
- # default value of file_cache is 0, this tells KeepBlockCache to
- # choose a default based on whether disk_cache is enabled or not.
keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
- disk_cache=self.args.disk_cache,
- disk_cache_dir=self.args.disk_cache_dir),
+ 'block_cache': block_cache,
+ 'num_prefetch_threads': prefetch_threads,
'num_retries': self.args.retries,
},
version='v1',
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index f3816c0d3..7de95a0cb 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -525,23 +525,15 @@ class CollectionDirectory(CollectionDirectoryBase):
self.collection.update()
new_collection_record = self.collection.api_response()
else:
- # If there's too many prefetch threads and you
- # max out the CPU, delivering data to the FUSE
- # layer actually ends up being slower.
- # Experimentally, capping 7 threads seems to
- # be a sweet spot.
- get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
# Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries,
- get_threads=get_threads)
+ num_retries=self.num_retries)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries,
- get_threads=get_threads)
+ num_retries=self.num_retries)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list