[ARVADOS] updated: 2.1.0-2165-g2d6c425e7
Git user
git at public.arvados.org
Tue Mar 29 16:25:18 UTC 2022
Summary of changes:
sdk/python/arvados/arvfile.py | 13 +++++--------
sdk/python/arvados/collection.py | 11 +++++++++--
sdk/python/arvados/commands/get.py | 13 ++++++++++++-
sdk/python/arvados/keep.py | 3 +--
4 files changed, 27 insertions(+), 13 deletions(-)
via 2d6c425e78bc5712c63b4ebecb05077b0e30da1f (commit)
from 32c9b81e2c1bce19673c73cb14490f6e9dde0fc6 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 2d6c425e78bc5712c63b4ebecb05077b0e30da1f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Mar 29 12:24:53 2022 -0400
18941: Add --threads option to arv-get
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index b21ebd331..fbf593d02 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -479,9 +479,9 @@ class _BlockManager(object):
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 4
+ DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
@@ -1103,7 +1103,7 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
@@ -1121,10 +1121,7 @@ class ArvadosFile(object):
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- if len(data) == 1:
- return data[0]
- else:
- return b''.join(data)
+ return b''.join(data)
@must_be_writable
@synchronized
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a076de6ba..a44d42b6a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1262,7 +1262,8 @@ class Collection(RichCollectionBase):
block_manager=None,
replication_desired=None,
storage_classes_desired=None,
- put_threads=None):
+ put_threads=None,
+ get_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
@@ -1311,6 +1312,7 @@ class Collection(RichCollectionBase):
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
+ self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
@@ -1424,7 +1426,12 @@ class Collection(RichCollectionBase):
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired,
+ get_threads=self.get_threads,)
return self._block_manager
def _remember_api_response(self, response):
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
index eb6829762..a377c149d 100755
--- a/sdk/python/arvados/commands/get.py
+++ b/sdk/python/arvados/commands/get.py
@@ -98,6 +98,15 @@ When getting a collection manifest, strip its access tokens before writing
it.
""")
+parser.add_argument('--threads', type=int, metavar='N', default=2,
+ help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 2 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
def parse_arguments(arguments, stdout, stderr):
args = parser.parse_args(arguments)
@@ -191,7 +200,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
try:
reader = arvados.CollectionReader(
- col_loc, api_client=api_client, num_retries=args.retries)
+ col_loc, api_client=api_client, num_retries=args.retries,
+ keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024)),
+ get_threads=args.threads)
except Exception as error:
logger.error("failed to read collection: {}".format(error))
return 1
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index df01c3a55..94104586d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -176,7 +176,7 @@ class Keep(object):
class KeepBlockCache(object):
# Default RAM cache is 256MiB
- def __init__(self, cache_max=(1024 * 1024 * 1024)):
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
@@ -1337,4 +1337,3 @@ class KeepClient(object):
return True
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
-
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list