[ARVADOS] updated: c79e86aff4cb20413cf0f09c52fe5066ca197deb

git at public.curoverse.com git at public.curoverse.com
Wed Oct 15 11:10:34 EDT 2014


Summary of changes:
 crunch_scripts/run-command         | 37 +++++++++++++++++++++++++------------
 sdk/python/arvados/commands/run.py | 24 ++++++++++++------------
 2 files changed, 37 insertions(+), 24 deletions(-)

       via  c79e86aff4cb20413cf0f09c52fe5066ca197deb (commit)
      from  ca65f360000d14ae987f3f6d3d15244ccf581c72 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit c79e86aff4cb20413cf0f09c52fe5066ca197deb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 15 11:10:27 2014 -0400

    3609: Add support for batch size, improve ability to pass lists of lists
    without them getting flattened.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index fb4f3c7..fbfd511 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -103,7 +103,7 @@ def add_to_group(gr, match):
         gr[m] = []
     gr[m].append(match.group(0))
 
-def expand_item(p, c):
+def expand_item(p, c, flatten=True):
     if isinstance(c, dict):
         if "foreach" in c and "command" in c:
             var = c["foreach"]
@@ -112,7 +112,7 @@ def expand_item(p, c):
             for i in items:
                 params = copy.copy(p)
                 params[var] = i
-                r.extend(expand_list(params, c["command"]))
+                r.extend(expand_item(params, c["command"]))
             return r
         if "list" in c and "index" in c and "command" in c:
             var = c["list"]
@@ -144,22 +144,27 @@ def expand_item(p, c):
     elif isinstance(c, list):
         return expand_list(p, c)
     elif isinstance(c, basestring):
-        return [subst.do_substitution(p, c)]
+        if flatten:
+            return [subst.do_substitution(p, c)]
+        else:
+            return subst.do_substitution(p, c)
 
     return []
 
-def expand_list(p, l):
+def expand_list(p, l, flatten=True):
     if isinstance(l, basestring):
         return expand_item(p, l)
+    elif flatten:
+        return [exp for arg in l for exp in expand_item(p, arg, flatten)]
     else:
-        return [exp for arg in l for exp in expand_item(p, arg)]
+        return [expand_item(p, arg, flatten) for arg in l]
 
-def get_items(p, value):
+def get_items(p, value, flatten=True):
     if isinstance(value, dict):
         return expand_item(p, value)
 
     if isinstance(value, list):
-        return expand_list(p, value)
+        return expand_list(p, value, flatten)
 
     fn = subst.do_substitution(p, value)
     mode = os.stat(fn).st_mode
@@ -182,7 +187,7 @@ stdinfile = None
 def recursive_foreach(params, fvars):
     var = fvars[0]
     fvars = fvars[1:]
-    items = get_items(params, params[var])
+    items = get_items(params, params[var], False)
     logger.info("parallelizing on %s with items %s" % (var, items))
     if items is not None:
         for i in items:
@@ -199,7 +204,10 @@ def recursive_foreach(params, fvars):
                         'parameters': params
                     }).execute()
                 else:
-                    logger.info(expand_list(params, params["command"]))
+                    if isinstance(params["command"][0], list):
+                        logger.info(expand_list(params, params["command"], False))
+                    else:
+                        logger.info(expand_list(params, params["command"], True))
     else:
         logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
         sys.exit(1)
@@ -227,7 +235,13 @@ try:
     else:
         # This is the only task so taskp/jobp are the same
         taskp = jobp
+except Exception as e:
+    logger.exception("caught exception")
+    logger.error("job parameters were:")
+    logger.error(pprint.pformat(jobp))
+    sys.exit(1)
 
+try:
     if not args.dry_run:
         if "task.vwd" in taskp:
             # Populate output directory with symlinks to files in collection
@@ -238,10 +252,9 @@ try:
 
     cmd = []
     if isinstance(taskp["command"][0], list):
-        for c in taskp["command"]:
-            cmd.append(expand_list(taskp, c))
+        cmd.append(expand_list(taskp, taskp["command"], False))
     else:
-        cmd.append(expand_list(taskp, taskp["command"]))
+        cmd.append(expand_list(taskp, taskp["command"], True))
 
     if "task.stdin" in taskp:
         stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 9301e3c..e118a9e 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -18,9 +18,9 @@ arvrun_parser = argparse.ArgumentParser()
 arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
 arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-crunch-job")
 arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs")
-arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git directory to use to find run-command when using --local")
-arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of pipeline submission, default 'arvados'")
-arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of pipeline submission, default 'master'")
+arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
+arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
+arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
 class ArvFile(object):
@@ -156,23 +156,23 @@ def main(arguments=None):
 
     task_foreach = []
     group_parser = argparse.ArgumentParser()
-    group_parser.add_argument('--group', type=str)
+    group_parser.add_argument('--batch-size', type=int)
     group_parser.add_argument('args', nargs=argparse.REMAINDER)
 
     for s in xrange(2, len(slots)):
         for i in xrange(0, len(slots[s])):
             if slots[s][i] == '--':
-                inp = "input%i" % s
+                inp = "input%i" % (s-2)
                 groupargs = group_parser.parse_args(slots[2][i+1:])
-                component["script_parameters"][inp] = groupargs.args
-                if groupargs.group:
-                    inpgroups = inp+"_groups"
-                    component["script_parameters"][inpgroups] = {"group":inp, "regex":groupargs.group}
-                    slots[s] = slots[s][0:i] + [{"foreach": inpgroups, "command": "$(%s)" % inpgroups}]
-                    task_foreach.append(inpgroups)
+                if groupargs.batch_size:
+                    component["script_parameters"][inp] = []
+                    for j in xrange(0, len(groupargs.args), groupargs.batch_size):
+                        component["script_parameters"][inp].append(groupargs.args[j:j+groupargs.batch_size])
+                    slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
                 else:
+                    component["script_parameters"][inp] = groupargs.args
                     slots[s] = slots[s][0:i] + ["$(%s)" % inp]
-                    task_foreach.append(inp)
+                task_foreach.append(inp)
                 break
             if slots[s][i] == '\--':
                 slots[s][i] = '--'

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list