[arvados] updated: 2.6.0-572-g216b408ecb
git repository hosting
git at public.arvados.org
Mon Sep 11 14:02:47 UTC 2023
Summary of changes:
sdk/python/arvados/commands/arv_copy.py | 59 +++++++++++++++++++++++++--------
1 file changed, 46 insertions(+), 13 deletions(-)
via 216b408ecb9774dd6c3f562f242787de1acfe78d (commit)
from 57380e658e5deed7a1f7f735f88a14f1b616bfc0 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 216b408ecb9774dd6c3f562f242787de1acfe78d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 10:02:26 2023 -0400
20937: Add error handling to threaded collection copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 4efd85e3e1..901245c950 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -583,11 +583,14 @@ def copy_collection(obj_uuid, src, dst, args):
lock = threading.Lock()
get_queue = queue.Queue()
put_queue = queue.Queue()
+ transfer_error = []
def get_thread():
while True:
word = get_queue.get()
if word is None:
+ put_queue.put(None)
+ get_queue.task_done()
return
blockhash = arvados.KeepLocator(word).md5sum
@@ -595,15 +598,27 @@ def copy_collection(obj_uuid, src, dst, args):
if blockhash in dst_locators:
continue
- logger.debug("Getting block %s", word)
- data = src_keep.get(word)
- put_queue.put((word, data))
- get_queue.task_done()
+ try:
+ logger.debug("Getting block %s", word)
+ data = src_keep.get(word)
+ put_queue.put((word, data))
+ except e:
+ logger.error("Error getting block %s: %s", word, e)
+ transfer_error.append(e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ except queue.Empty:
+ pass
+ finally:
+ get_queue.task_done()
def put_thread():
while True:
item = put_queue.get()
if item is None:
+ put_queue.task_done()
return
word, data = item
@@ -613,15 +628,25 @@ def copy_collection(obj_uuid, src, dst, args):
if blockhash in dst_locators:
continue
- logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- with lock:
- dst_locators[blockhash] = dst_locator
- bytes_written[0] += loc.size
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
-
- put_queue.task_done()
+ try:
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written[0] += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+ except e:
+ logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ except queue.Empty:
+ pass
+ transfer_error.append(e)
+ finally:
+ put_queue.task_done()
for line in manifest.splitlines():
words = line.split()
@@ -635,6 +660,11 @@ def copy_collection(obj_uuid, src, dst, args):
get_queue.put(word)
+ get_queue.put(None)
+ get_queue.put(None)
+ get_queue.put(None)
+ get_queue.put(None)
+
threading.Thread(target=get_thread, daemon=True).start()
threading.Thread(target=get_thread, daemon=True).start()
threading.Thread(target=get_thread, daemon=True).start()
@@ -648,6 +678,9 @@ def copy_collection(obj_uuid, src, dst, args):
get_queue.join()
put_queue.join()
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+
for line in manifest.splitlines():
words = line.split()
dst_manifest.write(words[0])
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list