[ARVADOS] updated: 41c5cc5a3731417a31c8db685e78cb795bbbe91b
git at public.curoverse.com
Fri Oct 17 11:16:59 EDT 2014
Summary of changes:
sdk/python/arvados/commands/copy.py | 154 ++++++++++++++++++++++++++----------
1 file changed, 112 insertions(+), 42 deletions(-)
via 41c5cc5a3731417a31c8db685e78cb795bbbe91b (commit)
from b7f67c80916c2efa0c234ab5f4e92c24d47223c5 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 41c5cc5a3731417a31c8db685e78cb795bbbe91b
Author: Tim Pierce <twp at curoverse.com>
Date: Thu Oct 16 14:46:24 2014 -0400
3699: report on collection copying progress
diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index c8d3818..3fa554e 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -48,6 +48,12 @@ def main():
'-v', '--verbose', dest='verbose', action='store_true',
help='Verbose output.')
parser.add_argument(
+ '--progress', dest='progress', action='store_true',
+ help='Report progress on copying collections. (default)')
+ parser.add_argument(
+ '--no-progress', dest='progress', action='store_false',
+ help='Do not report progress on copying collections.')
+ parser.add_argument(
'-f', '--force', dest='force', action='store_true',
help='Perform copy even if the object appears to exist at the remote destination.')
parser.add_argument(
@@ -71,6 +77,7 @@ def main():
parser.add_argument(
'object_uuid',
help='The UUID of the object to be copied.')
+ parser.set_defaults(progress=True)
parser.set_defaults(recursive=True)
args = parser.parse_args()
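
Editor's note: the new --progress/--no-progress pair above uses the standard argparse idiom of two flags sharing one dest, with parser.set_defaults() supplying the value when neither flag is given. A minimal standalone sketch of that idiom (the prog name is illustrative, not part of the commit):

import argparse

parser = argparse.ArgumentParser(prog='arv-copy')
parser.add_argument('--progress', dest='progress', action='store_true',
                    help='Report progress on copying collections. (default)')
parser.add_argument('--no-progress', dest='progress', action='store_false',
                    help='Do not report progress on copying collections.')
parser.set_defaults(progress=True)

assert parser.parse_args([]).progress is True             # default applies
assert parser.parse_args(['--no-progress']).progress is False
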
@@ -89,20 +96,14 @@ def main():
if t == 'Collection':
result = copy_collection(args.object_uuid,
src_arv, dst_arv,
- force=args.force)
+ args)
elif t == 'PipelineInstance':
result = copy_pipeline_instance(args.object_uuid,
src_arv, dst_arv,
- args.dst_git_repo,
- dst_project=args.project_uuid,
- recursive=args.recursive,
- force=args.force)
+ args)
elif t == 'PipelineTemplate':
result = copy_pipeline_template(args.object_uuid,
- src_arv, dst_arv,
- args.dst_git_repo,
- recursive=args.recursive,
- force=args.force)
+ src_arv, dst_arv, args)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -160,11 +161,11 @@ def api_for_instance(instance_name):
abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
return client
-# copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
+# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
-# If the 'recursive' option evaluates to True:
+# If the args.recursive option is set:
# 1. Copies all input collections
# * For each component in the pipeline, include all collections
# listed as job dependencies for that component
@@ -172,8 +173,6 @@ def api_for_instance(instance_name):
# 3. Copy git repositories
# 4. Copy the pipeline template
#
-# The 'force' option is passed through to copy_collections.
-#
# The only changes made to the copied pipeline instance are:
# 1. The original pipeline instance UUID is preserved in
# the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
@@ -181,23 +180,21 @@ def api_for_instance(instance_name):
# 3. The owner_uuid of the instance is changed to the user who
# copied it.
#
-def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True, force=False):
+def copy_pipeline_instance(pi_uuid, src, dst, args):
# Fetch the pipeline instance record.
pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
- if recursive:
- if not dst_git_repo:
+ if args.recursive:
+ if not args.dst_git_repo:
abort('--dst-git-repo is required when copying a pipeline recursively.')
# Copy the pipeline template and save the copied template.
if pi.get('pipeline_template_uuid', None):
pt = copy_pipeline_template(pi['pipeline_template_uuid'],
- src, dst,
- dst_git_repo,
- recursive=True)
+ src, dst, args)
# Copy input collections, docker images and git repos.
- pi = copy_collections(pi, src, dst, force)
- copy_git_repos(pi, src, dst, dst_git_repo)
+ pi = copy_collections(pi, src, dst, args)
+ copy_git_repos(pi, src, dst, args.dst_git_repo)
# Update the fields of the pipeline instance with the copied
# pipeline template.
@@ -224,19 +221,19 @@ def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, re
new_pi = dst.pipeline_instances().create(body=pi).execute()
return new_pi
-# copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
+# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
-# If the 'recursive' option evaluates to true, also copy any collections,
-# docker images and git repositories that this template references.
+# If args.recursive is True, also copy any collections, docker
+# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
-def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, force=False):
+def copy_pipeline_template(pt_uuid, src, dst, args):
# fetch the pipeline template from the source instance
pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
@@ -244,8 +241,8 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
if not dst_git_repo:
abort('--dst-git-repo is required when copying a pipeline recursively.')
# Copy input collections, docker images and git repos.
- pt = copy_collections(pt, src, dst, force)
- copy_git_repos(pt, src, dst, dst_git_repo)
+ pt = copy_collections(pt, src, dst, args)
+ copy_git_repos(pt, src, dst, args.dst_git_repo)
pt['description'] = "Pipeline template copied from {}\n\n{}".format(
pt_uuid, pt.get('description', ''))
@@ -256,7 +253,7 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
return dst.pipeline_templates().create(body=pt).execute()
-# copy_collections(obj, src, dst)
+# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.
@@ -264,17 +261,17 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
-def copy_collections(obj, src, dst, force=False):
+def copy_collections(obj, src, dst, args):
if type(obj) in [str, unicode]:
if uuid_type(src, obj) == 'Collection':
- newc = copy_collection(obj, src, dst, force)
+ newc = copy_collection(obj, src, dst, args)
if obj != newc['uuid'] and obj != newc['portable_data_hash']:
return newc['uuid']
return obj
elif type(obj) == dict:
- return {v: copy_collections(obj[v], src, dst, force) for v in obj}
+ return {v: copy_collections(obj[v], src, dst, args) for v in obj}
elif type(obj) == list:
- return [copy_collections(v, src, dst, force) for v in obj]
+ return [copy_collections(v, src, dst, args) for v in obj]
return obj
# copy_git_repos(p, src, dst, dst_repo)
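
Editor's note: copy_collections() threads args through an otherwise unchanged traversal: it recurses into dicts and lists and rewrites any string that names a collection, leaving everything else untouched. A simplified sketch of that traversal pattern, using a caller-supplied replace function instead of the real Keep copy (the names below are hypothetical, not the SDK's):

def replace_collections(obj, replace):
    # 'replace' maps a collection UUID/portable data hash to the UUID of
    # its copy, and returns any other string unchanged.
    if isinstance(obj, str):
        return replace(obj)
    elif isinstance(obj, dict):
        return {k: replace_collections(v, replace) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [replace_collections(v, replace) for v in obj]
    return obj

mapping = {'zzzzz-4zz18-000000000000000': 'zzzzz-4zz18-111111111111111'}
pi = {'components': {'foo': {'script_parameters': {
      'input': 'zzzzz-4zz18-000000000000000'}}}}
copied = replace_collections(pi, lambda s: mapping.get(s, s))
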
@@ -321,15 +318,37 @@ def copy_git_repos(p, src, dst, dst_repo):
if 'supplied_script_version' in j:
j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
-# copy_collection(obj_uuid, src, dst, force)
+def total_collection_size(manifest_text):
+ """Return the total number of bytes in this collection (excluding
+ duplicate blocks)."""
+
+ total_bytes = 0
+ locators_seen = {}
+ for line in manifest_text.splitlines():
+ words = line.split()
+ for word in words[1:]:
+ try:
+ loc = arvados.KeepLocator(word)
+ except ValueError:
+ continue # this word isn't a locator, skip it
+ if loc.md5sum not in locators_seen:
+ locators_seen[loc.md5sum] = True
+ total_bytes += loc.size
+
+ return total_bytes
+
+# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
+# If args.progress is True, produce a human-friendly progress
+# report.
+#
# If a collection with the desired portable_data_hash already
-# exists at dst, and the 'force' argument is False, copy_collection
-# returns the existing collection without copying any blocks.
-# Otherwise (if no collection exists or if 'force' is True)
+# exists at dst, and args.force is False, copy_collection returns
+# the existing collection without copying any blocks. Otherwise
+# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
@@ -340,13 +359,13 @@ def copy_git_repos(p, src, dst, dst_repo):
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
-def copy_collection(obj_uuid, src, dst, force=False):
+def copy_collection(obj_uuid, src, dst, args):
c = src.collections().get(uuid=obj_uuid).execute()
# If a collection with this hash already exists at the
# destination, and 'force' is not true, just return that
# collection.
- if not force:
+ if not args.force:
if 'portable_data_hash' in c:
colhash = c['portable_data_hash']
else:
@@ -355,10 +374,10 @@ def copy_collection(obj_uuid, src, dst, force=False):
filters=[['portable_data_hash', '=', colhash]]
).execute()
if dstcol['items_available'] > 0:
- logger.info("Skipping collection %s (already at dst)", obj_uuid)
+ logger.debug("Skipping collection %s (already at dst)", obj_uuid)
return dstcol['items'][0]
- logger.info("Copying collection %s", obj_uuid)
+ logger.debug("Copying collection %s", obj_uuid)
# Fetch the collection's manifest.
manifest = c['manifest_text']
@@ -368,9 +387,15 @@ def copy_collection(obj_uuid, src, dst, force=False):
# a new manifest as we go.
src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)
-
dst_manifest = ""
dst_locators = {}
+ bytes_written = 0
+ bytes_expected = total_collection_size(manifest)
+ if args.progress:
+ progress_writer = ProgressWriter(human_progress)
+ else:
+ progress_writer = None
+
for line in manifest.splitlines():
words = line.split()
dst_manifest_line = words[0]
@@ -381,10 +406,13 @@ def copy_collection(obj_uuid, src, dst, force=False):
# copy this block if we haven't seen it before
# (otherwise, just reuse the existing dst_locator)
if blockhash not in dst_locators:
- logger.info("Copying block %s (%s bytes)", blockhash, loc.size)
+ logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
+ if progress_writer:
+ progress_writer.report(obj_uuid, bytes_written, bytes_expected)
data = src_keep.get(word)
dst_locator = dst_keep.put(data)
dst_locators[blockhash] = dst_locator
+ bytes_written += loc.size
dst_manifest_line += ' ' + dst_locators[blockhash]
except ValueError:
# If 'word' can't be parsed as a locator,
@@ -392,6 +420,9 @@ def copy_collection(obj_uuid, src, dst, force=False):
dst_manifest_line += ' ' + word
dst_manifest += dst_manifest_line + "\n"
+ if progress_writer:
+ progress_writer.finish()
+
# Copy the manifest and save the collection.
logger.debug('saving {} manifest: {}'.format(obj_uuid, dst_manifest))
dst_keep.put(dst_manifest)
@@ -494,5 +525,44 @@ def abort(msg, code=1):
print >>sys.stderr, "arv-copy:", msg
exit(code)
+
+# Code for reporting on the progress of a collection upload.
+# Stolen from arvados.commands.put.ArvPutCollectionWriter
+# TODO(twp): figure out how to refactor into a shared library
+# (may involve refactoring some arvados.commands.copy.copy_collection
+# code)
+
+def machine_progress(obj_uuid, bytes_written, bytes_expected):
+ return "{} {}: {} {} written {} total\n".format(
+ sys.argv[0],
+ os.getpid(),
+ obj_uuid,
+ bytes_written,
+ -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(obj_uuid, bytes_written, bytes_expected):
+ if bytes_expected:
+ return "\r{}: {}M / {}M {:.1%} ".format(
+ obj_uuid,
+ bytes_written >> 20, bytes_expected >> 20,
+ float(bytes_written) / bytes_expected)
+ else:
+ return "\r{}: {} ".format(obj_uuid, bytes_written)
+
+class ProgressWriter(object):
+ _progress_func = None
+ outfile = sys.stderr
+
+ def __init__(self, progress_func):
+ self._progress_func = progress_func
+
+ def report(self, obj_uuid, bytes_written, bytes_expected):
+ if self._progress_func is not None:
+ self.outfile.write(
+ self._progress_func(obj_uuid, bytes_written, bytes_expected))
+
+ def finish(self):
+ self.outfile.write("\n")
+
if __name__ == '__main__':
main()
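
Editor's note: taken together, the new helpers give copy_collection() a pluggable progress channel: when args.progress is set it wraps human_progress in a ProgressWriter (machine_progress is also available for log-friendly output), calls report() as blocks are copied, and calls finish() once the manifest is done. A brief usage sketch, assuming the SDK from this commit; the UUID and byte counts are illustrative:

from arvados.commands.copy import ProgressWriter, human_progress

pw = ProgressWriter(human_progress)
# Writes e.g. "\rzzzzz-4zz18-aaaaaaaaaaaaaaa: 64M / 256M 25.0%" to stderr.
pw.report('zzzzz-4zz18-aaaaaaaaaaaaaaa', 64 << 20, 256 << 20)
pw.finish()   # terminates the carriage-return progress line with a newline
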
-----------------------------------------------------------------------
hooks/post-receive
--