[ARVADOS] updated: e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8

git at public.curoverse.com git at public.curoverse.com
Wed Sep 3 17:38:46 EDT 2014


Summary of changes:
 sdk/python/arvados/commands/copy.py | 121 ++++++++++++++++++++++++++++++------
 1 file changed, 101 insertions(+), 20 deletions(-)

       via  e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8 (commit)
      from  5fcf5ebe7b89517edcb610bf1f1df8abf99df9ae (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.


commit e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Sep 3 17:36:04 2014 -0400

    3699: added copy_pipeline_instance
    
    Added copy_pipeline_instance, integrating copy_collection and
    copy_pipeline_template.  Added comments.

diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index 554b00f..4c33c9b 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -28,15 +28,25 @@ import arvados.keep
 
 def main():
     logger = logging.getLogger('arvados.arv-copy')
+    logger.setLevel(logging.DEBUG)
 
     parser = argparse.ArgumentParser(
         description='Copy a pipeline instance from one Arvados instance to another.')
 
-    parser.add_argument('--recursive', dest='recursive', action='store_true')
-    parser.add_argument('--no-recursive', dest='recursive', action='store_false')
-    parser.add_argument('object_uuid')
-    parser.add_argument('source_arvados')
-    parser.add_argument('destination_arvados')
+    parser.add_argument(
+        '--recursive', dest='recursive', action='store_true',
+        help='Recursively add any objects that this object depends upon.')
+    parser.add_argument(
+        '--no-recursive', dest='recursive', action='store_false')
+    parser.add_argument(
+        'object_uuid',
+        help='The UUID of the object to be copied.')
+    parser.add_argument(
+        'source_arvados',
+        help='The name of the source Arvados instance.')
+    parser.add_argument(
+        'destination_arvados',
+        help='The name of the destination Arvados instance.')
     parser.set_defaults(recursive=True)
 
     args = parser.parse_args()
@@ -46,12 +56,12 @@ def main():
     dst_arv = api_for_instance(args.destination_arvados)
 
     # Identify the kind of object we have been given, and begin copying.
-    t = uuid_type(args.object_uuid)
-    if t == 'collection':
+    t = uuid_type(src_arv, args.object_uuid)
+    if t == 'Collection':
         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
-    elif t == 'pipeline_instance':
+    elif t == 'PipelineInstance':
         result = copy_pipeline_instance(args.object_uuid, src=src_arv, dst=dst_arv)
-    elif t == 'pipeline_template':
+    elif t == 'PipelineTemplate':
         result = copy_pipeline_template(args.object_uuid, src=src_arv, dst=dst_arv)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -82,10 +92,23 @@ def api_for_instance(instance_name):
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client
 
+# copy_collection(obj_uuid, src, dst)
+#
+#    Copy the collection identified by obj_uuid from src to dst.
+#
+#    For this application, it is critical to preserve the
+#    collection's manifest hash, which is not guaranteed with the
+#    arvados.CollectionReader and arvados.CollectionWriter classes.
+#    Copying each block in the collection manually, followed by
+#    the manifest block, ensures that the collection's manifest
+#    hash will not change.
+#
 def copy_collection(obj_uuid, src=None, dst=None):
     # Fetch the collection's manifest.
     c = src.collections().get(uuid=obj_uuid).execute()
     manifest = c['manifest_text']
+    logging.debug('copying collection %s', obj_uuid)
+    logging.debug('manifest_text = %s', manifest)
 
     # Enumerate the block locators found in the manifest.
     collection_blocks = sets.Set()
@@ -100,6 +123,7 @@ def copy_collection(obj_uuid, src=None, dst=None):
     # Copy each block from src_keep to dst_keep.
     for locator in collection_blocks:
         data = src_keep.get(locator)
+        logger.debug('copying block %s', locator)
         logger.info("Retrieved %d bytes", len(data))
         dst_keep.put(data)
 
@@ -107,9 +131,60 @@ def copy_collection(obj_uuid, src=None, dst=None):
     dst_keep.put(manifest)
     return dst_keep.collections().create(manifest_text=manifest).execute()
 
+# copy_pipeline_instance(obj_uuid, src, dst)
+#
+#    Copies a pipeline instance identified by obj_uuid from src to dst.
+#
+#    If the recursive option is on:
+#      1. Copy all input collections
+#           * For each component in the pipeline, include all collections
+#             listed as job dependencies for that component.
+#      2. Copy docker images
+#      3. Copy git repositories
+#      4. Copy the pipeline template
+#
+#    The only changes made to the copied pipeline instance are:
+#      1. The original pipeline instance UUID is preserved in
+#         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
+#      2. The pipeline_template_uuid is changed to the new template uuid.
+#      3. The owner_uuid of the instance is changed to the user who
+#         copied it.
+#
 def copy_pipeline_instance(obj_uuid, src=None, dst=None):
-    raise NotImplementedError
-
+    # Fetch the pipeline instance record.
+    pi = src.pipeline_instances().get(uuid=obj_uuid).execute()
+
+    # Copy input collections (collections listed as job dependencies
+    # for any of the pipeline's components)
+    #
+    input_collections = sets.Set()
+    for cname in pi['components']:
+        comp = pi['components'][cname]
+        for c in comp['job']['dependencies']:
+            input_collections.add(c)
+
+    for c in input_collections:
+        copy_collection(c, src, dst)
+
+    # Copy the pipeline template and save the uuid of the copy
+    new_pt = copy_pipeline_template(pi['pipeline_template_uuid'], src, dst)
+
+    # Update the fields of the pipeline instance
+    pi['properties']['copied_from_pipeline_instance_uuid'] = obj_uuid
+    pi['pipeline_template_uuid'] = new_pt
+    del pi['owner_uuid']
+
+    # Create the new pipeline instance at the destination Arvados.
+    new_pi = dst.pipeline_instances().create(pipeline_instance=pi).execute()
+    return new_pi
+
+# copy_pipeline_template(obj_uuid, src, dst)
+#
+#    Copies a pipeline template identified by obj_uuid from src to dst.
+#
+#    The owner_uuid of the new template is changed to that of the user
+#    who copied the template.
+#
 def copy_pipeline_template(obj_uuid, src=None, dst=None):
     # fetch the pipeline template from the source instance
     old_pt = src.pipeline_templates().get(uuid=obj_uuid).execute()
@@ -118,15 +193,21 @@ def copy_pipeline_template(obj_uuid, src=None, dst=None):
     del old_pt['owner_uuid']
     return dst.pipeline_templates().create(body=old_pt).execute()
 
-uuid_type_map = {
-    "4zz18": "collection",
-    "d1hrv": "pipeline_instance",
-    "p5p6p": "pipeline_template",
-}
-
-def uuid_type(object_uuid):
-    type_str = object_uuid.split('-')[1]
-    return uuid_type_map.get(type_str, None)
+# uuid_type(api, object_uuid)
+#
+#    Returns the name of the class that object_uuid belongs to, based on
+#    the second field of the uuid.  This function consults the api's
+#    schema to identify the object class.
+#
+#    It returns a string such as 'Collection', 'PipelineInstance', etc.
+#
+def uuid_type(api, object_uuid):
+    type_prefix = object_uuid.split('-')[1]
+    for k in api._schema.schemas:
+        obj_class = api._schema.schemas[k].get('uuidPrefix', None)
+        if obj_class:
+            return obj_class
+    return None
 
 def abort(msg, code=1):
     print >>sys.stderr, "arv-copy:", msg

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list