[ARVADOS] updated: 34aac296f4a0d2df0e369a9169924ef7849d6e85
git at public.curoverse.com
Fri Sep 12 17:32:47 EDT 2014
Summary of changes:
sdk/cli/bin/arv | 2 +-
sdk/python/arvados/commands/copy.py | 253 +++++++++++++++++++++---------------
2 files changed, 152 insertions(+), 103 deletions(-)
via 34aac296f4a0d2df0e369a9169924ef7849d6e85 (commit)
from 4b34d2584324664897467bb902599938710c9650 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 34aac296f4a0d2df0e369a9169924ef7849d6e85
Author: Tim Pierce <twp at curoverse.com>
Date: Fri Sep 12 17:30:29 2014 -0400
3699: bug fixes and feedback
* added 'arv copy' front end to sdk/cli/bin/arv
* can supply --recursive for pipeline templates as well as pipeline
instances
* collections and git repositories are now properly renamed in the
pipeline template and instance when copying recursively
* copy_collection skips copying blocks if the collection already exists
at the destination
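The --recursive / --no-recursive options called out above are implemented as a paired argparse switch: both flags write to the same 'recursive' destination, and recursive copying is the documented default. A minimal, self-contained sketch of that pattern (the set_defaults() call and the sample parses are assumptions for illustration; the patch below only shows the two add_argument() calls):

    import argparse

    parser = argparse.ArgumentParser(description='Copy an Arvados object.')
    parser.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    parser.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies.')
    # Assumed: recursive copying is the default when neither flag is given.
    parser.set_defaults(recursive=True)

    print(parser.parse_args([]).recursive)                  # True
    print(parser.parse_args(['--no-recursive']).recursive)  # False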
diff --git a/sdk/cli/bin/arv b/sdk/cli/bin/arv
index 337d9ab..f90c731 100755
--- a/sdk/cli/bin/arv
+++ b/sdk/cli/bin/arv
@@ -118,7 +118,7 @@ def check_subcommands client, arvados, subcommand, global_opts, remaining_opts
case subcommand
when 'keep'
@sub = remaining_opts.shift
- if ['get', 'put', 'ls', 'normalize'].index @sub then
+ if ['get', 'put', 'ls', 'normalize', 'copy'].index @sub then
# Native Arvados
exec `which arv-#{@sub}`.strip, *remaining_opts
elsif ['less', 'check'].index @sub then
diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index 526304c..6c0c2a7 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -38,9 +38,10 @@ def main():
parser.add_argument(
'--recursive', dest='recursive', action='store_true',
- help='Recursively add any objects that this object depends upon.')
+ help='Recursively copy any dependencies for this object. (default)')
parser.add_argument(
- '--no-recursive', dest='recursive', action='store_false')
+ '--no-recursive', dest='recursive', action='store_false',
+ help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
parser.add_argument(
'--dst-git-repo', dest='dst_git_repo',
help='The name of the destination git repository.')
@@ -75,7 +76,9 @@ def main():
recursive=args.recursive,
src=src_arv, dst=dst_arv)
elif t == 'PipelineTemplate':
- result = copy_pipeline_template(args.object_uuid, src=src_arv, dst=dst_arv)
+ result = copy_pipeline_template(args.object_uuid,
+ dst_git_repo=args.dst_git_repo,
+ recursive=args.recursive,
+ src=src_arv, dst=dst_arv)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -107,50 +110,9 @@ def api_for_instance(instance_name):
abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
return client
-# copy_collection(obj_uuid, src, dst)
-#
-# Copy the collection identified by obj_uuid from src to dst.
-#
-# For this application, it is critical to preserve the
-# collection's manifest hash, which is not guaranteed with the
-# arvados.CollectionReader and arvados.CollectionWriter classes.
-# Copying each block in the collection manually, followed by
-# the manifest block, ensures that the collection's manifest
-# hash will not change.
-#
-def copy_collection(obj_uuid, src=None, dst=None):
- # Fetch the collection's manifest.
- c = src.collections().get(uuid=obj_uuid).execute()
- manifest = c['manifest_text']
- logging.debug('copying collection %s', obj_uuid)
- logging.debug('manifest_text = %s', manifest)
-
- # Enumerate the block locators found in the manifest.
- collection_blocks = sets.Set()
- src_keep = arvados.keep.KeepClient(src)
- for line in manifest.splitlines():
- try:
- block_hash = line.split()[1]
- collection_blocks.add(block_hash)
- except ValueError:
- abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
-
- # Copy each block from src_keep to dst_keep.
- dst_keep = arvados.keep.KeepClient(dst)
- for locator in collection_blocks:
- data = src_keep.get(locator)
- logger.debug('copying block %s', locator)
- logger.info("Retrieved %d bytes", len(data))
- dst_keep.put(data)
-
- # Copy the manifest and save the collection.
- logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
- dst_keep.put(manifest)
- return dst.collections().create(body={"manifest_text": manifest}).execute()
-
-# copy_pipeline_instance(obj_uuid, dst_git_repo, dst_project, recursive, src, dst)
+# copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
#
-# Copies a pipeline instance identified by obj_uuid from src to dst.
+# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the 'recursive' option evaluates to True:
# 1. Copies all input collections
@@ -167,84 +129,171 @@ def copy_collection(obj_uuid, src=None, dst=None):
# 3. The owner_uuid of the instance is changed to the user who
# copied it.
#
-def copy_pipeline_instance(obj_uuid, dst_git_repo=None, dst_project=None, recursive=True, src=None, dst=None):
+def copy_pipeline_instance(pi_uuid, dst_git_repo=None, dst_project=None, recursive=True, src=None, dst=None):
# Fetch the pipeline instance record.
- pi = src.pipeline_instances().get(uuid=obj_uuid).execute()
+ pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
+ pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
if recursive:
- # Copy input collections and docker images:
- # For each component c in the pipeline, add any
- # collection hashes found in c['job']['dependencies']
- # and c['job']['docker_image_locator'].
- #
- input_collections = sets.Set()
- for cname in pi['components']:
- if 'job' not in pi['components'][cname]:
- continue
- job = pi['components'][cname]['job']
- for dep in job['dependencies']:
- input_collections.add(dep)
- docker = job.get('docker_image_locator', None)
- if docker:
- input_collections.add(docker)
-
- for c in input_collections:
- copy_collection(c, src, dst)
-
- # Copy the git repositorie(s)
- repos = sets.Set()
- for c in pi['components']:
- component = pi['components'][c]
- if 'repository' in component:
- repos.add(component['repository'])
- if 'job' in component and 'repository' in component['job']:
- repos.add(component['job']['repository'])
-
- for r in repos:
- dst_branch = '{}_{}'.format(obj_uuid, r)
- copy_git_repo(r, dst_git_repo, dst_branch, src, dst)
-
- # Copy the pipeline template and save the uuid of the copy
- new_pt = copy_pipeline_template(pi['pipeline_template_uuid'], src, dst)
-
- # Update the fields of the pipeline instance
- pi['properties']['copied_from_pipeline_instance_uuid'] = obj_uuid
- pi['pipeline_template_uuid'] = new_pt
+ # Copy the pipeline template and save the copied template.
+ pt = copy_pipeline_template(pi['pipeline_template_uuid'],
+ dst_git_repo=dst_git_repo,
+ recursive=True,
+ src=src, dst=dst)
+
+ # Copy input collections, docker images and git repos.
+ pi = copy_collections(pi, src, dst)
+ copy_git_repos(pi, dst_git_repo, src, dst)
+
+ # Update the fields of the pipeline instance with the copied
+ # pipeline template.
+ pi['pipeline_template_uuid'] = pt['uuid']
if dst_project:
pi['owner_uuid'] = dst_project
else:
del pi['owner_uuid']
- # Rename the repositories named in the components to the dst_git_repo
- for c in pi['components']:
- component = pi['components'][c]
- if 'repository' in component:
- component['repository'] = dst_git_repo
- if 'job' in component and 'repository' in component['job']:
- component['job']['repository'] = dst_git_repo
else:
# not recursive
- print >>sys.stderr, "Copying only pipeline instance {}.".format(obj_uuid)
+ print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
# Create the new pipeline instance at the destination Arvados.
new_pi = dst.pipeline_instances().create(pipeline_instance=pi).execute()
return new_pi
-# copy_pipeline_template(obj_uuid, src, dst)
+# copy_pipeline_template(pt_uuid, dst_git_repo, recursive, src, dst)
+#
+# Copies a pipeline template identified by pt_uuid from src to dst.
#
-# Copies a pipeline template identified by obj_uuid from src to dst.
+# If the 'recursive' option evaluates to true, also copy any collections,
+# docker images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
-def copy_pipeline_template(obj_uuid, src=None, dst=None):
+# Returns the copied pipeline template object.
+#
+def copy_pipeline_template(pt_uuid, dst_git_repo=None, recursive=True, src=None, dst=None):
# fetch the pipeline template from the source instance
- old_pt = src.pipeline_templates().get(uuid=obj_uuid).execute()
- old_pt['name'] = old_pt['name'] + ' copy'
- del old_pt['uuid']
- del old_pt['owner_uuid']
- return dst.pipeline_templates().create(body=old_pt).execute()
+ pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
+
+ if recursive:
+ # Copy input collections, docker images and git repos.
+ pt = copy_collections(pt, src, dst)
+ copy_git_repos(pt, dst_git_repo, src, dst)
+
+ pt['name'] = pt['name'] + ' copy'
+ del pt['uuid']
+ del pt['owner_uuid']
+
+ return dst.pipeline_templates().create(body=pt).execute()
+
+# copy_collections(obj, src, dst)
+#
+# Recursively copies all collections referenced by 'obj' from src
+# to dst.
+#
+# Returns a copy of obj with any old collection uuids replaced by
+# the new ones.
+#
+def copy_collections(obj, src, dst):
+ if type(obj) == str:
+ if uuid_type(src, obj) == 'Collection':
+ newc = copy_collection(obj, src, dst)
+ if obj != newc['uuid'] and obj != newc['portable_data_hash']:
+ return newc['uuid']
+ return obj
+ elif type(obj) == dict:
+ return {v: copy_collections(obj[v], src, dst) for v in obj}
+ elif type(obj) == list:
+ return [copy_collections(v, src, dst) for v in obj]
+ return obj
+
+# copy_git_repos(p, dst_repo, src, dst)
+#
+# Copy all git repositories referenced by pipeline instance or
+# template 'p' from src to dst.
+#
+# p is modified in place: each component's 'repository' field (and the
+# one under its 'job', if present) is rewritten to point at dst_repo.
+#
+# Git repository dependencies are identified by:
+# * p['components'][c]['repository']
+# * p['components'][c]['job']['repository']
+# for each component c in the pipeline.
+#
+def copy_git_repos(p, dst_repo, src=None, dst=None):
+ copied = set()
+ for c in p['components']:
+ component = p['components'][c]
+ if 'repository' in component:
+ repo = component['repository']
+ if repo not in copied:
+ dst_branch = p['uuid']
+ copy_git_repo(repo, dst_repo, dst_branch, src, dst)
+ copied.add(repo)
+ component['repository'] = dst_repo
+ if 'job' in component and 'repository' in component['job']:
+ repo = component['job']['repository']
+ if repo not in copied:
+ dst_branch = p['uuid']
+ copy_git_repo(repo, dst_repo, dst_branch, src, dst)
+ copied.add(repo)
+ component['job']['repository'] = dst_repo
+
+# copy_collection(obj_uuid, src, dst)
+#
+# Copy the collection identified by obj_uuid from src to dst.
+# Returns the collection object created at dst.
+#
+# For this application, it is critical to preserve the
+# collection's manifest hash, which is not guaranteed with the
+# arvados.CollectionReader and arvados.CollectionWriter classes.
+# Copying each block in the collection manually, followed by
+# the manifest block, ensures that the collection's manifest
+# hash will not change.
+#
+def copy_collection(obj_uuid, src=None, dst=None):
+ c = src.collections().get(uuid=obj_uuid).execute()
+
+ # Check whether a collection with this hash already exists
+ # at the destination. If so, just return that collection.
+ if 'portable_data_hash' in c:
+ colhash = c['portable_data_hash']
+ else:
+ colhash = c['uuid']
+ dstcol = dst.collections().list(
+ filters=[['portable_data_hash', '=', colhash]]
+ ).execute()
+ if dstcol['items_available'] > 0:
+ return dstcol['items'][0]
+
+ # Fetch the collection's manifest.
+ manifest = c['manifest_text']
+ logging.debug('copying collection %s', obj_uuid)
+ logging.debug('manifest_text = %s', manifest)
+
+ # Enumerate the block locators found in the manifest.
+ collection_blocks = sets.Set()
+ src_keep = arvados.keep.KeepClient(src)
+ for line in manifest.splitlines():
+ try:
+ block_hash = line.split()[1]
+ collection_blocks.add(block_hash)
+ except IndexError:
+ abort('bad manifest line in collection {}: {}'.format(obj_uuid, line))
+
+ # Copy each block from src_keep to dst_keep.
+ dst_keep = arvados.keep.KeepClient(dst)
+ for locator in collection_blocks:
+ data = src_keep.get(locator)
+ logger.debug('copying block %s', locator)
+ logger.info("Retrieved %d bytes", len(data))
+ dst_keep.put(data)
+
+ # Copy the manifest and save the collection.
+ logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
+ dst_keep.put(manifest)
+ return dst.collections().create(body={"manifest_text": manifest}).execute()
# copy_git_repo(src_git_repo, dst_git_repo, dst_branch, src, dst)
#
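The copy_collections() helper in the patch above rewrites collection references by walking the pipeline's nested dicts, lists and strings and substituting each collection identifier with that of its copy at the destination. A minimal, self-contained sketch of the same recursive walk, with a hypothetical lookup table standing in for the copy_collection() call (the sample structure and the 'replacements' dict are made up for illustration):

    def rewrite_strings(obj, replacements):
        # Walk nested dicts, lists and strings; return a copy in which any
        # string found in the replacements table has been substituted.
        if isinstance(obj, str):
            return replacements.get(obj, obj)
        elif isinstance(obj, dict):
            return {k: rewrite_strings(v, replacements) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [rewrite_strings(v, replacements) for v in obj]
        return obj

    pi = {'components': {'foo': {'job': {'dependencies': ['src-collection-uuid']}}}}
    print(rewrite_strings(pi, {'src-collection-uuid': 'dst-collection-uuid'}))
    # {'components': {'foo': {'job': {'dependencies': ['dst-collection-uuid']}}}}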
-----------------------------------------------------------------------
hooks/post-receive