[arvados] created: 2.6.0-571-g57380e658e

git repository hosting git at public.arvados.org
Fri Sep 8 02:47:11 UTC 2023


        at  57380e658e5deed7a1f7f735f88a14f1b616bfc0 (commit)


commit 57380e658e5deed7a1f7f735f88a14f1b616bfc0
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 22:46:44 2023 -0400

    20937: Better handling of repeated blocks
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index f9c3259910..4efd85e3e1 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -589,6 +589,12 @@ def copy_collection(obj_uuid, src, dst, args):
             word = get_queue.get()
             if word is None:
                 return
+
+            blockhash = arvados.KeepLocator(word).md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    continue
+
             logger.debug("Getting block %s", word)
             data = src_keep.get(word)
             put_queue.put((word, data))
@@ -599,9 +605,14 @@ def copy_collection(obj_uuid, src, dst, args):
             item = put_queue.get()
             if item is None:
                 return
+
             word, data = item
             loc = arvados.KeepLocator(word)
             blockhash = loc.md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    continue
+
             logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
             with lock:
@@ -621,14 +632,7 @@ def copy_collection(obj_uuid, src, dst, args):
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
                 continue
-            blockhash = loc.md5sum
-            # copy this block if we haven't seen it before
-            # (otherwise, just reuse the existing dst_locator)
-            with lock:
-                if blockhash in dst_locators:
-                    continue
 
-            # queue it up.
             get_queue.put(word)
 
     threading.Thread(target=get_thread, daemon=True).start()

commit 8e4404b0f90cc1906dfd048e66a377109770a09f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 22:33:51 2023 -0400

    20937: Fix progress writer
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 3bcb4eead3..f9c3259910 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -589,6 +589,7 @@ def copy_collection(obj_uuid, src, dst, args):
             word = get_queue.get()
             if word is None:
                 return
+            logger.debug("Getting block %s", word)
             data = src_keep.get(word)
             put_queue.put((word, data))
             get_queue.task_done()
@@ -601,10 +602,14 @@ def copy_collection(obj_uuid, src, dst, args):
             word, data = item
             loc = arvados.KeepLocator(word)
             blockhash = loc.md5sum
+            logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
             with lock:
                 dst_locators[blockhash] = dst_locator
                 bytes_written[0] += loc.size
+                if progress_writer:
+                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
             put_queue.task_done()
 
     for line in manifest.splitlines():
@@ -622,9 +627,6 @@ def copy_collection(obj_uuid, src, dst, args):
             with lock:
                 if blockhash in dst_locators:
                     continue
-                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
 
             # queue it up.
             get_queue.put(word)

commit d790e335318c4c429658dbdfbc4d4f5019b2a806
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 22:27:35 2023 -0400

    20937: Parallel collection copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 91b78470ed..3bcb4eead3 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -37,6 +37,8 @@ import tempfile
 import urllib.parse
 import io
 import json
+import queue
+import threading
 
 import arvados
 import arvados.config
@@ -563,42 +565,102 @@ def copy_collection(obj_uuid, src, dst, args):
     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
     dst_manifest = io.StringIO()
     dst_locators = {}
-    bytes_written = 0
+    bytes_written = [0]
     bytes_expected = total_collection_size(manifest)
     if args.progress:
         progress_writer = ProgressWriter(human_progress)
     else:
         progress_writer = None
 
+    # go through the words
+    # put each block loc into 'get' queue
+    # 'get' threads get block and put it into 'put' queue
+    # 'put' threads put block and then update dst_locators
+    #
+    # after going through the whole manifest we go back through it
+    # again and build dst_manifest
+
+    lock = threading.Lock()
+    get_queue = queue.Queue()
+    put_queue = queue.Queue()
+
+    def get_thread():
+        while True:
+            word = get_queue.get()
+            if word is None:
+                return
+            data = src_keep.get(word)
+            put_queue.put((word, data))
+            get_queue.task_done()
+
+    def put_thread():
+        while True:
+            item = put_queue.get()
+            if item is None:
+                return
+            word, data = item
+            loc = arvados.KeepLocator(word)
+            blockhash = loc.md5sum
+            dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+            with lock:
+                dst_locators[blockhash] = dst_locator
+                bytes_written[0] += loc.size
+            put_queue.task_done()
+
     for line in manifest.splitlines():
         words = line.split()
-        dst_manifest.write(words[0])
         for word in words[1:]:
             try:
                 loc = arvados.KeepLocator(word)
             except ValueError:
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
-                dst_manifest.write(' ')
-                dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
             # copy this block if we haven't seen it before
             # (otherwise, just reuse the existing dst_locator)
-            if blockhash not in dst_locators:
+            with lock:
+                if blockhash in dst_locators:
+                    continue
                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                 if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                data = src_keep.get(word)
-                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-                dst_locators[blockhash] = dst_locator
-                bytes_written += loc.size
+                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
+            # queue it up.
+            get_queue.put(word)
+
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+
+    get_queue.join()
+    put_queue.join()
+
+    for line in manifest.splitlines():
+        words = line.split()
+        dst_manifest.write(words[0])
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                # If 'word' can't be parsed as a locator,
+                # presume it's a filename.
+                dst_manifest.write(' ')
+                dst_manifest.write(word)
+                continue
+            blockhash = loc.md5sum
             dst_manifest.write(' ')
             dst_manifest.write(dst_locators[blockhash])
         dst_manifest.write("\n")
 
     if progress_writer:
-        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
         progress_writer.finish()
 
     # Copy the manifest and save the collection.

commit 41541b6d4183cf390db0edccd57864b0037b1fee
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 21:52:14 2023 -0400

    20937: Add http import capability to arv-copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 3132fbe465..91b78470ed 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -44,6 +44,7 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
+import arvados.http_to_keep
 import ruamel.yaml as yaml
 
 from arvados.api import OrderedJsonModel
@@ -107,6 +108,11 @@ def main():
     copy_opts.add_argument(
         '--storage-classes', dest='storage_classes',
         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+    copy_opts.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
 
     copy_opts.add_argument(
         'object_uuid',
@@ -127,7 +133,7 @@ def main():
     else:
         logger.setLevel(logging.INFO)
 
-    if not args.source_arvados:
+    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
         args.source_arvados = args.object_uuid[:5]
 
     # Create API clients for the source and destination instances
@@ -150,6 +156,8 @@ def main():
     elif t == 'Group':
         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+    elif t == 'httpURL':
+        result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
 
@@ -753,6 +761,10 @@ def git_rev_parse(rev, repo):
 def uuid_type(api, object_uuid):
     if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
+
+    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+        return 'httpURL'
+
     p = object_uuid.split('-')
     if len(p) == 3:
         type_prefix = p[1]
@@ -762,6 +774,27 @@ def uuid_type(api, object_uuid):
                 return k
     return None
 
+
+def copy_from_http(url, src, dst, args):
+
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
 def abort(msg, code=1):
     logger.info("arv-copy: %s", msg)
     exit(code)
diff --git a/sdk/python/arvados/http_to_keep.py b/sdk/python/arvados/http_to_keep.py
index 16c3dc4778..f2087e4e29 100644
--- a/sdk/python/arvados/http_to_keep.py
+++ b/sdk/python/arvados/http_to_keep.py
@@ -238,16 +238,10 @@ def _etag_quote(etag):
         return '"' + etag + '"'
 
 
-def http_to_keep(api, project_uuid, url,
-                 utcnow=datetime.datetime.utcnow, varying_url_params="",
-                 prefer_cached_downloads=False):
-    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
-    Before downloading the URL, checks to see if the URL already
-    exists in Keep and applies HTTP caching policy, the
-    varying_url_params and prefer_cached_downloads flags in order to
-    decide whether to use the version in Keep or re-download it.
-    """
+def check_cached_url(api, project_uuid, url, etags,
+                     utcnow=datetime.datetime.utcnow,
+                     varying_url_params="",
+                     prefer_cached_downloads=False):
 
     logger.info("Checking Keep for %s", url)
 
@@ -270,8 +264,6 @@ def http_to_keep(api, project_uuid, url,
 
     now = utcnow()
 
-    etags = {}
-
     curldownloader = _Downloader(api)
 
     for item in items:
@@ -287,13 +279,13 @@ def http_to_keep(api, project_uuid, url,
         if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
             # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())) )
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         if not _changed(cache_url, clean_url, properties, now, curldownloader):
             # Etag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())))
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         for etagstr in ("Etag", "ETag"):
             if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
@@ -301,6 +293,31 @@ def http_to_keep(api, project_uuid, url,
 
     logger.debug("Found ETag values %s", etags)
 
+    return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+                 utcnow=datetime.datetime.utcnow, varying_url_params="",
+                 prefer_cached_downloads=False):
+    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+    Before downloading the URL, checks to see if the URL already
+    exists in Keep and applies HTTP caching policy, the
+    varying_url_params and prefer_cached_downloads flags in order to
+    decide whether to use the version in Keep or re-download it.
+    """
+
+    etags = {}
+    cache_result = check_cached_url(api, project_uuid, url, etags,
+                                    utcnow, varying_url_params,
+                                    prefer_cached_downloads)
+
+    if cache_result[0] is not None:
+        return cache_result
+
+    clean_url = cache_result[3]
+    now = cache_result[4]
+
     properties = {}
     headers = {}
     if etags:
@@ -309,6 +326,8 @@ def http_to_keep(api, project_uuid, url,
 
     logger.info("Beginning download of %s", url)
 
+    curldownloader = _Downloader(api)
+
     req = curldownloader.download(url, headers)
 
     c = curldownloader.collection
@@ -344,4 +363,4 @@ def http_to_keep(api, project_uuid, url,
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
 
-    return (c.portable_data_hash(), curldownloader.name)
+    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list