[ARVADOS] updated: 2.1.0-2165-g2d6c425e7

Git user git at public.arvados.org
Tue Mar 29 16:25:18 UTC 2022


Summary of changes:
 sdk/python/arvados/arvfile.py      | 13 +++++--------
 sdk/python/arvados/collection.py   | 11 +++++++++--
 sdk/python/arvados/commands/get.py | 13 ++++++++++++-
 sdk/python/arvados/keep.py         |  3 +--
 4 files changed, 27 insertions(+), 13 deletions(-)

       via  2d6c425e78bc5712c63b4ebecb05077b0e30da1f (commit)
      from  32c9b81e2c1bce19673c73cb14490f6e9dde0fc6 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 2d6c425e78bc5712c63b4ebecb05077b0e30da1f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Tue Mar 29 12:24:53 2022 -0400

    18941: Add --threads option to arv-get
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index b21ebd331..fbf593d02 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -479,9 +479,9 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 4
+    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -1103,7 +1103,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []
@@ -1121,10 +1121,7 @@ class ArvadosFile(object):
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)
 
-        if len(data) == 1:
-            return data[0]
-        else:
-            return b''.join(data)
+        return b''.join(data)
 
     @must_be_writable
     @synchronized
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a076de6ba..a44d42b6a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase):
                  block_manager=None,
                  replication_desired=None,
                  storage_classes_desired=None,
-                 put_threads=None):
+                 put_threads=None,
+                 get_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase):
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
+        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
@@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired,
+                                                get_threads=self.get_threads,)
         return self._block_manager
 
     def _remember_api_response(self, response):
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
index eb6829762..a377c149d 100755
--- a/sdk/python/arvados/commands/get.py
+++ b/sdk/python/arvados/commands/get.py
@@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing
 it.
 """)
 
+parser.add_argument('--threads', type=int, metavar='N', default=2,
+                    help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
 
@@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     try:
         reader = arvados.CollectionReader(
-            col_loc, api_client=api_client, num_retries=args.retries)
+            col_loc, api_client=api_client, num_retries=args.retries,
+            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+            get_threads=args.threads)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index df01c3a55..94104586d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -176,7 +176,7 @@ class Keep(object):
 
 class KeepBlockCache(object):
     # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(1024 * 1024 * 1024)):
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
@@ -1337,4 +1337,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list