[ARVADOS] updated: e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8
git at public.curoverse.com
git at public.curoverse.com
Wed Sep 3 17:38:46 EDT 2014
Summary of changes:
sdk/python/arvados/commands/copy.py | 121 ++++++++++++++++++++++++++++++------
1 file changed, 101 insertions(+), 20 deletions(-)
via e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8 (commit)
from 5fcf5ebe7b89517edcb610bf1f1df8abf99df9ae (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e1ce0f420a72a205a81ee0b52ea04eaf2fb047d8
Author: Tim Pierce <twp at curoverse.com>
Date: Wed Sep 3 17:36:04 2014 -0400
3699: added copy_pipeline_instance
Added copy_pipeline_instance, integrating copy_collection and
copy_pipeline_template. Added comments.
diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index 554b00f..4c33c9b 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -28,15 +28,25 @@ import arvados.keep
def main():
logger = logging.getLogger('arvados.arv-copy')
+ logger.setLevel(logging.DEBUG)
parser = argparse.ArgumentParser(
description='Copy a pipeline instance from one Arvados instance to another.')
- parser.add_argument('--recursive', dest='recursive', action='store_true')
- parser.add_argument('--no-recursive', dest='recursive', action='store_false')
- parser.add_argument('object_uuid')
- parser.add_argument('source_arvados')
- parser.add_argument('destination_arvados')
+ parser.add_argument(
+ '--recursive', dest='recursive', action='store_true',
+ help='Recursively add any objects that this object depends upon.')
+ parser.add_argument(
+ '--no-recursive', dest='recursive', action='store_false')
+ parser.add_argument(
+ 'object_uuid',
+ help='The UUID of the object to be copied.')
+ parser.add_argument(
+ 'source_arvados',
+ help='The name of the source Arvados instance.')
+ parser.add_argument(
+ 'destination_arvados',
+ help='The name of the destination Arvados instance.')
parser.set_defaults(recursive=True)
args = parser.parse_args()
@@ -46,12 +56,12 @@ def main():
dst_arv = api_for_instance(args.destination_arvados)
# Identify the kind of object we have been given, and begin copying.
- t = uuid_type(args.object_uuid)
- if t == 'collection':
+ t = uuid_type(src_arv, args.object_uuid)
+ if t == 'Collection':
result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
- elif t == 'pipeline_instance':
+ elif t == 'PipelineInstance':
result = copy_pipeline_instance(args.object_uuid, src=src_arv, dst=dst_arv)
- elif t == 'pipeline_template':
+ elif t == 'PipelineTemplate':
result = copy_pipeline_template(args.object_uuid, src=src_arv, dst=dst_arv)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -82,10 +92,23 @@ def api_for_instance(instance_name):
abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
return client
+# copy_collection(obj_uuid, src, dst)
+#
+# Copy the collection identified by obj_uuid from src to dst.
+#
+# For this application, it is critical to preserve the
+# collection's manifest hash, which is not guaranteed with the
+# arvados.CollectionReader and arvados.CollectionWriter classes.
+# Copying each block in the collection manually, followed by
+# the manifest block, ensures that the collection's manifest
+# hash will not change.
+#
def copy_collection(obj_uuid, src=None, dst=None):
# Fetch the collection's manifest.
c = src.collections().get(uuid=obj_uuid).execute()
manifest = c['manifest_text']
+ logging.debug('copying collection %s', obj_uuid)
+ logging.debug('manifest_text = %s', manifest)
# Enumerate the block locators found in the manifest.
collection_blocks = sets.Set()
@@ -100,6 +123,7 @@ def copy_collection(obj_uuid, src=None, dst=None):
# Copy each block from src_keep to dst_keep.
for locator in collection_blocks:
data = src_keep.get(locator)
+ logger.debug('copying block %s', locator)
logger.info("Retrieved %d bytes", len(data))
dst_keep.put(data)
@@ -107,9 +131,60 @@ def copy_collection(obj_uuid, src=None, dst=None):
dst_keep.put(manifest)
return dst_keep.collections().create(manifest_text=manifest).execute()
+# copy_pipeline_instance(obj_uuid, src, dst)
+#
+# Copies a pipeline instance identified by obj_uuid from src to dst.
+#
+# Before creating the new instance, this function always:
+#   1. Copies all input collections: for each component in the
+#      pipeline, every collection listed as a dependency of that
+#      component's job.
+#   2. Copies the pipeline template.
+#
+# TODO: copying docker images and git repositories is not implemented
+# yet, and the --recursive/--no-recursive option is not consulted here;
+# dependencies are copied unconditionally.
+#
+# The only changes made to the copied pipeline instance are:
+#   1. The original pipeline instance UUID is preserved in
+#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
+#   2. The pipeline_template_uuid is changed to the new template uuid.
+#   3. The owner_uuid is removed so the destination assigns ownership
+#      (presumably to the user performing the copy).
+#
+# Returns the new pipeline instance record created at dst.
+#
def copy_pipeline_instance(obj_uuid, src=None, dst=None):
- raise NotImplementedError
-
+    # Fetch the pipeline instance record.
+    pi = src.pipeline_instances().get(uuid=obj_uuid).execute()
+
+    # Collect input collections (collections listed as job dependencies
+    # for any of the pipeline's components). A component that has not
+    # run yet may have no 'job' entry, so tolerate missing/None values.
+    input_collections = set()
+    for comp in (pi.get('components') or {}).values():
+        job = comp.get('job') or {}
+        input_collections.update(job.get('dependencies') or [])
+
+    for c in input_collections:
+        copy_collection(c, src, dst)
+
+    # Copy the pipeline template. copy_pipeline_template returns the
+    # whole newly created template record, so take its uuid.
+    new_pt = copy_pipeline_template(pi['pipeline_template_uuid'], src, dst)
+
+    # Update the fields of the pipeline instance.
+    if pi.get('properties') is None:
+        pi['properties'] = {}
+    pi['properties']['copied_from_pipeline_instance_uuid'] = obj_uuid
+    pi['pipeline_template_uuid'] = new_pt['uuid']
+    pi.pop('owner_uuid', None)
+    # NOTE(review): pi['uuid'] still holds the source uuid; consider
+    # removing it too so dst assigns a fresh one -- confirm.
+
+    # Create the new pipeline instance at the destination Arvados,
+    # passing the record as the request body (as copy_pipeline_template does).
+    new_pi = dst.pipeline_instances().create(body=pi).execute()
+    return new_pi
+
+# copy_pipeline_template(obj_uuid, src, dst)
+#
+# Copies a pipeline template identified by obj_uuid from src to dst.
+#
+# The owner_uuid of the new template is changed to that of the user
+# who copied the template.
+#
def copy_pipeline_template(obj_uuid, src=None, dst=None):
# fetch the pipeline template from the source instance
old_pt = src.pipeline_templates().get(uuid=obj_uuid).execute()
@@ -118,15 +193,21 @@ def copy_pipeline_template(obj_uuid, src=None, dst=None):
del old_pt['owner_uuid']
return dst.pipeline_templates().create(body=old_pt).execute()
-uuid_type_map = {
- "4zz18": "collection",
- "d1hrv": "pipeline_instance",
- "p5p6p": "pipeline_template",
-}
-
-def uuid_type(object_uuid):
- type_str = object_uuid.split('-')[1]
- return uuid_type_map.get(type_str, None)
+# uuid_type(api, object_uuid)
+#
+# Returns the name of the class that object_uuid belongs to, based on
+# the second ('-'-separated) field of the uuid. The field is matched
+# against the 'uuidPrefix' of each schema in the api's discovery
+# schema, and the name of the matching schema is returned.
+#
+# It returns a string such as 'Collection', 'PipelineInstance', etc.,
+# or None if the uuid is malformed or no schema has a matching prefix.
+#
+def uuid_type(api, object_uuid):
+    parts = object_uuid.split('-')
+    if len(parts) < 2:
+        return None
+    type_prefix = parts[1]
+    for class_name, schema in api._schema.schemas.items():
+        # Compare against this uuid's prefix; the schema *name* is the
+        # class, the 'uuidPrefix' value is only the key we match on.
+        if schema.get('uuidPrefix') == type_prefix:
+            return class_name
+    return None
def abort(msg, code=1):
print >>sys.stderr, "arv-copy:", msg
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list