[arvados] updated: 2.6.0-572-g216b408ecb

git repository hosting git at public.arvados.org
Mon Sep 11 14:02:47 UTC 2023


Summary of changes:
 sdk/python/arvados/commands/arv_copy.py | 59 +++++++++++++++++++++++++--------
 1 file changed, 46 insertions(+), 13 deletions(-)

       via  216b408ecb9774dd6c3f562f242787de1acfe78d (commit)
      from  57380e658e5deed7a1f7f735f88a14f1b616bfc0 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 216b408ecb9774dd6c3f562f242787de1acfe78d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Sep 11 10:02:26 2023 -0400

    20937: Add error handling to threaded collection copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 4efd85e3e1..901245c950 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -583,11 +583,14 @@ def copy_collection(obj_uuid, src, dst, args):
     lock = threading.Lock()
     get_queue = queue.Queue()
     put_queue = queue.Queue()
+    transfer_error = []
 
     def get_thread():
         while True:
             word = get_queue.get()
             if word is None:
+                put_queue.put(None)
+                get_queue.task_done()
                 return
 
             blockhash = arvados.KeepLocator(word).md5sum
@@ -595,15 +598,27 @@ def copy_collection(obj_uuid, src, dst, args):
                 if blockhash in dst_locators:
                     continue
 
-            logger.debug("Getting block %s", word)
-            data = src_keep.get(word)
-            put_queue.put((word, data))
-            get_queue.task_done()
+            try:
+                logger.debug("Getting block %s", word)
+                data = src_keep.get(word)
+                put_queue.put((word, data))
+            except e:
+                logger.error("Error getting block %s: %s", word, e)
+                transfer_error.append(e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                except queue.Empty:
+                    pass
+            finally:
+                get_queue.task_done()
 
     def put_thread():
         while True:
             item = put_queue.get()
             if item is None:
+                put_queue.task_done()
                 return
 
             word, data = item
@@ -613,15 +628,25 @@ def copy_collection(obj_uuid, src, dst, args):
                 if blockhash in dst_locators:
                     continue
 
-            logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
-            dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-            with lock:
-                dst_locators[blockhash] = dst_locator
-                bytes_written[0] += loc.size
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
-
-            put_queue.task_done()
+            try:
+                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+                with lock:
+                    dst_locators[blockhash] = dst_locator
+                    bytes_written[0] += loc.size
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+            except e:
+                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                except queue.Empty:
+                    pass
+                transfer_error.append(e)
+            finally:
+                put_queue.task_done()
 
     for line in manifest.splitlines():
         words = line.split()
@@ -635,6 +660,11 @@ def copy_collection(obj_uuid, src, dst, args):
 
             get_queue.put(word)
 
+    get_queue.put(None)
+    get_queue.put(None)
+    get_queue.put(None)
+    get_queue.put(None)
+
     threading.Thread(target=get_thread, daemon=True).start()
     threading.Thread(target=get_thread, daemon=True).start()
     threading.Thread(target=get_thread, daemon=True).start()
@@ -648,6 +678,9 @@ def copy_collection(obj_uuid, src, dst, args):
     get_queue.join()
     put_queue.join()
 
+    if len(transfer_error) > 0:
+        return {"error_token": "Failed to transfer blocks"}
+
     for line in manifest.splitlines():
         words = line.split()
         dst_manifest.write(words[0])

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list