[arvados] created: 2.6.0-571-g57380e658e
git repository hosting
git at public.arvados.org
Fri Sep 8 02:47:11 UTC 2023
at 57380e658e5deed7a1f7f735f88a14f1b616bfc0 (commit)
commit 57380e658e5deed7a1f7f735f88a14f1b616bfc0
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 22:46:44 2023 -0400
20937: Better handling of repeated blocks
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index f9c3259910..4efd85e3e1 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -589,6 +589,12 @@ def copy_collection(obj_uuid, src, dst, args):
word = get_queue.get()
if word is None:
return
+
+ blockhash = arvados.KeepLocator(word).md5sum
+ with lock:
+ if blockhash in dst_locators:
+ continue
+
logger.debug("Getting block %s", word)
data = src_keep.get(word)
put_queue.put((word, data))
@@ -599,9 +605,14 @@ def copy_collection(obj_uuid, src, dst, args):
item = put_queue.get()
if item is None:
return
+
word, data = item
loc = arvados.KeepLocator(word)
blockhash = loc.md5sum
+ with lock:
+ if blockhash in dst_locators:
+ continue
+
logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
with lock:
@@ -621,14 +632,7 @@ def copy_collection(obj_uuid, src, dst, args):
# If 'word' can't be parsed as a locator,
# presume it's a filename.
continue
- blockhash = loc.md5sum
- # copy this block if we haven't seen it before
- # (otherwise, just reuse the existing dst_locator)
- with lock:
- if blockhash in dst_locators:
- continue
- # queue it up.
get_queue.put(word)
threading.Thread(target=get_thread, daemon=True).start()
commit 8e4404b0f90cc1906dfd048e66a377109770a09f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 22:33:51 2023 -0400
20937: Fix progress writer
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 3bcb4eead3..f9c3259910 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -589,6 +589,7 @@ def copy_collection(obj_uuid, src, dst, args):
word = get_queue.get()
if word is None:
return
+ logger.debug("Getting block %s", word)
data = src_keep.get(word)
put_queue.put((word, data))
get_queue.task_done()
@@ -601,10 +602,14 @@ def copy_collection(obj_uuid, src, dst, args):
word, data = item
loc = arvados.KeepLocator(word)
blockhash = loc.md5sum
+ logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
with lock:
dst_locators[blockhash] = dst_locator
bytes_written[0] += loc.size
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
put_queue.task_done()
for line in manifest.splitlines():
@@ -622,9 +627,6 @@ def copy_collection(obj_uuid, src, dst, args):
with lock:
if blockhash in dst_locators:
continue
- logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
- if progress_writer:
- progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
# queue it up.
get_queue.put(word)
commit d790e335318c4c429658dbdfbc4d4f5019b2a806
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 22:27:35 2023 -0400
20937: Parallel collection copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 91b78470ed..3bcb4eead3 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -37,6 +37,8 @@ import tempfile
import urllib.parse
import io
import json
+import queue
+import threading
import arvados
import arvados.config
@@ -563,42 +565,102 @@ def copy_collection(obj_uuid, src, dst, args):
dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
dst_manifest = io.StringIO()
dst_locators = {}
- bytes_written = 0
+ bytes_written = [0]
bytes_expected = total_collection_size(manifest)
if args.progress:
progress_writer = ProgressWriter(human_progress)
else:
progress_writer = None
+ # go through the words
+ # put each block loc into 'get' queue
+ # 'get' threads get block and put it into 'put' queue
+ # 'put' threads put block and then update dst_locators
+ #
+ # after going through the whole manifest we go back through it
+ # again and build dst_manifest
+
+ lock = threading.Lock()
+ get_queue = queue.Queue()
+ put_queue = queue.Queue()
+
+ def get_thread():
+ while True:
+ word = get_queue.get()
+ if word is None:
+ return
+ data = src_keep.get(word)
+ put_queue.put((word, data))
+ get_queue.task_done()
+
+ def put_thread():
+ while True:
+ item = put_queue.get()
+ if item is None:
+ return
+ word, data = item
+ loc = arvados.KeepLocator(word)
+ blockhash = loc.md5sum
+ dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
+ with lock:
+ dst_locators[blockhash] = dst_locator
+ bytes_written[0] += loc.size
+ put_queue.task_done()
+
for line in manifest.splitlines():
words = line.split()
- dst_manifest.write(words[0])
for word in words[1:]:
try:
loc = arvados.KeepLocator(word)
except ValueError:
# If 'word' can't be parsed as a locator,
# presume it's a filename.
- dst_manifest.write(' ')
- dst_manifest.write(word)
continue
blockhash = loc.md5sum
# copy this block if we haven't seen it before
# (otherwise, just reuse the existing dst_locator)
- if blockhash not in dst_locators:
+ with lock:
+ if blockhash in dst_locators:
+ continue
logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
- data = src_keep.get(word)
- dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
- dst_locators[blockhash] = dst_locator
- bytes_written += loc.size
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
+
+ # queue it up.
+ get_queue.put(word)
+
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+ threading.Thread(target=get_thread, daemon=True).start()
+
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+ threading.Thread(target=put_thread, daemon=True).start()
+
+ get_queue.join()
+ put_queue.join()
+
+ for line in manifest.splitlines():
+ words = line.split()
+ dst_manifest.write(words[0])
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ # If 'word' can't be parsed as a locator,
+ # presume it's a filename.
+ dst_manifest.write(' ')
+ dst_manifest.write(word)
+ continue
+ blockhash = loc.md5sum
dst_manifest.write(' ')
dst_manifest.write(dst_locators[blockhash])
dst_manifest.write("\n")
if progress_writer:
- progress_writer.report(obj_uuid, bytes_written, bytes_expected)
+ progress_writer.report(obj_uuid, bytes_written[0], bytes_expected)
progress_writer.finish()
# Copy the manifest and save the collection.
commit 41541b6d4183cf390db0edccd57864b0037b1fee
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 21:52:14 2023 -0400
20937: Add http import capability to arv-copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 3132fbe465..91b78470ed 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -44,6 +44,7 @@ import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
+import arvados.http_to_keep
import ruamel.yaml as yaml
from arvados.api import OrderedJsonModel
@@ -107,6 +108,11 @@ def main():
copy_opts.add_argument(
'--storage-classes', dest='storage_classes',
help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
+ copy_opts.add_argument("--varying-url-params", type=str, default="",
+ help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+
+ copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
copy_opts.add_argument(
'object_uuid',
@@ -127,7 +133,7 @@ def main():
else:
logger.setLevel(logging.INFO)
- if not args.source_arvados:
+ if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
args.source_arvados = args.object_uuid[:5]
# Create API clients for the source and destination instances
@@ -150,6 +156,8 @@ def main():
elif t == 'Group':
set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
+ elif t == 'httpURL':
+ result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -753,6 +761,10 @@ def git_rev_parse(rev, repo):
def uuid_type(api, object_uuid):
if re.match(arvados.util.keep_locator_pattern, object_uuid):
return 'Collection'
+
+ if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
+ return 'httpURL'
+
p = object_uuid.split('-')
if len(p) == 3:
type_prefix = p[1]
@@ -762,6 +774,27 @@ def uuid_type(api, object_uuid):
return k
return None
+
+def copy_from_http(url, src, dst, args):
+
+ project_uuid = args.project_uuid
+ varying_url_params = args.varying_url_params
+ prefer_cached_downloads = args.prefer_cached_downloads
+
+ cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
+ varying_url_params=varying_url_params,
+ prefer_cached_downloads=prefer_cached_downloads)
+ if cached[2] is not None:
+ return copy_collection(cached[2], src, dst, args)
+
+ cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
+ varying_url_params=varying_url_params,
+ prefer_cached_downloads=prefer_cached_downloads)
+
+ if cached is not None:
+ return {"uuid": cached[2]}
+
+
def abort(msg, code=1):
logger.info("arv-copy: %s", msg)
exit(code)
diff --git a/sdk/python/arvados/http_to_keep.py b/sdk/python/arvados/http_to_keep.py
index 16c3dc4778..f2087e4e29 100644
--- a/sdk/python/arvados/http_to_keep.py
+++ b/sdk/python/arvados/http_to_keep.py
@@ -238,16 +238,10 @@ def _etag_quote(etag):
return '"' + etag + '"'
-def http_to_keep(api, project_uuid, url,
- utcnow=datetime.datetime.utcnow, varying_url_params="",
- prefer_cached_downloads=False):
- """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
-
- Before downloading the URL, checks to see if the URL already
- exists in Keep and applies HTTP caching policy, the
- varying_url_params and prefer_cached_downloads flags in order to
- decide whether to use the version in Keep or re-download it.
- """
+def check_cached_url(api, project_uuid, url, etags,
+ utcnow=datetime.datetime.utcnow,
+ varying_url_params="",
+ prefer_cached_downloads=False):
logger.info("Checking Keep for %s", url)
@@ -270,8 +264,6 @@ def http_to_keep(api, project_uuid, url,
now = utcnow()
- etags = {}
-
curldownloader = _Downloader(api)
for item in items:
@@ -287,13 +279,13 @@ def http_to_keep(api, project_uuid, url,
if prefer_cached_downloads or _fresh_cache(cache_url, properties, now):
# HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())) )
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
if not _changed(cache_url, clean_url, properties, now, curldownloader):
# Etag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
- return (item["portable_data_hash"], next(iter(cr.keys())))
+ return (item["portable_data_hash"], next(iter(cr.keys())), item["uuid"], clean_url, now)
for etagstr in ("Etag", "ETag"):
if etagstr in properties[cache_url] and len(properties[cache_url][etagstr]) > 2:
@@ -301,6 +293,31 @@ def http_to_keep(api, project_uuid, url,
logger.debug("Found ETag values %s", etags)
+ return (None, None, None, clean_url, now)
+
+
+def http_to_keep(api, project_uuid, url,
+ utcnow=datetime.datetime.utcnow, varying_url_params="",
+ prefer_cached_downloads=False):
+ """Download a file over HTTP and upload it to keep, with HTTP headers as metadata.
+
+ Before downloading the URL, checks to see if the URL already
+ exists in Keep and applies HTTP caching policy, the
+ varying_url_params and prefer_cached_downloads flags in order to
+ decide whether to use the version in Keep or re-download it.
+ """
+
+ etags = {}
+ cache_result = check_cached_url(api, project_uuid, url, etags,
+ utcnow, varying_url_params,
+ prefer_cached_downloads)
+
+ if cache_result[0] is not None:
+ return cache_result
+
+ clean_url = cache_result[3]
+ now = cache_result[4]
+
properties = {}
headers = {}
if etags:
@@ -309,6 +326,8 @@ def http_to_keep(api, project_uuid, url,
logger.info("Beginning download of %s", url)
+ curldownloader = _Downloader(api)
+
req = curldownloader.download(url, headers)
c = curldownloader.collection
@@ -344,4 +363,4 @@ def http_to_keep(api, project_uuid, url,
api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
- return (c.portable_data_hash(), curldownloader.name)
+ return (c.portable_data_hash(), curldownloader.name, c.manifest_locator())
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list