[ARVADOS] created: b4dde1b5648aa71180282b7dfa0fc70ba7c87755

git at public.curoverse.com git at public.curoverse.com
Thu Oct 2 11:46:34 EDT 2014


        at  b4dde1b5648aa71180282b7dfa0fc70ba7c87755 (commit)


commit b4dde1b5648aa71180282b7dfa0fc70ba7c87755
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 2 11:46:24 2014 -0400

    4042: task.foreach should now accept multiple parameters and generate tasks
    based on the cartesian product of all input lists.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index c5fbcdf..74ad8a8 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -113,7 +113,7 @@ def get_items(p, value):
             items = ["$(dir %s/%s/)" % (prefix, l) for l in os.listdir(fn)]
         elif stat.S_ISREG(mode):
             with open(fn) as f:
-                items = [line for line in f]
+                items = [line.rstrip("\r\n") for line in f]
         return items
     else:
         return None
@@ -124,33 +124,50 @@ stdinname = None
 stdinfile = None
 rcode = 1
 
+def recursive_foreach(params, fvars):
+    """Create one job task per combination of items of the parameters named in fvars.
+
+    fvars is a parameter name or a list of names; a bare name is wrapped in a list so
+    it is not indexed character by character.  Exits 1 if get_items returns None.
+    """
+    fvars = [fvars] if isinstance(fvars, basestring) else fvars
+    var, rest = fvars[0], fvars[1:]
+    items = get_items(params, params[var])
+    logger.info("parallelizing on %s with items %s" % (var, items))
+    if items is None:
+        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
+        sys.exit(1)
+    for i in items:
+        params = copy.copy(params)  # shallow copy per item so each task gets its own dict
+        params[var] = i
+        if rest:
+            recursive_foreach(params, rest)
+        else:
+            body = {'job_uuid': arvados.current_job()['uuid'], 'sequence': 1, 'parameters': params,
+                    'created_by_job_task_uuid': arvados.current_task()['uuid']}
+            arvados.api().job_tasks().create(body=body).execute()
+
 try:
     if "task.foreach" in jobp:
         if arvados.current_task()['sequence'] == 0:
-            var = jobp["task.foreach"]
-            items = get_items(jobp, jobp[var])
-            logger.info("parallelizing on %s with items %s" % (var, items))
-            if items is not None:
-                for i in items:
-                    params = copy.copy(jobp)
-                    params[var] = i
-                    arvados.api().job_tasks().create(body={
-                        'job_uuid': arvados.current_job()['uuid'],
-                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
-                        'sequence': 1,
-                        'parameters': params
-                        }
-                    ).execute()
-                if "task.vwd" in jobp:
-                    # Base vwd collection will be merged with output fragments from
-                    # the other tasks by crunch.
-                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
-                else:
-                    arvados.current_task().set_output(None)
-                sys.exit(0)
-            else:
+            # This is the first task to start the other tasks and exit
+            fvars = jobp["task.foreach"]
+            if isinstance(fvars, basestring):
+                fvars = [fvars]
+            if not isinstance(fvars, list) or len(fvars) == 0:
+                logger.error("value of task.foreach must be a string or non-empty list")
                 sys.exit(1)
+            recursive_foreach(jobp, jobp["task.foreach"])
+            if "task.vwd" in jobp:
+                # Set output of the first task to the base vwd collection so it
+                # will be merged with output fragments from the other tasks by
+                # crunch.
+                arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
+            else:
+                arvados.current_task().set_output(None)
+            sys.exit(0)
     else:
+        # This is the only task so taskp/jobp are the same
         taskp = jobp
 
     if "task.vwd" in taskp:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list