[arvados] created: 2.7.0-4963-g66997af197

git repository hosting <git@public.arvados.org>
Thu Oct 5 19:19:35 UTC 2023


        at  66997af197a01e972764bc395feecb92f3bf0a94 (commit)


commit 66997af197a01e972764bc395feecb92f3bf0a94
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 16:45:44 2023 -0400

    20937: fix deadlock with duplicated blocks
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index c6bbe142c2..d9b8bd47dc 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -591,10 +591,12 @@ def copy_collection(obj_uuid, src, dst, args):
     # block hashes we want to get, but these are small
     get_queue = queue.Queue()
 
+    threadcount = 4
+
     # the put queue contains full data blocks
     # and if 'get' is faster than 'put' we could end up consuming
     # a great deal of RAM if it isn't bounded.
-    put_queue = queue.Queue(4)
+    put_queue = queue.Queue(threadcount)
     transfer_error = []
 
     def get_thread():
@@ -608,6 +610,8 @@ def copy_collection(obj_uuid, src, dst, args):
             blockhash = arvados.KeepLocator(word).md5sum
             with lock:
                 if blockhash in dst_locators:
+                    # Already uploaded
+                    get_queue.task_done()
                     continue
 
             try:
@@ -638,6 +642,8 @@ def copy_collection(obj_uuid, src, dst, args):
             blockhash = loc.md5sum
             with lock:
                 if blockhash in dst_locators:
+                    # Already uploaded
+                    put_queue.task_done()
                     continue
 
             try:
@@ -672,20 +678,14 @@ def copy_collection(obj_uuid, src, dst, args):
 
             get_queue.put(word)
 
-    get_queue.put(None)
-    get_queue.put(None)
-    get_queue.put(None)
-    get_queue.put(None)
+    for i in range(0, threadcount):
+        get_queue.put(None)
 
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
-    threading.Thread(target=get_thread, daemon=True).start()
+    for i in range(0, threadcount):
+        threading.Thread(target=get_thread, daemon=True).start()
 
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
-    threading.Thread(target=put_thread, daemon=True).start()
+    for i in range(0, threadcount):
+        threading.Thread(target=put_thread, daemon=True).start()
 
     get_queue.join()
     put_queue.join()

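The deadlock fixed here comes from queue.Queue's join() accounting: every item
taken with get() must be matched by exactly one task_done(), and the "already
uploaded" skip path continued without calling it, so a manifest with duplicated
blocks left the unfinished-task count above zero and both join() calls blocked
forever. The commit also folds the four copy-pasted sentinel puts and thread
starts into loops over a single threadcount, keeping the number of None
sentinels in step with the number of threads. A minimal sketch of the invariant
(names here are illustrative, not taken from arv_copy.py):

    import queue
    import threading

    work = queue.Queue()
    seen = set()
    lock = threading.Lock()

    def worker():
        while True:
            item = work.get()
            if item is None:
                work.task_done()
                return
            with lock:
                if item in seen:
                    # Even on the skip path, pair this get() with a
                    # task_done(); otherwise work.join() never returns.
                    work.task_done()
                    continue
                seen.add(item)
            # ... transfer the block here ...
            work.task_done()

    for item in ["a", "b", "a", "c"]:   # "a" is duplicated
        work.put(item)
    work.put(None)                      # one sentinel per worker thread
    threading.Thread(target=worker, daemon=True).start()
    work.join()    # returns only once every put() has been task_done()'d
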
commit fcdb05d9562984d2414c15c2b6f34f7ff95c6ef6
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 16:10:08 2023 -0400

    20937: Set limit on 'put' queue
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index b05886d790..c6bbe142c2 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -586,8 +586,15 @@ def copy_collection(obj_uuid, src, dst, args):
     # again and build dst_manifest
 
     lock = threading.Lock()
+
+    # the get queue should be unbounded because we'll add all the
+    # block hashes we want to get, but these are small
     get_queue = queue.Queue()
-    put_queue = queue.Queue()
+
+    # the put queue contains full data blocks
+    # and if 'get' is faster than 'put' we could end up consuming
+    # a great deal of RAM if it isn't bounded.
+    put_queue = queue.Queue(4)
     transfer_error = []
 
     def get_thread():

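The bound added to put_queue provides backpressure: queue.Queue(4) makes put()
block once four data blocks are waiting, so the get threads can run only a few
blocks ahead of the put threads. Keep blocks can be up to 64 MiB, so an
unbounded queue could otherwise buffer much of a collection in RAM whenever
downloads outpace uploads. A small self-contained sketch (sizes and the sleep
are illustrative):

    import queue
    import threading
    import time

    put_queue = queue.Queue(4)            # at most 4 blocks buffered

    def slow_uploader():
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return
            time.sleep(0.1)               # simulate a slow upload
            put_queue.task_done()

    threading.Thread(target=slow_uploader, daemon=True).start()
    for i in range(20):
        put_queue.put(i)    # blocks while 4 items are pending, capping RAM
    put_queue.put(None)
    put_queue.join()
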
commit 2d8e23a659259186e098d119e84407a216d5be6b
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 15:57:53 2023 -0400

    20937: Fix in -> is typo
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 95e4f61d17..b05886d790 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -168,7 +168,7 @@ def main():
 
     # If no exception was thrown and the response does not have an
     # error_token field, presume success
-    if result in None or 'error_token' in result or 'uuid' not in result:
+    if result is None or 'error_token' in result or 'uuid' not in result:
         if result:
             logger.error("API server returned an error result: {}".format(result))
         exit(1)

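The typo mattered because "result in None" is a membership test against None,
which raises TypeError ("argument of type 'NoneType' is not iterable") as soon
as it is evaluated, while "result is None" is the intended identity check:

    result = None
    result is None      # True
    # result in None    # TypeError: 'NoneType' is not iterable
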
commit a4e232d9de469211536e1733520e2cb54bce9f95
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 15:56:18 2023 -0400

    20937: Handle None result
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index c0a0e312f3..95e4f61d17 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -168,8 +168,9 @@ def main():
 
     # If no exception was thrown and the response does not have an
     # error_token field, presume success
-    if 'error_token' in result or 'uuid' not in result:
-        logger.error("API server returned an error result: {}".format(result))
+    if result in None or 'error_token' in result or 'uuid' not in result:
+        if result:
+            logger.error("API server returned an error result: {}".format(result))
         exit(1)
 
     print(result['uuid'])

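The None case is real: copy_from_http(), added in the last commit of this
batch, returns {"uuid": ...} on success but simply falls off the end of the
function, implicitly returning None, when the HTTP import yields nothing. A
caller that subscripts the result without a guard would crash. A minimal
illustration (the function body and placeholder UUID are hypothetical):

    def fetch(url):
        if url.startswith("https:"):
            return {"uuid": "zzzzz-4zz18-aaaaaaaaaaaaaaa"}
        # no explicit return here, so callers receive None

    result = fetch("ftp://example.com/x")
    if result is None or 'uuid' not in result:
        raise SystemExit(1)   # guard before touching result['uuid']
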
commit ec4ad3f3b7ac2126d36479cd63b58135a98a9f33
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 13:02:42 2023 -0400

    20937: Fix pathmapper use of http_to_keep
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 539188fddd..448facf776 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -109,9 +109,10 @@ class ArvPathMapper(PathMapper):
                         # passthrough, we'll download it later.
                         self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
                     else:
-                        keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+                        results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
                                                               varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
                                                               prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+                        keepref = "keep:%s/%s" % (results[0], results[1])
                         logger.info("%s is %s", src, keepref)
                         self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
                 except Exception as e:

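This breakage is a printf-style arity mismatch: once http_to_keep() started
returning more than two fields, "keep:%s/%s" % http_to_keep(...) handed the
%-operator a tuple with leftover values, which raises TypeError ("not all
arguments converted during string formatting"). Binding the tuple and indexing
the two wanted fields avoids depending on the exact arity, which in this
version differs between the cached and fresh-download paths. For example
(values are placeholders):

    results = ("99999999999999999999999999999999+99", "genome.fa",
               "zzzzz-4zz18-aaaaaaaaaaaaaaa")
    # "keep:%s/%s" % results    # TypeError: not all arguments converted
    keepref = "keep:%s/%s" % (results[0], results[1])
    # 'keep:99999999999999999999999999999999+99/genome.fa'
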
commit 10801e63c5029a2fffc3599235dddab62a2f3480
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Mon Sep 11 10:02:26 2023 -0400

    20937: Add error handling to threaded collection copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index b26445b26d..c0a0e312f3 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -587,11 +587,14 @@ def copy_collection(obj_uuid, src, dst, args):
     lock = threading.Lock()
     get_queue = queue.Queue()
     put_queue = queue.Queue()
+    transfer_error = []
 
     def get_thread():
         while True:
             word = get_queue.get()
             if word is None:
+                put_queue.put(None)
+                get_queue.task_done()
                 return
 
             blockhash = arvados.KeepLocator(word).md5sum
@@ -599,15 +602,27 @@ def copy_collection(obj_uuid, src, dst, args):
                 if blockhash in dst_locators:
                     continue
 
-            logger.debug("Getting block %s", word)
-            data = src_keep.get(word)
-            put_queue.put((word, data))
-            get_queue.task_done()
+            try:
+                logger.debug("Getting block %s", word)
+                data = src_keep.get(word)
+                put_queue.put((word, data))
+            except e:
+                logger.error("Error getting block %s: %s", word, e)
+                transfer_error.append(e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                except queue.Empty:
+                    pass
+            finally:
+                get_queue.task_done()
 
     def put_thread():
         while True:
             item = put_queue.get()
             if item is None:
+                put_queue.task_done()
                 return
 
             word, data = item
@@ -617,15 +632,25 @@ def copy_collection(obj_uuid, src, dst, args):
                 if blockhash in dst_locators:
                     continue
 
-            logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
-            dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-            with lock:
-                dst_locators[blockhash] = dst_locator
-                bytes_written[0] += loc.size
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
-
-            put_queue.task_done()
+            try:
+                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+                with lock:
+                    dst_locators[blockhash] = dst_locator
+                    bytes_written[0] += loc.size
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+            except e:
+                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+                try:
+                    # Drain the 'get' queue so we end early
+                    while True:
+                        get_queue.get(False)
+                except queue.Empty:
+                    pass
+                transfer_error.append(e)
+            finally:
+                put_queue.task_done()
 
     for line in manifest.splitlines():
         words = line.split()
@@ -639,6 +664,11 @@ def copy_collection(obj_uuid, src, dst, args):
 
             get_queue.put(word)
 
+    get_queue.put(None)
+    get_queue.put(None)
+    get_queue.put(None)
+    get_queue.put(None)
+
     threading.Thread(target=get_thread, daemon=True).start()
     threading.Thread(target=get_thread, daemon=True).start()
     threading.Thread(target=get_thread, daemon=True).start()
@@ -652,6 +682,9 @@ def copy_collection(obj_uuid, src, dst, args):
     get_queue.join()
     put_queue.join()
 
+    if len(transfer_error) > 0:
+        return {"error_token": "Failed to transfer blocks"}
+
     for line in manifest.splitlines():
         words = line.split()
         dst_manifest.write(words[0])

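Two details in this hunk deserve a note. First, "except e:" is not the
catch-and-bind form: Python evaluates the expression after except as the class
to match, and since e is unbound there, an error reaching the handler raises
NameError instead of being logged; the conventional spelling is
"except Exception as e:". Second, draining the queue lets the workers stop
early, but each drained item was removed with get() and, by the same join()
accounting discussed above, also needs a matching task_done() or the final
get_queue.join() can stall. A hedged reconstruction of the intended pattern
(not the committed code; the drain helper and the function signature are
invented for self-containedness):

    import queue

    def drain(q):
        """Empty q without blocking, keeping join() accounting balanced."""
        try:
            while True:
                q.get(False)
                q.task_done()    # pair every drained get() with task_done()
        except queue.Empty:
            pass

    def get_thread(get_queue, put_queue, src_keep, transfer_error, logger):
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)   # forward one shutdown sentinel
                get_queue.task_done()
                return
            try:
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:    # 'except e:' would NameError here
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                drain(get_queue)      # abandon remaining work, end early
            finally:
                get_queue.task_done()
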
commit 66a1e8588bcefb2886b4ed3e34d5f748394c4c9d
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Thu Sep 7 22:46:44 2023 -0400

    20937: Better handling of repeated blocks
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index adba6be68c..b26445b26d 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -593,6 +593,12 @@ def copy_collection(obj_uuid, src, dst, args):
             word = get_queue.get()
             if word is None:
                 return
+
+            blockhash = arvados.KeepLocator(word).md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    continue
+
             logger.debug("Getting block %s", word)
             data = src_keep.get(word)
             put_queue.put((word, data))
@@ -603,9 +609,14 @@ def copy_collection(obj_uuid, src, dst, args):
             item = put_queue.get()
             if item is None:
                 return
+
             word, data = item
             loc = arvados.KeepLocator(word)
             blockhash = loc.md5sum
+            with lock:
+                if blockhash in dst_locators:
+                    continue
+
             logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
             with lock:
@@ -625,14 +636,7 @@ def copy_collection(obj_uuid, src, dst, args):
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
                 continue
-            blockhash = loc.md5sum
-            # copy this block if we haven't seen it before
-            # (otherwise, just reuse the existing dst_locator)
-            with lock:
-                if blockhash in dst_locators:
-                    continue
 
-            # queue it up.
             get_queue.put(word)
 
     threading.Thread(target=get_thread, daemon=True).start()

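Moving the dst_locators check out of the manifest scan and into both worker
stages addresses a check-then-act race: while a block's first copy is still in
flight, the scan sees it missing from dst_locators and queues the duplicate
anyway, so the workers have to re-check under the lock when the duplicate
surfaces. Two put threads can still both pass the check and upload the same
block; with content-addressed storage that is wasted work rather than
corruption, since both uploads yield the same locator. A compressed sketch
(upload is a stand-in for dst_keep.put):

    import hashlib
    import threading

    dst_locators = {}
    lock = threading.Lock()

    def copy_block(data, upload):
        blockhash = hashlib.md5(data).hexdigest()
        with lock:
            if blockhash in dst_locators:   # another thread already copied it
                return
        loc = upload(data)                  # slow call, made outside the lock
        with lock:
            dst_locators[blockhash] = loc

(As committed here, these skip paths still lacked task_done(); that is the
deadlock fixed in the commit at the top of this message.)
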
commit 20d7f7ebaaf9e1841d48a5048df8a7193b7c9a8e
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Thu Sep 7 22:33:51 2023 -0400

    20937: Fix progress writer
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 64523b4750..adba6be68c 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -593,6 +593,7 @@ def copy_collection(obj_uuid, src, dst, args):
             word = get_queue.get()
             if word is None:
                 return
+            logger.debug("Getting block %s", word)
             data = src_keep.get(word)
             put_queue.put((word, data))
             get_queue.task_done()
@@ -605,10 +606,14 @@ def copy_collection(obj_uuid, src, dst, args):
             word, data = item
             loc = arvados.KeepLocator(word)
             blockhash = loc.md5sum
+            logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
             dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
             with lock:
                 dst_locators[blockhash] = dst_locator
                 bytes_written[0] += loc.size
+                if progress_writer:
+                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
             put_queue.task_done()
 
     for line in manifest.splitlines():
@@ -626,9 +631,6 @@ def copy_collection(obj_uuid, src, dst, args):
             with lock:
                 if blockhash in dst_locators:
                     continue
-                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
-                if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
 
             # queue it up.
             get_queue.put(word)

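Reporting progress from the manifest scan no longer made sense once the scan
only queues work: bytes_written[0] changes in the put threads, so the report
now happens there, inside the same lock that updates the counter, and each
progress line reflects a consistent running total. A minimal sketch of that
pattern (names are illustrative):

    import threading

    lock = threading.Lock()
    bytes_written = [0]        # shared counter, mutated by worker threads

    def record_progress(nbytes, report, bytes_expected):
        with lock:
            bytes_written[0] += nbytes
            # Reading and reporting under the same lock keeps the printed
            # total consistent with the update that produced it.
            report(bytes_written[0], bytes_expected)

    record_progress(65536, lambda done, total: print(done, "/", total), 1 << 20)
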
commit 822ba3d61d93ff19c41861ddf1be2fcca20765fb
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Thu Oct 5 15:14:31 2023 -0400

    20937: Parallel collection copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 8e58b0ebed..64523b4750 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -36,6 +36,9 @@ import logging
 import tempfile
 import urllib.parse
 import io
+import json
+import queue
+import threading
 
 import arvados
 import arvados.config
@@ -566,42 +569,102 @@ def copy_collection(obj_uuid, src, dst, args):
     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
     dst_manifest = io.StringIO()
     dst_locators = {}
-    bytes_written = 0
+    bytes_written = [0]
     bytes_expected = total_collection_size(manifest)
     if args.progress:
         progress_writer = ProgressWriter(human_progress)
     else:
         progress_writer = None
 
+    # go through the words
+    # put each block loc into 'get' queue
+    # 'get' threads get block and put it into 'put' queue
+    # 'put' threads put block and then update dst_locators
+    #
+    # after going through the whole manifest we go back through it
+    # again and build dst_manifest
+
+    lock = threading.Lock()
+    get_queue = queue.Queue()
+    put_queue = queue.Queue()
+
+    def get_thread():
+        while True:
+            word = get_queue.get()
+            if word is None:
+                return
+            data = src_keep.get(word)
+            put_queue.put((word, data))
+            get_queue.task_done()
+
+    def put_thread():
+        while True:
+            item = put_queue.get()
+            if item is None:
+                return
+            word, data = item
+            loc = arvados.KeepLocator(word)
+            blockhash = loc.md5sum
+            dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+            with lock:
+                dst_locators[blockhash] = dst_locator
+                bytes_written[0] += loc.size
+            put_queue.task_done()
+
     for line in manifest.splitlines():
         words = line.split()
-        dst_manifest.write(words[0])
         for word in words[1:]:
             try:
                 loc = arvados.KeepLocator(word)
             except ValueError:
                 # If 'word' can't be parsed as a locator,
                 # presume it's a filename.
-                dst_manifest.write(' ')
-                dst_manifest.write(word)
                 continue
             blockhash = loc.md5sum
             # copy this block if we haven't seen it before
             # (otherwise, just reuse the existing dst_locator)
-            if blockhash not in dst_locators:
+            with lock:
+                if blockhash in dst_locators:
+                    continue
                 logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                 if progress_writer:
-                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
-                data = src_keep.get(word)
-                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
-                dst_locators[blockhash] = dst_locator
-                bytes_written += loc.size
+                    progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
+            # queue it up.
+            get_queue.put(word)
+
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+    threading.Thread(target=get_thread, daemon=True).start()
+
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+    threading.Thread(target=put_thread, daemon=True).start()
+
+    get_queue.join()
+    put_queue.join()
+
+    for line in manifest.splitlines():
+        words = line.split()
+        dst_manifest.write(words[0])
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                # If 'word' can't be parsed as a locator,
+                # presume it's a filename.
+                dst_manifest.write(' ')
+                dst_manifest.write(word)
+                continue
+            blockhash = loc.md5sum
             dst_manifest.write(' ')
             dst_manifest.write(dst_locators[blockhash])
         dst_manifest.write("\n")
 
     if progress_writer:
-        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+        progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
         progress_writer.finish()
 
     # Copy the manifest and save the collection.

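Two things to notice in this base commit. The manifest is now walked twice: the
first pass only queues block locators, and the rewrite pass runs after both
join() calls, once dst_locators is complete. And bytes_written changes from an
int to a one-element list because the nested worker functions need to mutate
it: assigning to a bare name inside a closure would rebind a local, while
mutating element [0] updates the shared value (nonlocal is the other idiomatic
spelling). For example:

    def counter_with_cell():
        total = [0]              # one-element list as a mutable cell
        def add(n):
            total[0] += n        # mutates the shared list in place
        add(10)
        add(5)
        return total[0]          # 15

    def counter_with_nonlocal():
        total = 0
        def add(n):
            nonlocal total       # rebind the enclosing variable instead
            total += n
        add(10)
        add(5)
        return total             # 15
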
commit 65b22662b8a9370e9a85d8de639283c378a3dad1
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Thu Sep 7 21:52:14 2023 -0400

    20937: Add http import capability to arv-copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 41b500a52f..8e58b0ebed 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -43,6 +43,7 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
+import arvados.http_to_keep
 import ruamel.yaml as yaml
 
 from arvados._version import __version__
@@ -105,6 +106,11 @@ def main():
     copy_opts.add_argument(
         '--storage-classes', dest='storage_classes',
         help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+    copy_opts.add_argument("--varying-url-params", type=str, default="",
+                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
 
     copy_opts.add_argument(
         'object_uuid',
@@ -125,7 +131,7 @@ def main():
     else:
         logger.setLevel(logging.INFO)
 
-    if not args.source_arvados:
+    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
         args.source_arvados = args.object_uuid[:5]
 
     # Create API clients for the source and destination instances
@@ -148,6 +154,8 @@ def main():
     elif t == 'Group':
         set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
         result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+    elif t == 'httpURL':
+        result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
 
@@ -756,6 +764,10 @@ def git_rev_parse(rev, repo):
 def uuid_type(api, object_uuid):
     if re.match(arvados.util.keep_locator_pattern, object_uuid):
         return 'Collection'
+
+    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+        return 'httpURL'
+
     p = object_uuid.split('-')
     if len(p) == 3:
         type_prefix = p[1]
@@ -765,6 +777,27 @@ def uuid_type(api, object_uuid):
                 return k
     return None
 
+
+def copy_from_http(url, src, dst, args):
+
+    project_uuid = args.project_uuid
+    varying_url_params = args.varying_url_params
+    prefer_cached_downloads = args.prefer_cached_downloads
+
+    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+                                                   varying_url_params=varying_url_params,
+                                                   prefer_cached_downloads=prefer_cached_downloads)
+    if cached[2] is not None:
+        return copy_collection(cached[2], src, dst, args)
+
+    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+                                               varying_url_params=varying_url_params,
+                                               prefer_cached_downloads=prefer_cached_downloads)
+
+    if cached is not None:
+        return {"uuid": cached[2]}
+
+
 def abort(msg, code=1):
     logger.info("arv-copy: %s", msg)
     exit(code)
diff --git a/sdk/python/arvados/http_to_keep.py b/sdk/python/arvados/http_to_keep.py
index 16c3dc4778..f2087e4e29 100644
--- a/sdk/python/arvados/http_to_keep.py
+++ b/sdk/python/arvados/http_to_keep.py
@@ -238,16 +238,10 @@ def _etag_quote(etag):
         return '"' + etag + '"'
 
 
-def http_to_keep(api, project_uuid, url,
-                 utcnow=datetime.datetime.utcnow, varying_url_params="",
-                 prefer_cached_downloads=False):
-    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
-    Before downloading the URL, checks to see if the URL already
-    exists in Keep and applies HTTP caching policy, the
-    varying_url_params and prefer_cached_downloads flags in order to
-    decide whether to use the version in Keep or re-download it.
-    """
+def check_cached_url(api, project_uuid, url, etags,
+                     utcnow=datetime.datetime.utcnow,
+                     varying_url_params="",
+                     prefer_cached_downloads=False):
 
     logger.info("Checking Keep for %s", url)
 
@@ -270,8 +264,6 @@ def http_to_keep(api, project_uuid, url,
 
     now = utcnow()
 
-    etags = {}
-
     curldownloader = _Downloader(api)
 
     for item in items:
@@ -287,13 +279,13 @@ def http_to_keep(api, project_uuid, url,
         if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
             # HTTP caching rules say we should use the cache
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())) )
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         if not _changed(cache_url, clean_url, properties, now, curldownloader):
             # Etag didn't change, same content, just update headers
             api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
             cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
-            return (item["portable_data_hash"], next(iter(cr.keys())))
+            return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
 
         for etagstr in ("Etag", "ETag"):
             if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
@@ -301,6 +293,31 @@ def http_to_keep(api, project_uuid, url,
 
     logger.debug("Found ETag values %s", etags)
 
+    return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+                 utcnow=datetime.datetime.utcnow, varying_url_params="",
+                 prefer_cached_downloads=False):
+    """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+    Before downloading the URL, checks to see if the URL already
+    exists in Keep and applies HTTP caching policy, the
+    varying_url_params and prefer_cached_downloads flags in order to
+    decide whether to use the version in Keep or re-download it.
+    """
+
+    etags = {}
+    cache_result = check_cached_url(api, project_uuid, url, etags,
+                                    utcnow, varying_url_params,
+                                    prefer_cached_downloads)
+
+    if cache_result[0] is not None:
+        return cache_result
+
+    clean_url = cache_result[3]
+    now = cache_result[4]
+
     properties = {}
     headers = {}
     if etags:
@@ -309,6 +326,8 @@ def http_to_keep(api, project_uuid, url,
 
     logger.info("Beginning download of %s", url)
 
+    curldownloader = _Downloader(api)
+
     req = curldownloader.download(url, headers)
 
     c = curldownloader.collection
@@ -344,4 +363,4 @@ def http_to_keep(api, project_uuid, url,
 
     api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
 
-    return (c.portable_data_hash(), curldownloader.name)
+    return (c.portable_data_hash(), curldownloader.name, c.manifest_locator())

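With this commit arv-copy accepts an HTTP(S) URL in place of a UUID:
uuid_type() classifies anything starting with http: or https: as 'httpURL',
and copy_from_http() either copies the already-cached collection from the
source cluster or downloads the URL directly into Keep on the destination.
The underlying call can also be used from Python; a hedged sketch (assumes
ARVADOS_API_HOST/ARVADOS_API_TOKEN are configured, and the project UUID and
URL are placeholders):

    import arvados
    import arvados.http_to_keep

    api = arvados.api('v1')
    result = arvados.http_to_keep.http_to_keep(
        api,
        "zzzzz-j7d0g-0123456789abcde",                   # target project
        "https://example.com/reference/genome.fa.gz",
        varying_url_params="AWSAccessKeyId,Signature",
        prefer_cached_downloads=True)
    # The tuple is longer on the cached path than on a fresh download in
    # this version, so index the stable fields rather than unpacking:
    print("keep:%s/%s" % (result[0], result[1]))
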
-----------------------------------------------------------------------


hooks/post-receive