[arvados] created: 2.6.0-281-g4e4c935d6

git repository hosting git at public.arvados.org
Fri Jun 16 19:53:47 UTC 2023


        at  4e4c935d6fddb68997a50a382bff01c223dd00df (commit)


commit 4e4c935d6fddb68997a50a382bff01c223dd00df
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Fri Jun 16 15:46:48 2023 -0400

    20637: Fix tests
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 691716b3f..cf6dec1a5 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -27,6 +27,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
+            self.num_prefetch_threads = 1
         def get(self, locator, num_retries=0, prefetch=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
@@ -37,6 +38,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = bytes(data)
             return pdh
+        def block_prefetch(self, loc):
+            self.requests.append(loc)
 
     class MockApi(object):
         def __init__(self, b, r):
@@ -627,7 +630,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             def __init__(self, blocks, nocache):
                 self.blocks = blocks
                 self.nocache = nocache
-                self.num_get_threads = 1
+                self._keep = ArvadosFileWriterTestCase.MockKeep({})
 
             def block_prefetch(self, loc):
                 pass
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index c79607fca..56c2352ad 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -323,6 +323,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     class MockKeep(object):
         def __init__(self, content, num_retries=0):
             self.content = content
+            self.num_prefetch_threads = 1
 
         def get(self, locator, num_retries=0, prefetch=False):
             return self.content[locator]

commit ea1d18899dc2d6518e53f508ffec1e14ebb8af51
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Fri Jun 16 14:53:49 2023 -0400

    20637: Fix arv-get
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
index 89b333808..c4db072cc 100755
--- a/sdk/python/arvados/commands/get.py
+++ b/sdk/python/arvados/commands/get.py
@@ -197,8 +197,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     try:
         reader = arvados.CollectionReader(
             col_loc, api_client=api_client, num_retries=args.retries,
-            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
-            get_threads=args.threads)
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1

commit cdf63c1c50a044fe66f224b5cf36b632ecff12a5
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Fri Jun 16 14:35:20 2023 -0400

    20637: Move the prefetch thread pool to KeepClient
    
    This avoids the problem of every Collection with its own BlockManager
    creating its own prefetch thread pool, which becomes a resource leak
    when reading files from 1000s of separate Collection objects.
    
    The 'put' thread pool remains with the BlockManager, but the
    BlockManager now stops its put threads in 'BlockManager.commit_all'.
    That method always flushes pending blocks anyway, and it is called
    before the collection record is written to the API server, so we can
    assume a batch of writes to that collection has just finished.  The
    put thread pool may not be needed any more; if it is, a new one is
    simply created.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2ce0e46b3..a1b2241b3 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -479,20 +479,20 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -586,29 +586,6 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b, prefetch=True)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -621,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-        self._prefetch_threads = None
-        self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -828,14 +797,10 @@ class _BlockManager(object):
                         owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
-
         """
 
         if not self.prefetch_enabled:
@@ -845,8 +810,7 @@ class _BlockManager(object):
             if locator in self._bufferblocks:
                 return
 
-        self.start_get_threads()
-        self._prefetch_queue.put(locator)
+        self._keep.block_prefetch(locator)
 
 
 class ArvadosFile(object):
@@ -1096,7 +1060,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 23b4393a9..bd1775db2 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1262,8 +1262,7 @@ class Collection(RichCollectionBase):
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
-                 put_threads=None,
-                 get_threads=None):
+                 put_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1317,7 +1316,6 @@ class Collection(RichCollectionBase):
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
-        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
@@ -1435,8 +1433,7 @@ class Collection(RichCollectionBase):
                                                 copies=copies,
                                                 put_threads=self.put_threads,
                                                 num_retries=self.num_retries,
-                                                storage_classes_func=self.storage_classes_desired,
-                                                get_threads=self.get_threads,)
+                                                storage_classes_func=self.storage_classes_desired)
         return self._block_manager
 
     def _remember_api_response(self, response):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index a2c8fd249..4b00f7df8 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -835,7 +835,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=10, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -924,6 +924,9 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
+        self.num_prefetch_threads = num_prefetch_threads or 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
@@ -1391,6 +1394,51 @@ class KeepClient(object):
                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
+    def _block_prefetch_worker(self):
+        """The background downloader thread."""
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        if self._prefetch_threads is None:
+            with self.lock:
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for i in range(0, self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+        """
+
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        with self.lock:
+            if self._prefetch_threads is not None:
+                for t in self._prefetch_threads:
+                    self._prefetch_queue.put(None)
+                for t in self._prefetch_threads:
+                    t.join()
+            self._prefetch_threads = None
+            self._prefetch_queue = None
+
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 95b9a9773..c2242eb2b 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -228,17 +228,28 @@ class Mount(object):
 
     def _setup_api(self):
         try:
+            # default value of file_cache is 0, this tells KeepBlockCache to
+            # choose a default based on whether disk_cache is enabled or not.
+
+            block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+                                                      disk_cache=self.args.disk_cache,
+                                                      disk_cache_dir=self.args.disk_cache_dir)
+
+            # If there's too many prefetch threads and you
+            # max out the CPU, delivering data to the FUSE
+            # layer actually ends up being slower.
+            # Experimentally, capping 7 threads seems to
+            # be a sweet spot.
+            prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),
                 api_params={
                     'num_retries': self.args.retries,
                 },
-                # default value of file_cache is 0, this tells KeepBlockCache to
-                # choose a default based on whether disk_cache is enabled or not.
                 keep_params={
-                    'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
-                                                               disk_cache=self.args.disk_cache,
-                                                               disk_cache_dir=self.args.disk_cache_dir),
+                    'block_cache': block_cache,
+                    'num_prefetch_threads': prefetch_threads,
                     'num_retries': self.args.retries,
                 },
                 version='v1',
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index f3816c0d3..7de95a0cb 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -525,23 +525,15 @@ class CollectionDirectory(CollectionDirectoryBase):
                         self.collection.update()
                         new_collection_record = self.collection.api_response()
                     else:
-                        # If there's too many prefetch threads and you
-                        # max out the CPU, delivering data to the FUSE
-                        # layer actually ends up being slower.
-                        # Experimentally, capping 7 threads seems to
-                        # be a sweet spot.
-                        get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
                         # Create a new collection object
                         if uuid_pattern.match(self.collection_locator):
                             coll_reader = arvados.collection.Collection(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries,
-                                get_threads=get_threads)
+                                num_retries=self.num_retries)
                         else:
                             coll_reader = arvados.collection.CollectionReader(
                                 self.collection_locator, self.api, self.api.keep,
-                                num_retries=self.num_retries,
-                                get_threads=get_threads)
+                                num_retries=self.num_retries)
                         new_collection_record = coll_reader.api_response() or {}
                         # If the Collection only exists in Keep, there will be no API
                         # response.  Fill in the fields we need.

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list