[arvados] created: 2.7.0-4963-g66997af197
git repository hosting <git at public.arvados.org>
Thu Oct 5 19:19:35 UTC 2023
at 66997af197a01e972764bc395feecb92f3bf0a94 (commit)
commit 66997af197a01e972764bc395feecb92f3bf0a94
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 16:45:44 2023 -0400
20937: fix deadlock with duplicated blocks
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index c6bbe142c2..d9b8bd47dc 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -591,10 +591,12 @@ def copy_collection(obj_uuid, src, dst, args):
# block hashes we want to get, but these are small
get_queue = queue.Queue()
+ threadcount = 4
+
# the put queue contains full data blocks
# and if 'get' is faster than 'put' we could end up consuming
# a great deal of RAM if it isn't bounded.
- put_queue = queue.Queue(4)
+ put_queue = queue.Queue(threadcount)
transfer_error = []
def get_thread():
@@ -608,6 +610,8 @@ def copy_collection(obj_uuid, src, dst, args):
blockhash = arvados.KeepLocator(word).md5sum
with lock:
if blockhash in dst_locators:
+ # Already uploaded
+ get_queue.task_done()
continue
try:
@@ -638,6 +642,8 @@ def copy_collection(obj_uuid, src, dst, args):
blockhash = loc.md5sum
with lock:
if blockhash in dst_locators:
+ # Already uploaded
+ put_queue.task_done()
continue
try:
@@ -672,20 +678,14 @@ def copy_collection(obj_uuid, src, dst, args):
get_queue.put(word)
- get_queue.put(None)
- get_queue.put(None)
- get_queue.put(None)
- get_queue.put(None)
+ for i in range(0, threadcount):
+ get_queue.put(None)
- threading.Thread(target=get_thread, daemon=True).start()
- threading.Thread(target=get_thread, daemon=True).start()
- threading.Thread(target=get_thread, daemon=True).start()
- threading.Thread(target=get_thread, daemon=True).start()
+ for i in range(0, threadcount):
+ threading.Thread(target=get_thread, daemon=True).start()
- threading.Thread(target=put_thread, daemon=True).start()
- threading.Thread(target=put_thread, daemon=True).start()
- threading.Thread(target=put_thread, daemon=True).start()
- threading.Thread(target=put_thread, daemon=True).start()
+ for i in range(0, threadcount):
+ threading.Thread(target=put_thread, daemon=True).start()
get_queue.join()
put_queue.join()
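
The deadlock fixed above comes from queue.Queue's join/task_done accounting: join() blocks until task_done() has been called once for every item ever put(), so a worker that takes an item and then skips it with `continue` must still mark it done. A minimal, self-contained sketch of the rule (names here are illustrative, not taken from arv_copy.py):

    import queue
    import threading

    q = queue.Queue()
    copied = set()
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            if item is None:
                q.task_done()
                return
            with lock:
                if item in copied:      # duplicated block: skip the work...
                    q.task_done()       # ...but still balance the counter,
                    continue            # or q.join() below never returns
                copied.add(item)
            q.task_done()

    for item in ["aaa", "bbb", "aaa", None]:   # "aaa" appears twice
        q.put(item)
    threading.Thread(target=worker, daemon=True).start()
    q.join()    # returns only when every put() is matched by a task_done()
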
commit fcdb05d9562984d2414c15c2b6f34f7ff95c6ef6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 16:10:08 2023 -0400
20937: Set limit on 'put' queue
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index b05886d790..c6bbe142c2 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -586,8 +586,15 @@ def copy_collection(obj_uuid, src, dst, args):
# again and build dst_manifest
lock = threading.Lock()
+
+ # the get queue should be unbounded because we'll add all the
+ # block hashes we want to get, but these are small
get_queue = queue.Queue()
- put_queue = queue.Queue()
+
+ # the put queue contains full data blocks
+ # and if 'get' is faster than 'put' we could end up consuming
+ # a great deal of RAM if it isn't bounded.
+ put_queue = queue.Queue(4)
transfer_error = []
def get_thread():
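
The asymmetry the comments describe is why only one queue is bounded: each item on the put queue carries a full data block (Keep blocks run up to 64 MiB), while the get queue holds only short locator strings. queue.Queue(maxsize) provides the backpressure for free, since put() blocks once maxsize items are waiting. A hypothetical illustration:

    import queue

    # With maxsize=4, worst-case buffered data is bounded at roughly four
    # blocks; put() blocks once the queue is full until a consumer calls
    # get(), which is exactly the backpressure wanted here.
    put_queue = queue.Queue(4)
    for i in range(4):
        put_queue.put(("locator%d" % i, b"\0" * 1024))  # fills the queue
    # the next put() would now block until a put_thread drains an item
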
commit 2d8e23a659259186e098d119e84407a216d5be6b
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 15:57:53 2023 -0400
20937: Fix in -> is typo
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 95e4f61d17..b05886d790 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -168,7 +168,7 @@ def main():
# If no exception was thrown and the response does not have an
# error_token field, presume success
- if result in None or 'error_token' in result or 'uuid' not in result:
+ if result is None or 'error_token' in result or 'uuid' not in result:
if result:
logger.error("API server returned an error result: {}".format(result))
exit(1)
commit a4e232d9de469211536e1733520e2cb54bce9f95
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 15:56:18 2023 -0400
20937: Handle None result
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index c0a0e312f3..95e4f61d17 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -168,8 +168,9 @@ def main():
# If no exception was thrown and the response does not have an
# error_token field, presume success
- if 'error_token' in result or 'uuid' not in result:
- logger.error("API server returned an error result: {}".format(result))
+ if result in None or 'error_token' in result or 'uuid' not in result:
+ if result:
+ logger.error("API server returned an error result: {}".format(result))
exit(1)
print(result['uuid'])
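
This is the commit that introduced the `result in None` slip fixed just above; the intent is `result is None`, and it must come first so the expression short-circuits before the membership tests, because `in` applied to None raises rather than returning False. A hypothetical minimal reproduction:

    # A copy function can return None (nothing copied); membership tests
    # on None raise instead of returning False:
    result = None
    # 'error_token' in result    # TypeError: argument of type 'NoneType'
    #                            # is not iterable
    failed = result is None or 'error_token' in result  # safe: short-circuits
    print(failed)                                        # True
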
commit ec4ad3f3b7ac2126d36479cd63b58135a98a9f33
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 13:02:42 2023 -0400
20937: Fix pathmapper use of http_to_keep
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 539188fddd..448facf776 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -109,9 +109,10 @@ class ArvPathMapper(PathMapper):
# passthrough, we'll download it later.
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
else:
- keepref = "keep:%s/%s" % http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ results = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
+ keepref = "keep:%s/%s" % (results[0], results[1])
logger.info("%s is %s", src, keepref)
self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
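
The original one-liner broke because the %-operator treats a tuple operand as its full argument list: once http_to_keep() grew a third return value (see the http_to_keep changes at the bottom of this mail), a 3-tuple against two %s placeholders raises TypeError. Indexing the result explicitly is arity-proof. A minimal reproduction with a stand-in tuple:

    results = ("pdh", "filename", "uuid")  # http_to_keep now returns 3+ items
    # "keep:%s/%s" % results               # TypeError: not all arguments
    #                                      # converted during string formatting
    keepref = "keep:%s/%s" % (results[0], results[1])
    print(keepref)                         # keep:pdh/filename
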
commit 10801e63c5029a2fffc3599235dddab62a2f3480
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Sep 11 10:02:26 2023 -0400
20937: Add error handling to threaded collection copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index b26445b26d..c0a0e312f3 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -587,11 +587,14 @@ def copy_collection(obj_uuid, src, dst, args):
lock = threading.Lock()
get_queue = queue.Queue()
put_queue = queue.Queue()
+ transfer_error = []
def get_thread():
while True:
word = get_queue.get()
if word is None:
+ put_queue.put(None)
+ get_queue.task_done()
return
blockhash = arvados.KeepLocator(word).md5sum
@@ -599,15 +602,27 @@ def copy_collection(obj_uuid, src, dst, args):
if blockhash in dst_locators:
continue
- logger.debug("Getting block %s", word)
- data = src_keep.get(word)
- put_queue.put((word, data))
- get_queue.task_done()
+ try:
+ logger.debug("Getting block %s", word)
+ data = src_keep.get(word)
+ put_queue.put((word, data))
+ except e:
+ logger.error("Error getting block %s: %s", word, e)
+ transfer_error.append(e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ except queue.Empty:
+ pass
+ finally:
+ get_queue.task_done()
def put_thread():
while True:
item = put_queue.get()
if item is None:
+ put_queue.task_done()
return
word, data = item
@@ -617,15 +632,25 @@ def copy_collection(obj_uuid, src, dst, args):
if blockhash in dst_locators:
continue
- logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- with lock:
- dst_locators[blockhash] = dst_locator
- bytes_written[0] += loc.size
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
-
- put_queue.task_done()
+ try:
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written[0] += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+ except e:
+ logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
+ try:
+ # Drain the 'get' queue so we end early
+ while True:
+ get_queue.get(False)
+ except queue.Empty:
+ pass
+ transfer_error.append(e)
+ finally:
+ put_queue.task_done()
for line in manifest.splitlines():
words = line.split()
@@ -639,6 +664,11 @@ def copy_collection(obj_uuid, src, dst, args):
get_queue.put(word)
+ get_queue.put(None)
+ get_queue.put(None)
+ get_queue.put(None)
+ get_queue.put(None)
+
threading.Thread(target=get_thread, daemon=True).start()
threading.Thread(target=get_thread, daemon=True).start()
threading.Thread(target=get_thread, daemon=True).start()
@@ -652,6 +682,9 @@ def copy_collection(obj_uuid, src, dst, args):
get_queue.join()
put_queue.join()
+ if len(transfer_error) > 0:
+ return {"error_token": "Failed to transfer blocks"}
+
for line in manifest.splitlines():
words = line.split()
dst_manifest.write(words[0])
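
Two notes on the error path above: in Python 3 the handler needs to be spelled `except Exception as e:` (a bare `except e:` looks up `e` as a class name and fails at handling time), and the non-blocking drain is what lets the join() calls at the end still return after a failure instead of waiting on work no thread will do. A sketch of the drain as a helper; the helper name and the per-item task_done() are additions of this sketch, there to keep join()'s unfinished-task counter balanced:

    import queue

    def drain(q):
        # Empty a queue without blocking so join() can still return.
        try:
            while True:
                q.get(False)        # same non-blocking get(False) as above
                q.task_done()
        except queue.Empty:
            pass

    work = queue.Queue()
    for i in range(3):
        work.put(i)
    drain(work)
    work.join()                     # returns immediately: nothing pending
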
commit 66a1e8588bcefb2886b4ed3e34d5f748394c4c9d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 22:46:44 2023 -0400
20937: Better handling of repeated blocks
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index adba6be68c..b26445b26d 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -593,6 +593,12 @@ def copy_collection(obj_uuid, src, dst, args):
word = get_queue.get()
if word is None:
return
+
+ blockhash = arvados.KeepLocator(word).md5sum
+ with lock:
+ if blockhash in dst_locators:
+ continue
+
logger.debug("Getting block %s", word)
data = src_keep.get(word)
put_queue.put((word, data))
@@ -603,9 +609,14 @@ def copy_collection(obj_uuid, src, dst, args):
item = put_queue.get()
if item is None:
return
+
word, data = item
loc = arvados.KeepLocator(word)
blockhash = loc.md5sum
+ with lock:
+ if blockhash in dst_locators:
+ continue
+
logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
with lock:
@@ -625,14 +636,7 @@ def copy_collection(obj_uuid, src, dst, args):
# If 'word' can't be parsed as a locator,
# presume it's a filename.
continue
- blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- with lock:
- if blockhash in dst_locators:
- continue
- # queue it up.
get_queue.put(word)
threading.Thread(target=get_thread, daemon=True).start()
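
Checking dst_locators only in the producer loop let duplicates slip through: a repeated hash is queued before the first copy has finished and recorded its locator. Re-checking under the lock inside each worker catches those late duplicates (the matching task_done() for the skip path is added by the commit at the top of this mail). A tiny sketch of the guard, with illustrative names:

    import threading

    lock = threading.Lock()
    dst_locators = {}

    def already_copied(blockhash):
        # Re-checked in both the get and put stages: the answer can change
        # between queueing a block and a worker picking it up.
        with lock:
            return blockhash in dst_locators

    print(already_copied("acbd18db4cc2f85cedef654fccc4a4d8"))  # False at first
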
commit 20d7f7ebaaf9e1841d48a5048df8a7193b7c9a8e
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 22:33:51 2023 -0400
20937: Fix progress writer
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 64523b4750..adba6be68c 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -593,6 +593,7 @@ def copy_collection(obj_uuid, src, dst, args):
word = get_queue.get()
if word is None:
return
+ logger.debug("Getting block %s", word)
data = src_keep.get(word)
put_queue.put((word, data))
get_queue.task_done()
@@ -605,10 +606,14 @@ def copy_collection(obj_uuid, src, dst, args):
word, data = item
loc = arvados.KeepLocator(word)
blockhash = loc.md5sum
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
with lock:
dst_locators[blockhash] = dst_locator
bytes_written[0] += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
put_queue.task_done()
for line in manifest.splitlines():
@@ -626,9 +631,6 @@ def copy_collection(obj_uuid, src, dst, args):
with lock:
if blockhash in dst_locators:
continue
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
# queue it up.
get_queue.put(word)
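
Two things move here: progress is now reported from put_thread, where bytes are actually written, rather than at queueing time; and the report reads bytes_written[0] because the counter is a one-element list, letting the nested workers mutate it without a nonlocal declaration. A minimal illustration of that closure idiom:

    def make_counter():
        bytes_written = [0]        # mutable cell shared with the closure

        def add(size):
            # `bytes_written += size` would rebind the name and need
            # `nonlocal`; mutating the list element does not
            bytes_written[0] += size

        return bytes_written, add

    counter, add = make_counter()
    add(65536)
    print(counter[0])              # 65536
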
commit 822ba3d61d93ff19c41861ddf1be2fcca20765fb
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Oct 5 15:14:31 2023 -0400
20937: Parallel collection copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 8e58b0ebed..64523b4750 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -36,6 +36,9 @@ import logging
import tempfile
import urllib.parse
import io
+import json
+import queue
+import threading
import arvados
import arvados.config
@@ -566,42 +569,102 @@ def copy_collection(obj_uuid, src, dst, args):
dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
dst_manifest = io.StringIO()
dst_locators = {}
- bytes_written = 0
+ bytes_written = [0]
bytes_expected = total_collection_size(manifest)
if args.progress:
progress_writer = ProgressWriter(human_progress)
else:
progress_writer = None
+ # go through the words
+ # put each block loc into 'get' queue
+ # 'get' threads get block and put it into 'put' queue
+ # 'put' threads put block and then update dst_locators
+ #
+ # after going through the whole manifest we go back through it
+ # again and build dst_manifest
+
+ lock = threading.Lock()
+ get_queue = queue.Queue()
+ put_queue = queue.Queue()
+
+ def get_thread():
+ while True:
+ word = get_queue.get()
+ if word is None:
+ return
+ data = src_keep.get(word)
+ put_queue.put((word, data))
+ get_queue.task_done()
+
+ def put_thread():
+ while True:
+ item = put_queue.get()
+ if item is None:
+ return
+ word, data = item
+ loc = arvados.KeepLocator(word)
+ blockhash = loc.md5sum
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written[0] += loc.size
+ put_queue.task_done()
+
for line in manifest.splitlines():
words = line.split()
- dst_manifest.write(words[0])
for word in words[1:]:
try:
loc = arvados.KeepLocator(word)
except ValueError:
# If 'word' can't be parsed as a locator,
# presume it's a filename.
- dst_manifest.write(' ')
- dst_manifest.write(word)
continue
blockhash = loc.md5sum
# copy this block if we haven't seen it before
# (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
+ with lock:
+ if blockhash in dst_locators:
+ continue
logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
+ # queue it up.
+ get_queue.put(word)
+
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ dst_manifest.write(words[0])
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ dst_manifest.write(' ')
+ dst_manifest.write(word)
+ continue
+ blockhash = loc.md5sum
dst_manifest.write(' ')
dst_manifest.write(dst_locators[blockhash])
dst_manifest.write("\n")
if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
progress_writer.finish()
# Copy the manifest and save the collection.
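
Taken together with the fixes earlier in this mail (sentinel propagation, per-skip task_done, bounded put queue), the design is a two-stage producer/consumer pipeline: the manifest walk feeds locators into get_queue, a pool of "get" threads downloads blocks and feeds put_queue, a pool of "put" threads uploads them, and queue.join() on both queues waits for the pipeline to empty before a second manifest pass rewrites locators from dst_locators. A condensed, runnable sketch of that shape, with fetch()/store() as hypothetical stand-ins for src_keep.get()/dst_keep.put():

    import queue
    import threading

    THREADS = 4

    def fetch(word):                 # stand-in for src_keep.get(word)
        return ("data for %s" % word).encode()

    def store(data):                 # stand-in for dst_keep.put(data)
        return "loc+%d" % len(data)

    get_queue = queue.Queue()        # small locator strings: unbounded is fine
    put_queue = queue.Queue(THREADS) # full blocks: bounded for backpressure
    dst_locators = {}
    lock = threading.Lock()

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)  # propagate shutdown downstream
                get_queue.task_done()
                return
            put_queue.put((word, fetch(word)))
            get_queue.task_done()

    def put_thread():
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return
            word, data = item
            loc = store(data)
            with lock:
                dst_locators[word] = loc
            put_queue.task_done()

    for word in ["aaa", "bbb", "ccc"]:   # stand-in for the manifest walk
        get_queue.put(word)
    for _ in range(THREADS):             # one sentinel per get thread
        get_queue.put(None)
        threading.Thread(target=get_thread, daemon=True).start()
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()
    print(dst_locators)                  # the second manifest pass reads this
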
commit 65b22662b8a9370e9a85d8de639283c378a3dad1
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 21:52:14 2023 -0400
20937: Add http import capability to arv-copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 41b500a52f..8e58b0ebed 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -43,6 +43,7 @@ import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
+import arvados.http_to_keep
import ruamel.yaml as yaml
from arvados._version import __version__
@@ -105,6 +106,11 @@ def main():
copy_opts.add_argument(
'--storage-classes', dest='storage_classes',
help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+ copy_opts.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
copy_opts.add_argument(
'object_uuid',
@@ -125,7 +131,7 @@ def main():
else:
logger.setLevel(logging.INFO)
- if not args.source_arvados:
+ if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
args.source_arvados = args.object_uuid[:5]
# Create API clients for the source and destination instances
@@ -148,6 +154,8 @@ def main():
elif t == 'Group':
set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+ elif t == 'httpURL':
+ result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -756,6 +764,10 @@ def git_rev_parse(rev, repo):
def uuid_type(api, object_uuid):
if re.match(arvados.util.keep_locator_pattern, object_uuid):
return 'Collection'
+
+ if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+ return 'httpURL'
+
p = object_uuid.split('-')
if len(p) == 3:
type_prefix = p[1]
@@ -765,6 +777,27 @@ def uuid_type(api, object_uuid):
return k
return None
+
+def copy_from_http(url, src, dst, args):
+
+ project_uuid = args.project_uuid
+ varying_url_params = args.varying_url_params
+ prefer_cached_downloads = args.prefer_cached_downloads
+
+ cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+ varying_url_params=varying_url_params,
+ prefer_cached_downloads=prefer_cached_downloads)
+ if cached[2] is not None:
+ return copy_collection(cached[2], src, dst, args)
+
+ cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+ varying_url_params=varying_url_params,
+ prefer_cached_downloads=prefer_cached_downloads)
+
+ if cached is not None:
+ return {"uuid": cached[2]}
+
+
def abort(msg, code=1):
logger.info("arv-copy: %s", msg)
exit(code)
diff --git a/sdk/python/arvados/http_to_keep.py b/sdk/python/arvados/http_to_keep.py
index 16c3dc4778..f2087e4e29 100644
--- a/sdk/python/arvados/http_to_keep.py
+++ b/sdk/python/arvados/http_to_keep.py
@@ -238,16 +238,10 @@ def _etag_quote(etag):
return '"' + etag + '"'
-def http_to_keep(api, project_uuid, url,
- utcnow=datetime.datetime.utcnow, varying_url_params="",
- prefer_cached_downloads=False):
- """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
- Before downloading the URL, checks to see if the URL already
- exists in Keep and applies HTTP caching policy, the
- varying_url_params and prefer_cached_downloads flags in order to
- decide whether to use the version in Keep or re-download it.
- """
+def check_cached_url(api, project_uuid, url, etags,
+ utcnow=datetime.datetime.utcnow,
+ varying_url_params="",
+ prefer_cached_downloads=False):
logger.info("Checking Keep for %s", url)
@@ -270,8 +264,6 @@ def http_to_keep(api, project_uuid, url,
now = utcnow()
- etags = {}
-
curldownloader = _Downloader(api)
for item in items:
@@ -287,13 +279,13 @@ def http_to_keep(api, project_uuid, url,
if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
# HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())) )
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
if not _changed(cache_url, clean_url, properties, now, curldownloader):
# Etag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())))
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
for etagstr in ("Etag", "ETag"):
if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
@@ -301,6 +293,31 @@ def http_to_keep(api, project_uuid, url,
logger.debug("Found ETag values %s", etags)
+ return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+ utcnow=datetime.datetime.utcnow, varying_url_params="",
+ prefer_cached_downloads=False):
+ """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+ Before downloading the URL, checks to see if the URL already
+ exists in Keep and applies HTTP caching policy, the
+ varying_url_params and prefer_cached_downloads flags in order to
+ decide whether to use the version in Keep or re-download it.
+ """
+
+ etags = {}
+ cache_result = check_cached_url(api, project_uuid, url, etags,
+ utcnow, varying_url_params,
+ prefer_cached_downloads)
+
+ if cache_result[0] is not None:
+ return cache_result
+
+ clean_url = cache_result[3]
+ now = cache_result[4]
+
properties = {}
headers = {}
if etags:
@@ -309,6 +326,8 @@ def http_to_keep(api, project_uuid, url,
logger.info("Beginning download of %s", url)
+ curldownloader = _Downloader(api)
+
req = curldownloader.download(url, headers)
c = curldownloader.collection
@@ -344,4 +363,4 @@ def http_to_keep(api, project_uuid, url,
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
- return (c.portable_data_hash(), curldownloader.name)
+ return (c.portable_data_hash(), curldownloader.name, c.manifest_locator())
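
The refactor splits the old http_to_keep() into a reusable cache probe, check_cached_url(), plus the download path; that is what lets copy_from_http() above check the source cluster's cache and only download via the destination cluster when needed. One detail worth noting: check_cached_url() always returns a 5-tuple (pdh, filename, collection uuid, cleaned URL, timestamp), while a fresh download out of http_to_keep() returns the shorter (pdh, filename, collection locator) tuple, and a cache hit inside http_to_keep() passes the 5-tuple through. A hedged usage sketch; the client, project uuid, and URL below are placeholders, not values from this mail:

    import arvados
    import arvados.http_to_keep

    # Hypothetical values; substitute a real API client and target project.
    api = arvados.api('v1')
    project_uuid = "zzzzz-j7d0g-000000000000000"
    url = "https://example.com/genome.fa"

    # check_cached_url() always returns a 5-tuple; uuid is None on a miss.
    pdh, name, coll_uuid, clean_url, now = \
        arvados.http_to_keep.check_cached_url(api, project_uuid, url, {})

    if coll_uuid is None:
        # Not cached: download into Keep. A fresh download returns
        # (pdh, name, collection_locator).
        result = arvados.http_to_keep.http_to_keep(api, project_uuid, url)
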
-----------------------------------------------------------------------
hooks/post-receive