[ARVADOS] updated: 41c5cc5a3731417a31c8db685e78cb795bbbe91b

git at public.curoverse.com git at public.curoverse.com
Fri Oct 17 11:16:59 EDT 2014


Summary of changes:
 sdk/python/arvados/commands/copy.py | 154 ++++++++++++++++++++++++++----------
 1 file changed, 112 insertions(+), 42 deletions(-)

       via  41c5cc5a3731417a31c8db685e78cb795bbbe91b (commit)
      from  b7f67c80916c2efa0c234ab5f4e92c24d47223c5 (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full below.


commit 41c5cc5a3731417a31c8db685e78cb795bbbe91b
Author: Tim Pierce <twp at curoverse.com>
Date:   Thu Oct 16 14:46:24 2014 -0400

    3699: report on collection copying progress
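
The patch adds a paired --progress/--no-progress switch (enabled by default
via parser.set_defaults(progress=True)), threads the parsed argparse
namespace through the copy functions in place of individual keyword
arguments, and introduces a ProgressWriter helper plus a
total_collection_size() function for reporting per-collection copy progress.

A minimal, stand-alone sketch of the paired-flag pattern (illustrative only;
the demo parser below is not part of the commit):

    import argparse

    # --progress and --no-progress both write to the 'progress' destination;
    # set_defaults() makes progress reporting opt-out rather than opt-in.
    parser = argparse.ArgumentParser(description='paired progress flags demo')
    parser.add_argument('--progress', dest='progress', action='store_true',
                        help='Report progress on copying collections. (default)')
    parser.add_argument('--no-progress', dest='progress', action='store_false',
                        help='Do not report progress on copying collections.')
    parser.set_defaults(progress=True)

    print(parser.parse_args([]).progress)                 # True  (default)
    print(parser.parse_args(['--no-progress']).progress)  # False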

diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index c8d3818..3fa554e 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -48,6 +48,12 @@ def main():
         '-v', '--verbose', dest='verbose', action='store_true',
         help='Verbose output.')
     parser.add_argument(
+        '--progress', dest='progress', action='store_true',
+        help='Report progress on copying collections. (default)')
+    parser.add_argument(
+        '--no-progress', dest='progress', action='store_false',
+        help='Do not report progress on copying collections.')
+    parser.add_argument(
         '-f', '--force', dest='force', action='store_true',
         help='Perform copy even if the object appears to exist at the remote destination.')
     parser.add_argument(
@@ -71,6 +77,7 @@ def main():
     parser.add_argument(
         'object_uuid',
         help='The UUID of the object to be copied.')
+    parser.set_defaults(progress=True)
     parser.set_defaults(recursive=True)
 
     args = parser.parse_args()
@@ -89,20 +96,14 @@ def main():
     if t == 'Collection':
         result = copy_collection(args.object_uuid,
                                  src_arv, dst_arv,
-                                 force=args.force)
+                                 args)
     elif t == 'PipelineInstance':
         result = copy_pipeline_instance(args.object_uuid,
                                         src_arv, dst_arv,
-                                        args.dst_git_repo,
-                                        dst_project=args.project_uuid,
-                                        recursive=args.recursive,
-                                        force=args.force)
+                                        args)
     elif t == 'PipelineTemplate':
         result = copy_pipeline_template(args.object_uuid,
-                                        src_arv, dst_arv,
-                                        args.dst_git_repo,
-                                        recursive=args.recursive,
-                                        force=args.force)
+                                        src_arv, dst_arv, args)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
 
@@ -160,11 +161,11 @@ def api_for_instance(instance_name):
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
 
-# copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
+# copy_pipeline_instance(pi_uuid, src, dst, args)
 #
 #    Copies a pipeline instance identified by pi_uuid from src to dst.
 #
-#    If the 'recursive' option evaluates to True:
+#    If the args.recursive option is set:
 #      1. Copies all input collections
 #           * For each component in the pipeline, include all collections
 #             listed as job dependencies for that component)
@@ -172,8 +173,6 @@ def api_for_instance(instance_name):
 #      3. Copy git repositories
 #      4. Copy the pipeline template
 #
-#    The 'force' option is passed through to copy_collections.
-#
 #    The only changes made to the copied pipeline instance are:
 #      1. The original pipeline instance UUID is preserved in
 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
@@ -181,23 +180,21 @@ def api_for_instance(instance_name):
 #      3. The owner_uuid of the instance is changed to the user who
 #         copied it.
 #
-def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True, force=False):
+def copy_pipeline_instance(pi_uuid, src, dst, args):
     # Fetch the pipeline instance record.
     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
 
-    if recursive:
-        if not dst_git_repo:
+    if args.recursive:
+        if not args.dst_git_repo:
             abort('--dst-git-repo is required when copying a pipeline recursively.')
         # Copy the pipeline template and save the copied template.
         if pi.get('pipeline_template_uuid', None):
             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
-                                        src, dst,
-                                        dst_git_repo,
-                                        recursive=True)
+                                        src, dst, args)
 
         # Copy input collections, docker images and git repos.
-        pi = copy_collections(pi, src, dst, force)
-        copy_git_repos(pi, src, dst, dst_git_repo)
+        pi = copy_collections(pi, src, dst, args)
+        copy_git_repos(pi, src, dst, args.dst_git_repo)
 
         # Update the fields of the pipeline instance with the copied
         # pipeline template.
@@ -224,19 +221,19 @@ def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, re
     new_pi = dst.pipeline_instances().create(body=pi).execute()
     return new_pi
 
-# copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
+# copy_pipeline_template(pt_uuid, src, dst, args)
 #
 #    Copies a pipeline template identified by pt_uuid from src to dst.
 #
-#    If the 'recursive' option evaluates to true, also copy any collections,
-#    docker images and git repositories that this template references.
+#    If args.recursive is True, also copy any collections, docker
+#    images and git repositories that this template references.
 #
 #    The owner_uuid of the new template is changed to that of the user
 #    who copied the template.
 #
 #    Returns the copied pipeline template object.
 #
-def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, force=False):
+def copy_pipeline_template(pt_uuid, src, dst, args):
     # fetch the pipeline template from the source instance
     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
 
@@ -244,8 +241,8 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
         if not dst_git_repo:
             abort('--dst-git-repo is required when copying a pipeline recursively.')
         # Copy input collections, docker images and git repos.
-        pt = copy_collections(pt, src, dst, force)
-        copy_git_repos(pt, src, dst, dst_git_repo)
+        pt = copy_collections(pt, src, dst, args)
+        copy_git_repos(pt, src, dst, args.dst_git_repo)
 
     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
         pt_uuid, pt.get('description', ''))
@@ -256,7 +253,7 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
 
     return dst.pipeline_templates().create(body=pt).execute()
 
-# copy_collections(obj, src, dst)
+# copy_collections(obj, src, dst, args)
 #
 #    Recursively copies all collections referenced by 'obj' from src
 #    to dst.
@@ -264,17 +261,17 @@ def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, forc
 #    Returns a copy of obj with any old collection uuids replaced by
 #    the new ones.
 #
-def copy_collections(obj, src, dst, force=False):
+def copy_collections(obj, src, dst, args):
     if type(obj) in [str, unicode]:
         if uuid_type(src, obj) == 'Collection':
-            newc = copy_collection(obj, src, dst, force)
+            newc = copy_collection(obj, src, dst, args)
             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
                 return newc['uuid']
         return obj
     elif type(obj) == dict:
-        return {v: copy_collections(obj[v], src, dst, force) for v in obj}
+        return {v: copy_collections(obj[v], src, dst, args) for v in obj}
     elif type(obj) == list:
-        return [copy_collections(v, src, dst, force) for v in obj]
+        return [copy_collections(v, src, dst, args) for v in obj]
     return obj
 
 # copy_git_repos(p, src, dst, dst_repo)
@@ -321,15 +318,37 @@ def copy_git_repos(p, src, dst, dst_repo):
                 if 'supplied_script_version' in j:
                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
 
-# copy_collection(obj_uuid, src, dst, force)
+def total_collection_size(manifest_text):
+    """Return the total number of bytes in this collection (excluding
+    duplicate blocks)."""
+
+    total_bytes = 0
+    locators_seen = {}
+    for line in manifest_text.splitlines():
+        words = line.split()
+        for word in words[1:]:
+            try:
+                loc = arvados.KeepLocator(word)
+            except ValueError:
+                continue  # this word isn't a locator, skip it
+            if loc.md5sum not in locators_seen:
+                locators_seen[loc.md5sum] = True
+                total_bytes += loc.size
+
+    return total_bytes
+
+# copy_collection(obj_uuid, src, dst, args)
 #
 #    Copies the collection identified by obj_uuid from src to dst.
 #    Returns the collection object created at dst.
 #
+#    If args.progress is True, produce a human-friendly progress
+#    report.
+#
 #    If a collection with the desired portable_data_hash already
-#    exists at dst, and the 'force' argument is False, copy_collection
-#    returns the existing collection without copying any blocks.
-#    Otherwise (if no collection exists or if 'force' is True)
+#    exists at dst, and args.force is False, copy_collection returns
+#    the existing collection without copying any blocks.  Otherwise
+#    (if no collection exists or if args.force is True)
 #    copy_collection copies all of the collection data blocks from src
 #    to dst.
 #
@@ -340,13 +359,13 @@ def copy_git_repos(p, src, dst, dst_repo):
 #    the manifest block, ensures that the collection's manifest
 #    hash will not change.
 #
-def copy_collection(obj_uuid, src, dst, force=False):
+def copy_collection(obj_uuid, src, dst, args):
     c = src.collections().get(uuid=obj_uuid).execute()
 
     # If a collection with this hash already exists at the
     # destination, and 'force' is not true, just return that
     # collection.
-    if not force:
+    if not args.force:
         if 'portable_data_hash' in c:
             colhash = c['portable_data_hash']
         else:
@@ -355,10 +374,10 @@ def copy_collection(obj_uuid, src, dst, force=False):
             filters=[['portable_data_hash', '=', colhash]]
         ).execute()
         if dstcol['items_available'] > 0:
-            logger.info("Skipping collection %s (already at dst)", obj_uuid)
+            logger.debug("Skipping collection %s (already at dst)", obj_uuid)
             return dstcol['items'][0]
 
-    logger.info("Copying collection %s", obj_uuid)
+    logger.debug("Copying collection %s", obj_uuid)
 
     # Fetch the collection's manifest.
     manifest = c['manifest_text']
@@ -368,9 +387,15 @@ def copy_collection(obj_uuid, src, dst, force=False):
     # a new manifest as we go.
     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)
-
     dst_manifest = ""
     dst_locators = {}
+    bytes_written = 0
+    bytes_expected = total_collection_size(manifest)
+    if args.progress:
+        progress_writer = ProgressWriter(human_progress)
+    else:
+        progress_writer = None
+
     for line in manifest.splitlines():
         words = line.split()
         dst_manifest_line = words[0]
@@ -381,10 +406,13 @@ def copy_collection(obj_uuid, src, dst, force=False):
                 # copy this block if we haven't seen it before
                 # (otherwise, just reuse the existing dst_locator)
                 if blockhash not in dst_locators:
-                    logger.info("Copying block %s (%s bytes)", blockhash, loc.size)
+                    logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
+                    if progress_writer:
+                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                     data = src_keep.get(word)
                     dst_locator = dst_keep.put(data)
                     dst_locators[blockhash] = dst_locator
+                    bytes_written += loc.size
                 dst_manifest_line += ' ' + dst_locators[blockhash]
             except ValueError:
                 # If 'word' can't be parsed as a locator,
@@ -392,6 +420,9 @@ def copy_collection(obj_uuid, src, dst, force=False):
                 dst_manifest_line += ' ' + word
         dst_manifest += dst_manifest_line + "\n"
 
+    if progress_writer:
+        progress_writer.finish()
+
     # Copy the manifest and save the collection.
     logger.debug('saving {} manifest: {}'.format(obj_uuid, dst_manifest))
     dst_keep.put(dst_manifest)
@@ -494,5 +525,44 @@ def abort(msg, code=1):
     print >>sys.stderr, "arv-copy:", msg
     exit(code)
 
+
+# Code for reporting on the progress of a collection upload.
+# Stolen from arvados.commands.put.ArvPutCollectionWriter
+# TODO(twp): figure out how to refactor into a shared library
+# (may involve refactoring some arvados.commands.copy.copy_collection
+# code)
+
+def machine_progress(obj_uuid, bytes_written, bytes_expected):
+    return "{} {}: {} {} written {} total\n".format(
+        sys.argv[0],
+        os.getpid(),
+        obj_uuid,
+        bytes_written,
+        -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(obj_uuid, bytes_written, bytes_expected):
+    if bytes_expected:
+        return "\r{}: {}M / {}M {:.1%} ".format(
+            obj_uuid,
+            bytes_written >> 20, bytes_expected >> 20,
+            float(bytes_written) / bytes_expected)
+    else:
+        return "\r{}: {} ".format(obj_uuid, bytes_written)
+
+class ProgressWriter(object):
+    _progress_func = None
+    outfile = sys.stderr
+
+    def __init__(self, progress_func):
+        self._progress_func = progress_func
+
+    def report(self, obj_uuid, bytes_written, bytes_expected):
+        if self._progress_func is not None:
+            self.outfile.write(
+                self._progress_func(obj_uuid, bytes_written, bytes_expected))
+
+    def finish(self):
+        self.outfile.write("\n")
+
 if __name__ == '__main__':
     main()
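
A minimal sketch of how the new helpers fit together (illustrative only: it
assumes the copy.py from this commit is importable and the Arvados Python
SDK is installed; the manifest text and collection UUID below are
fabricated):

    from arvados.commands.copy import (total_collection_size, ProgressWriter,
                                       human_progress)

    # Two manifest lines that share one 3-byte block; the second line adds a
    # distinct 5-byte block.  total_collection_size() counts each distinct
    # block once, so the expected total is 8 bytes.
    manifest = (". d41d8cd98f00b204e9800998ecf8427e+3 0:3:a.txt\n"
                ". d41d8cd98f00b204e9800998ecf8427e+3 "
                "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+5 0:8:b.txt\n")

    bytes_expected = total_collection_size(manifest)
    writer = ProgressWriter(human_progress)
    # Writes something like "zzzzz-...: 0M / 0M 37.5%" to stderr.
    writer.report("zzzzz-4zz18-xxxxxxxxxxxxxxxxxxx", 3, bytes_expected)
    writer.finish()  # terminates the "\r"-style progress line with a newline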

-----------------------------------------------------------------------


hooks/post-receive
-- 
More information about the arvados-commits mailing list