[ARVADOS] updated: 0812bc1c717e5fed57d420b177f6ca9d41e81032
git at public.curoverse.com
git at public.curoverse.com
Fri Oct 31 15:26:49 EDT 2014
Summary of changes:
crunch_scripts/run-command | 23 ++++++++++++--
sdk/python/arvados/commands/run.py | 65 +++++++++++++++++++++++++++-----------
2 files changed, 66 insertions(+), 22 deletions(-)
via 0812bc1c717e5fed57d420b177f6ca9d41e81032 (commit)
from 0edcc26fa0f04c707f0b6fd3694c3dae7572d8f7 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0812bc1c717e5fed57d420b177f6ca9d41e81032
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 31 15:26:46 2014 -0400
3609: Inherit --retries from _util. Be more specific about error being caught. Add comments.
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 6c27a94..28adb74 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -119,13 +119,19 @@ def add_to_group(gr, match):
gr[m] = []
gr[m].append(match.group(0))
+# Return a tuple (var, items): 'var' is the name of the variable that takes on
+# each value in 'items' during inner substitution, or None if there is none.
def var_items(p, c, key):
if "var" in c:
- # Var specifies
+ # Var specifies the variable name for inner parameter substitution
return (c["var"], get_items(p, c[key]))
else:
+ # The component function ('key') value is a list, so return the list
+ # directly with no parameter substitution.
if isinstance(c[key], list):
return (None, get_items(p, c[key]))
+
+ # check if c[key] is a string that looks like a parameter
m = re.match("^\$\((.*)\)$", c[key])
if m and m.group(1) in p:
return (m.group(1), get_items(p, c[key]))
@@ -133,6 +139,10 @@ def var_items(p, c, key):
# backwards compatible, foreach specifies bare parameter name to use
return (c[key], get_items(p, p[c[key]]))
+# "p" is the parameter scope, "c" is the item to be expanded.
+# If "c" is a dict, apply function expansion.
+# If "c" is a list, recursively expand each item and return a new list.
+# If "c" is a string, apply parameter substitution
def expand_item(p, c):
if isinstance(c, dict):
if "foreach" in c and "command" in c:
@@ -185,8 +195,13 @@ def expand_item(p, c):
else:
return subst.do_substitution(p, c)
- return []
+ raise Exception("expand_item() unexpected parameter type %s" % (type(c))
+# Evaluate in a list context
+# "p" is the parameter scope, "value" will be evaluated
+# if "value" is a list after expansion, return that
+# if "value" is a path to a directory, return a list consisting of each entry in the directory
+# if "value" is a path to a file, return a list consisting of each line of the file
def get_items(p, value):
value = expand_item(p, value)
if isinstance(value, list):
@@ -208,6 +223,7 @@ stdoutfile = None
stdinname = None
stdinfile = None
+# Construct the cross product of all values of each variable listed in fvars
def recursive_foreach(params, fvars):
var = fvars[0]
fvars = fvars[1:]
@@ -392,7 +408,8 @@ if "task.vwd" in taskp:
else:
outcollection = robust_put.upload(outdir, logger)
-success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values())
+# Success if no non-zero return codes
+success = not any([status != 0 for status in rcode.values()])
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 962d6a8..411b997 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import arvados
+import arvados.commands.ws as ws
import argparse
import json
import re
@@ -8,13 +9,13 @@ import os
import stat
import put
import time
-import arvados.commands.ws as ws
import subprocess
import logging
+import arvados.commands._util as arv_cmd
logger = logging.getLogger('arvados.arv-run')
-arvrun_parser = argparse.ArgumentParser()
+arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs")
@@ -35,6 +36,9 @@ class ArvFile(object):
class UploadFile(ArvFile):
pass
+# Determine if a file is in a collection, and return a tuple consisting of the
+# portable data hash and the path relative to the root of the collection.
+# Return (None, None) if the path isn't within an arv-mount collection or on error.
def is_in_collection(root, branch):
try:
if root == "/":
@@ -47,9 +51,12 @@ def is_in_collection(root, branch):
else:
sp = os.path.split(root)
return is_in_collection(sp[0], os.path.join(sp[1], branch))
- except:
+ except IOError, OSError:
return (None, None)
+# Determine the project in which to place the output of this command by searching
+# upward for the arv-mount pseudo-file that identifies the project. If the cwd isn't
+# within an arv-mount project or there is an error, return current_user.
def determine_project(root, current_user):
try:
if root == "/":
@@ -65,9 +72,14 @@ def determine_project(root, current_user):
else:
sp = os.path.split(root)
return determine_project(sp[0], current_user)
- except:
+ except IOError, OSError:
return current_user
+# Determine if the string corresponds to a file, and if that file is part of an
+# arv-mounted collection or only local to the machine. Returns one of
+# ArvFile() (file already exists in a collection), UploadFile() (file needs to
+# be uploaded to a collection), or simply returns prefix+fn (which yields the
+# original parameter string).
def statfile(prefix, fn):
absfn = os.path.abspath(fn)
if os.path.exists(absfn):
@@ -93,6 +105,17 @@ def main(arguments=None):
reading_into = 2
+ # Parse the command arguments into 'slots'.
+ # All words following '>' are output arguments and are collected into slots[0].
+ # All words following '<' are input arguments and are collected into slots[1].
+ # slots[2..] store the parameters of each command in the pipeline.
+ #
+ # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
+ # will be parsed into:
+ # [['output.txt'],
+ # ['input1', 'input2', 'input3'],
+ # ['foo', 'arg1', 'arg2'],
+ # ['bar', 'arg3', 'arg4']]
slots = [[], [], []]
for c in args.args:
if c.startswith('>'):
@@ -120,9 +143,11 @@ def main(arguments=None):
else:
project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
+ # Identify input files. Look at each parameter and test to see if there is
+ # a file by that name. This also uses 'patterns' to look for file names
+ # embedded within command line arguments, such as --foo=file.txt or -lfile.txt
patterns = [re.compile("([^=]+=)(.*)"),
re.compile("(-[A-Za-z])(.+)")]
-
for j, command in enumerate(slots[1:]):
for i, a in enumerate(command):
if j > 0 and i == 0:
@@ -133,17 +158,19 @@ def main(arguments=None):
# if it starts with a \ then don't do any interpretation
command[i] = a[1:]
else:
- # Do some pattern matching
- matched = False
- for p in patterns:
- m = p.match(a)
- if m:
- command[i] = statfile(m.group(1), m.group(2))
- matched = True
- break
- if not matched:
- # parameter might be a file, so test it
- command[i] = statfile('', a)
+ # See if it looks like a file
+ command[i] = statfile('', a)
+
+ # If a file named command[i] was found, it would now be an
+ # ArvFile or UploadFile. If command[i] is a basestring, that
+ # means it doesn't correspond exactly to a file, so do some
+ # pattern matching.
+ if isinstance(command[i], basestring):
+ for p in patterns:
+ m = p.match(a)
+ if m:
+ command[i] = statfile(m.group(1), m.group(2))
+ break
n = True
pathprefix = "/"
@@ -185,7 +212,7 @@ def main(arguments=None):
pdh = "$(input)"
else:
files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=3)
+ collection = arvados.CollectionWriter(api, num_retries=args.retries)
stream = None
for f in files:
sp = os.path.split(f.fn)
@@ -242,7 +269,7 @@ def main(arguments=None):
if slots[1]:
task_foreach.append("stdin")
component["script_parameters"]["stdin"] = slots[1]
- component["script_parameters"]["task.stdin"] = "$(stdin)"\
+ component["script_parameters"]["task.stdin"] = "$(stdin)"
if task_foreach:
component["script_parameters"]["task.foreach"] = task_foreach
@@ -264,7 +291,7 @@ def main(arguments=None):
print(json.dumps(pipeline, indent=4))
else:
pipeline["owner_uuid"] = project
- pi = api.pipeline_instances().create(body=pipeline).execute()
+ pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
print "Running pipeline %s" % pi["uuid"]
if args.local:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list