[arvados] updated: 2.6.0-577-gbb0f2e6285

git repository hosting git at public.arvados.org
Mon Sep 11 20:46:05 UTC 2023


Summary of changes:
 sdk/python/arvados/commands/arv_copy.py | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

       via  bb0f2e6285a2fa846044cc15592a752e0f13234e (commit)
      from  64ebdb367f035c3882c471e5f764bfb04b18ed9e (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit bb0f2e6285a2fa846044cc15592a752e0f13234e
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Sep 11 16:45:44 2023 -0400

    20937: fix deadlock with duplicated blocks
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 0efef7b8b5..6c2ae9e735 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -587,10 +587,12 @@ def copy_collection(obj_uuid, src, dst, args):
     # block hashes we want to get, but these are small
     get_queue = queue.Queue()
 
+    threadcount = 4
+
     # the put queue contains full data blocks
     # and if 'get' is faster than 'put' we could end up consuming
     # a great deal of RAM if it isn't bounded.
-    put_queue = queue.Queue(4)
+    put_queue = queue.Queue(threadcount)
     transfer_error = []
 
     def get_thread():
@@ -604,6 +606,8 @@ def copy_collection(obj_uuid, src, dst, args):
             blockhash = arvados.KeepLocator(word).md5sum
             with lock:
                 if blockhash in dst_locators:
+                    # Already uploaded
+                    get_queue.task_done()
                     continue
 
             try:
@@ -634,6 +638,8 @@ def copy_collection(obj_uuid, src, dst, args):
             blockhash = loc.md5sum
             with lock:
                 if blockhash in dst_locators:
+                    # Already uploaded
+                    put_queue.task_done()
                     continue
 
             try:
@@ -668,20 +674,14 @@ def copy_collection(obj_uuid, src, dst, args):
 
             get_queue.put(word)
 
-    get_queue.put(None)
-    get_queue.put(None)
-    get_queue.put(None)
-    get_queue.put(None)
+    for i in range(0, threadcount):
+        get_queue.put(None)
 
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
+    for i in range(0, threadcount):
+        threading.Thread(target=get_thread, daemon=True).start()
 
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
+    for i in range(0, threadcount):
+        threading.Thread(target=put_thread, daemon=True).start()
 
     get_queue.join()
     put_queue.join()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list