[ARVADOS] updated: 7fadfb046ace2dbda699037545c5504e99446046

git at public.curoverse.com git at public.curoverse.com
Mon Oct 13 16:37:24 EDT 2014


Summary of changes:
 crunch_scripts/run-command         |  88 +++++++++++-----
 sdk/python/arvados/commands/run.py | 209 +++++++++++++++++++++++++++++++++++++
 sdk/python/bin/arv-run             |   4 +
 3 files changed, 272 insertions(+), 29 deletions(-)
 create mode 100644 sdk/python/arvados/commands/run.py
 create mode 100755 sdk/python/bin/arv-run

       via  7fadfb046ace2dbda699037545c5504e99446046 (commit)
       via  c0cead2ee16700cf14baba7993297e83787aedbb (commit)
       via  fee9a5e5f58a830082374e19787a44185a2a0fdb (commit)
       via  dd131dfc2686e378a7d90f5cc269340ab5adada9 (commit)
       via  3f726dd7022da6e2be816ba6cc493212596087f5 (commit)
       via  c9df4289923a621c929920fe958dbde287f29d73 (commit)
       via  0fdfa049801418ecd1faf33ec1415f3b689ea761 (commit)
       via  4f4ad25bf60751a09e316dca8c29cf3628ad7bdc (commit)
       via  c8a5c25611c964d2af8ba26b88622b70692257e4 (commit)
      from  1ad626b28816840288093d94a12ea7694201364b (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7fadfb046ace2dbda699037545c5504e99446046
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 16:37:14 2014 -0400

    3609: Parallelizing on -- works, added tentative support for --group.

diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 97278c8..7e95083 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -21,11 +21,14 @@ arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
 arvrun_parser.add_argument('--git-dir', type=str, default="")
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
-class UploadFile(object):
+class ArvFile(object):
     def __init__(self, prefix, fn):
         self.prefix = prefix
         self.fn = fn
 
+class UploadFile(ArvFile):
+    pass
+
 def is_in_collection(root, branch):
     if root == "/":
         return (None, None)
@@ -46,7 +49,7 @@ def statfile(prefix, fn):
             sp = os.path.split(absfn)
             (pdh, branch) = is_in_collection(sp[0], sp[1])
             if pdh:
-                return "%s$(file %s/%s)" % (prefix, pdh, branch)
+                return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
             else:
                 # trim leading '/' for path prefix test later
                 return UploadFile(prefix, absfn[1:])
@@ -132,15 +135,17 @@ def main(arguments=None):
         else:
             pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
 
-        for i in xrange(1, len(slots)):
-            slots[i] = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, UploadFile) else c for c in slots[i]]
+        for c in files:
+            c.fn = "$(file %s/%s)" % (pdh, c.fn)
+
+    for i in xrange(1, len(slots)):
+        slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
 
     component = {
         "script": "run-command",
         "script_version": "3609-arv-run",
         "repository": "arvados",
         "script_parameters": {
-            "command": slots[2:]
         },
         "runtime_constraints": {
             "docker_image": args.docker_image
@@ -148,6 +153,27 @@ def main(arguments=None):
     }
 
     task_foreach = []
+    group_parser = argparse.ArgumentParser()
+    group_parser.add_argument('--group', type=str)
+    group_parser.add_argument('args', nargs=argparse.REMAINDER)
+
+    for s in xrange(2, len(slots)):
+        for i in xrange(0, len(slots[s])):
+            if slots[s][i] == '--':
+                inp = "input%i" % s
+                groupargs = group_parser.parse_args(slots[2][i+1:])
+                component["script_parameters"][inp] = groupargs.args
+                if groupargs.group:
+                    inpgroups = inp+"_groups"
+                    component["script_parameters"][inpgroups] = {"group":inp, "regex":groupargs.group}
+                    slots[s] = slots[s][0:i] + [{"foreach": inpgroups, "command": "$(%s)" % inpgroups}]
+                    task_foreach.append(inpgroups)
+                else:
+                    slots[s] = slots[s][0:i] + ["$(%s)" % inp]
+                    task_foreach.append(inp)
+                break
+            if slots[s][i] == '\--':
+                slots[s][i] = '--'
 
     if slots[0]:
         component["script_parameters"]["task.stdout"] = slots[0][0]
@@ -159,6 +185,8 @@ def main(arguments=None):
     if task_foreach:
         component["script_parameters"]["task.foreach"] = task_foreach
 
+    component["script_parameters"]["command"] = slots[2:]
+
     pipeline = {
         "name": "",
         "components": {
@@ -174,6 +202,7 @@ def main(arguments=None):
     else:
         api = arvados.api('v1')
         pi = api.pipeline_instances().create(body=pipeline).execute()
+        print "Running pipeline %s" % pi["uuid"]
         #ws.main(["--pipeline", pi["uuid"]])
 
 if __name__ == '__main__':

commit c0cead2ee16700cf14baba7993297e83787aedbb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 15:34:25 2014 -0400

    Search up path to see if a file is in a collection.  Refactor to work towards supporting pipelines.

diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 6cebb44..97278c8 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -10,79 +10,115 @@ import put
 import time
 #import arvados.command.ws as ws
 import subprocess
+import logging
+
+logger = logging.getLogger('arvados.arv-run')
 
 arvrun_parser = argparse.ArgumentParser()
 arvrun_parser.add_argument('--dry-run', action="store_true")
 arvrun_parser.add_argument('--local', action="store_true")
 arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
 arvrun_parser.add_argument('--git-dir', type=str, default="")
-arvrun_parser.add_argument('command')
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
-needupload_files = []
-
-class ArvFile(object):
+class UploadFile(object):
     def __init__(self, prefix, fn):
         self.prefix = prefix
         self.fn = fn
 
-def statfile(prefix, fn, pattern):
+def is_in_collection(root, branch):
+    if root == "/":
+        return (None, None)
+    fn = os.path.join(root, ".arvados#collection")
+    if os.path.exists(fn):
+        with file(fn, 'r') as f:
+            c = json.load(f)
+        return (c["portable_data_hash"], branch)
+    else:
+        sp = os.path.split(root)
+        return is_in_collection(sp[0], os.path.join(sp[1], branch))
+
+def statfile(prefix, fn):
     absfn = os.path.abspath(fn)
     if os.path.exists(absfn):
-        fn = os.path.abspath(fn)
-        st = os.stat(fn)
+        st = os.stat(absfn)
         if stat.S_ISREG(st.st_mode):
-            mount = os.path.dirname(fn)+"/.arvados#collection"
-            if os.path.exists(mount):
-                with file(mount, 'r') as f:
-                    c = json.load(f)
-                return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+            sp = os.path.split(absfn)
+            (pdh, branch) = is_in_collection(sp[0], sp[1])
+            if pdh:
+                return "%s$(file %s/%s)" % (prefix, pdh, branch)
             else:
-                needupload_files.append(fn)
-            return ArvFile(prefix, fn[1:])
+                # trim leading '/' for path prefix test later
+                return UploadFile(prefix, absfn[1:])
     return prefix+fn
 
 def main(arguments=None):
     args = arvrun_parser.parse_args(arguments)
 
+    reading_into = 2
+
+    slots = [[], [], []]
+    for c in args.args:
+        if c == '>':
+            reading_into = 0
+        elif c == '<':
+            reading_into = 1
+        elif c == '|':
+            reading_into = len(slots)
+            slots.append([])
+        else:
+            slots[reading_into].append(c)
+
+    if slots[0] and len(slots[0]) > 1:
+        logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
+        return
+
     patterns = [re.compile("(--[^=]+=)(.*)"),
                 re.compile("(-[^=]+=)(.*)"),
                 re.compile("(-.)(.+)")]
 
-    commandargs = []
-
-    for a in args.args:
-        if a[0] == '-':
-            matched = False
-            for p in patterns:
-                m = p.match(a)
-                if m:
-                    commandargs.append(statfile(m.group(1), m.group(2), p))
-                    matched = True
-                    break
-            if not matched:
-                commandargs.append(a)
-        else:
-            commandargs.append(statfile('', a, None))
+    for command in slots[1:]:
+        for i in xrange(0, len(command)):
+            a = command[i]
+            if a[0] == '-':
+                # parameter starts with '-' so it might be a command line
+                # parameter with a file name, do some pattern matching
+                matched = False
+                for p in patterns:
+                    m = p.match(a)
+                    if m:
+                        command[i] = statfile(m.group(1), m.group(2))
+                        break
+            else:
+                # parameter might be a file, so test it
+                command[i] = statfile('', a)
 
     n = True
     pathprefix = "/"
-    files = [c for c in commandargs if isinstance(c, ArvFile)]
+    files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
     if len(files) > 0:
+        # Find the smallest path prefix that includes all the files that need to be uploaded.
+        # This starts at the root and iteratively removes common parent directory prefixes
+        # until all file paths no longer have a common parent
         while n:
             pathstep = None
             for c in files:
                 if pathstep is None:
                     sp = c.fn.split('/')
                     if len(sp) < 2:
+                        # no parent directories left
                         n = False
                         break
+                    # path step takes next directory
                     pathstep = sp[0] + "/"
                 else:
+                    # check if pathstep is common prefix for all files
                     if not c.fn.startswith(pathstep):
                         n = False
                         break
             if n:
+                # pathstep is common parent directory for all files, so remove the prefix
+                # from each path
                 pathprefix += pathstep
                 for c in files:
                     c.fn = c.fn[len(pathstep):]
@@ -96,48 +132,37 @@ def main(arguments=None):
         else:
             pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
 
-    commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
-
-    cut = None
-    i = -1
-    stdio = [None, None]
-    for j in xrange(0, len(commandargs)):
-        c = commandargs[j]
-        if c == '<':
-            stdio[0] = []
-            i = 0
-            cut = j if cut is None else cut
-        elif c == '>':
-            stdio[1] = []
-            i = 1
-            cut = j if cut is None else cut
-        elif i > -1:
-            stdio[i].append(c)
-
-    if cut is not None:
-        commandargs = commandargs[:cut]
+        for i in xrange(1, len(slots)):
+            slots[i] = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, UploadFile) else c for c in slots[i]]
 
     component = {
         "script": "run-command",
         "script_version": "3609-arv-run",
         "repository": "arvados",
         "script_parameters": {
-            "command": [args.command]+commandargs
+            "command": slots[2:]
         },
         "runtime_constraints": {
             "docker_image": args.docker_image
         }
     }
 
-    if stdio[0]:
-        component["script_parameters"]["task.stdin"] = stdio[0][0]
-    if stdio[1]:
-        component["script_parameters"]["task.stdout"] = stdio[1][0]
+    task_foreach = []
+
+    if slots[0]:
+        component["script_parameters"]["task.stdout"] = slots[0][0]
+    if slots[1]:
+        task_foreach.append("stdin")
+        component["script_parameters"]["stdin"] = slots[1]
+        component["script_parameters"]["task.stdin"] = "$(stdin)"\
+
+    if task_foreach:
+        component["script_parameters"]["task.foreach"] = task_foreach
 
     pipeline = {
         "name": "",
         "components": {
-            args.command: component
+            "command": component
         },
         "state":"RunningOnServer"
     }

commit fee9a5e5f58a830082374e19787a44185a2a0fdb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 13:38:38 2014 -0400

    New wait logic, report all exit codes.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 609572e..fb4f3c7 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -178,7 +178,6 @@ stdoutname = None
 stdoutfile = None
 stdinname = None
 stdinfile = None
-rcode = 1
 
 def recursive_foreach(params, fvars):
     var = fvars[0]
@@ -309,29 +308,20 @@ try:
     signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
 
     active = 1
-    while active > 0:
-        try:
-            os.waitpid(0, 0)
-        except OSError as e:
-            if e.errno == errno.ECHILD:
-                # child already exited
-                print "got ECHILD"
-                time.sleep(1)
-                for s in subprocesses:
-                    print s.poll()
-            else:
-                raise
-        active = sum([1 if s.poll() is None else 0 for s in subprocesses])
-        print "active is %i" % active
-
-    # wait for process to complete.
-    rcode = subprocesses[len(subprocesses)-1].returncode
+    pids = set([s.pid for s in subprocesses])
+    rcode = {}
+    while len(pids) > 0:
+        (pid, status) = os.wait()
+        pids.discard(pid)
+        rcode[pid] = (status >> 8)
 
     if sig.sig is not None:
         logger.critical("terminating on signal %s" % sig.sig)
         sys.exit(2)
     else:
-        logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
+        for i in xrange(len(cmd)):
+            r = rcode[subprocesses[i].pid]
+            logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
 
 except Exception as e:
     logger.exception("caught exception")
@@ -360,10 +350,12 @@ if "task.vwd" in taskp:
 else:
     outcollection = robust_put.upload(outdir, logger)
 
+success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values())
+
 api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                      body={
                                          'output': outcollection,
-                                         'success': (rcode == 0),
+                                         'success': success,
                                          'progress':1.0
                                      }).execute()
 

commit dd131dfc2686e378a7d90f5cc269340ab5adada9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 13:18:15 2014 -0400

    sleep so it doesn't go haywire

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 2af422f..609572e 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -316,7 +316,9 @@ try:
             if e.errno == errno.ECHILD:
                 # child already exited
                 print "got ECHILD"
-                pass
+                time.sleep(1)
+                for s in subprocesses:
+                    print s.poll()
             else:
                 raise
         active = sum([1 if s.poll() is None else 0 for s in subprocesses])

commit 3f726dd7022da6e2be816ba6cc493212596087f5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 12:50:33 2014 -0400

    Use branch

diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 2d966b6..6cebb44 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -119,7 +119,7 @@ def main(arguments=None):
 
     component = {
         "script": "run-command",
-        "script_version": "0fdfa049801418ecd1faf33ec1415f3b689ea761",
+        "script_version": "3609-arv-run",
         "repository": "arvados",
         "script_parameters": {
             "command": [args.command]+commandargs

commit c9df4289923a621c929920fe958dbde287f29d73
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 12:50:00 2014 -0400

    testing

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index edc0f59..2af422f 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -315,10 +315,12 @@ try:
         except OSError as e:
             if e.errno == errno.ECHILD:
                 # child already exited
+                print "got ECHILD"
                 pass
             else:
                 raise
         active = sum([1 if s.poll() is None else 0 for s in subprocesses])
+        print "active is %i" % active
 
     # wait for process to complete.
     rcode = subprocesses[len(subprocesses)-1].returncode
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 9df0a84..2d966b6 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -119,7 +119,7 @@ def main(arguments=None):
 
     component = {
         "script": "run-command",
-        "script_version": "4f4ad25bf60751a09e316dca8c29cf3628ad7bdc",
+        "script_version": "0fdfa049801418ecd1faf33ec1415f3b689ea761",
         "repository": "arvados",
         "script_parameters": {
             "command": [args.command]+commandargs

commit 0fdfa049801418ecd1faf33ec1415f3b689ea761
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 12:46:15 2014 -0400

    Catch ECHILD from os.waitpid()

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 34419b4..edc0f59 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -28,6 +28,7 @@ import crunchutil.vwd as vwd
 import argparse
 import json
 import tempfile
+import errno
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--dry-run', action='store_true')
@@ -309,7 +310,14 @@ try:
 
     active = 1
     while active > 0:
-        os.waitpid(0, 0)
+        try:
+            os.waitpid(0, 0)
+        except OSError as e:
+            if e.errno == errno.ECHILD:
+                # child already exited
+                pass
+            else:
+                raise
         active = sum([1 if s.poll() is None else 0 for s in subprocesses])
 
     # wait for process to complete.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index e27b3df..9df0a84 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -7,12 +7,15 @@ import re
 import os
 import stat
 import put
-import arvados.events
 import time
+#import arvados.command.ws as ws
+import subprocess
 
 arvrun_parser = argparse.ArgumentParser()
 arvrun_parser.add_argument('--dry-run', action="store_true")
+arvrun_parser.add_argument('--local', action="store_true")
 arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
+arvrun_parser.add_argument('--git-dir', type=str, default="")
 arvrun_parser.add_argument('command')
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
@@ -116,7 +119,7 @@ def main(arguments=None):
 
     component = {
         "script": "run-command",
-        "script_version": "bf243e064a7a2ee4e69a87dc3ba46e949a545150",
+        "script_version": "4f4ad25bf60751a09e316dca8c29cf3628ad7bdc",
         "repository": "arvados",
         "script_parameters": {
             "command": [args.command]+commandargs
@@ -141,21 +144,12 @@ def main(arguments=None):
 
     if args.dry_run:
         print(json.dumps(pipeline, indent=4))
+    elif args.local:
+        subprocess.call(["arv-crunch-job", "--job", json.dumps(component), "--git-dir", args.git_dir])
     else:
         api = arvados.api('v1')
         pi = api.pipeline_instances().create(body=pipeline).execute()
-        ws = None
-        def report(x):
-            if "event_type" in x:
-                print "\n"
-                print x
-                if x["event_type"] == "stderr":
-                    print x["properties"]["text"]
-                elif x["event_type"] == "update" and x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]:
-                    ws.close_connection()
-
-        ws =  arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]], ["event_type", "in", ["stderr", "update"]]], report)
-        ws.run_forever()
+        #ws.main(["--pipeline", pi["uuid"]])
 
 if __name__ == '__main__':
     main()

commit 4f4ad25bf60751a09e316dca8c29cf3628ad7bdc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Sep 16 22:47:44 2014 -0400

    3609: Fix trimming redirect parts of the command line.
    3609: Collect command line arguments, uploads files, builds arv-run submission.  Needs work on event listener.
    
    3609: Now print log messages for submitted pipeline.
    
    3609: Fix trimming redirect parts of the command line.

diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
new file mode 100644
index 0000000..e27b3df
--- /dev/null
+++ b/sdk/python/arvados/commands/run.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+
+import arvados
+import argparse
+import json
+import re
+import os
+import stat
+import put
+import arvados.events
+import time
+
+arvrun_parser = argparse.ArgumentParser()
+arvrun_parser.add_argument('--dry-run', action="store_true")
+arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
+arvrun_parser.add_argument('command')
+arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
+
+needupload_files = []
+
+class ArvFile(object):
+    def __init__(self, prefix, fn):
+        self.prefix = prefix
+        self.fn = fn
+
+def statfile(prefix, fn, pattern):
+    absfn = os.path.abspath(fn)
+    if os.path.exists(absfn):
+        fn = os.path.abspath(fn)
+        st = os.stat(fn)
+        if stat.S_ISREG(st.st_mode):
+            mount = os.path.dirname(fn)+"/.arvados#collection"
+            if os.path.exists(mount):
+                with file(mount, 'r') as f:
+                    c = json.load(f)
+                return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+            else:
+                needupload_files.append(fn)
+            return ArvFile(prefix, fn[1:])
+    return prefix+fn
+
+def main(arguments=None):
+    args = arvrun_parser.parse_args(arguments)
+
+    patterns = [re.compile("(--[^=]+=)(.*)"),
+                re.compile("(-[^=]+=)(.*)"),
+                re.compile("(-.)(.+)")]
+
+    commandargs = []
+
+    for a in args.args:
+        if a[0] == '-':
+            matched = False
+            for p in patterns:
+                m = p.match(a)
+                if m:
+                    commandargs.append(statfile(m.group(1), m.group(2), p))
+                    matched = True
+                    break
+            if not matched:
+                commandargs.append(a)
+        else:
+            commandargs.append(statfile('', a, None))
+
+    n = True
+    pathprefix = "/"
+    files = [c for c in commandargs if isinstance(c, ArvFile)]
+    if len(files) > 0:
+        while n:
+            pathstep = None
+            for c in files:
+                if pathstep is None:
+                    sp = c.fn.split('/')
+                    if len(sp) < 2:
+                        n = False
+                        break
+                    pathstep = sp[0] + "/"
+                else:
+                    if not c.fn.startswith(pathstep):
+                        n = False
+                        break
+            if n:
+                pathprefix += pathstep
+                for c in files:
+                    c.fn = c.fn[len(pathstep):]
+
+        os.chdir(pathprefix)
+
+        if args.dry_run:
+            print("cd %s" % pathprefix)
+            print("arv-put \"%s\"" % '" "'.join([c.fn for c in files]))
+            pdh = "$(input)"
+        else:
+            pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
+
+    commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
+
+    cut = None
+    i = -1
+    stdio = [None, None]
+    for j in xrange(0, len(commandargs)):
+        c = commandargs[j]
+        if c == '<':
+            stdio[0] = []
+            i = 0
+            cut = j if cut is None else cut
+        elif c == '>':
+            stdio[1] = []
+            i = 1
+            cut = j if cut is None else cut
+        elif i > -1:
+            stdio[i].append(c)
+
+    if cut is not None:
+        commandargs = commandargs[:cut]
+
+    component = {
+        "script": "run-command",
+        "script_version": "bf243e064a7a2ee4e69a87dc3ba46e949a545150",
+        "repository": "arvados",
+        "script_parameters": {
+            "command": [args.command]+commandargs
+        },
+        "runtime_constraints": {
+            "docker_image": args.docker_image
+        }
+    }
+
+    if stdio[0]:
+        component["script_parameters"]["task.stdin"] = stdio[0][0]
+    if stdio[1]:
+        component["script_parameters"]["task.stdout"] = stdio[1][0]
+
+    pipeline = {
+        "name": "",
+        "components": {
+            args.command: component
+        },
+        "state":"RunningOnServer"
+    }
+
+    if args.dry_run:
+        print(json.dumps(pipeline, indent=4))
+    else:
+        api = arvados.api('v1')
+        pi = api.pipeline_instances().create(body=pipeline).execute()
+        ws = None
+        def report(x):
+            if "event_type" in x:
+                print "\n"
+                print x
+                if x["event_type"] == "stderr":
+                    print x["properties"]["text"]
+                elif x["event_type"] == "update" and x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]:
+                    ws.close_connection()
+
+        ws =  arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]], ["event_type", "in", ["stderr", "update"]]], report)
+        ws.run_forever()
+
+if __name__ == '__main__':
+    main()
diff --git a/sdk/python/bin/arv-run b/sdk/python/bin/arv-run
new file mode 100755
index 0000000..41f5fd3
--- /dev/null
+++ b/sdk/python/bin/arv-run
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+
+from arvados.commands.run import main
+main()

commit c8a5c25611c964d2af8ba26b88622b70692257e4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Oct 13 12:02:02 2014 -0400

    3609: Piped commands works

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index b9f1ea6..34419b4 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -53,7 +53,7 @@ if not args.dry_run:
         taskp = arvados.current_task()['parameters']
 else:
     outdir = "/tmp"
-    jobp = json.loads(args.job_parameters)
+    jobp = json.loads(args.script_parameters)
     os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
     os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
     os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
@@ -91,8 +91,9 @@ class SigHandler(object):
     def __init__(self):
         self.sig = None
 
-    def send_signal(self, sp, signum):
-        sp.send_signal(signum)
+    def send_signal(self, subprocesses, signum):
+        for sp in subprocesses:
+            sp.send_signal(signum)
         self.sig = signum
 
 def add_to_group(gr, match):
@@ -235,23 +236,24 @@ try:
         if "task.cwd" in taskp:
             os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
 
-    if "piped_commands" in taskp:
-        cmd = []
-        for c in taskp["piped_commands"]:
-            cmd += expand_list(taskp, c)
+    cmd = []
+    if isinstance(taskp["command"][0], list):
+        for c in taskp["command"]:
+            cmd.append(expand_list(taskp, c))
     else:
-        cmd = [expand_list(taskp, taskp["command"])]
+        cmd.append(expand_list(taskp, taskp["command"]))
 
-    if not args.dry_run:
-        if "task.stdin" in taskp:
-            stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
+    if "task.stdin" in taskp:
+        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
+        if not args.dry_run:
             stdinfile = open(stdinname, "rb")
 
-        if "task.stdout" in taskp:
-            stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
+    if "task.stdout" in taskp:
+        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
+        if not args.dry_run:
             stdoutfile = open(stdoutname, "wb")
 
-    logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
+    logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
 
     if args.dry_run:
         sys.exit(0)
@@ -268,26 +270,50 @@ except Exception as e:
 
 try:
     subprocesses = []
-    next_cmd_stdin = stdinfile
+    close_streams = []
+    if stdinfile:
+        close_streams.append(stdinfile)
+    next_stdin = stdinfile
+
     for i in xrange(len(cmd)):
         if i == len(cmd)-1:
-            next_cmd_stdout = stdoutfile
+            # this is the last command in the pipeline, so its stdout should go to stdoutfile
+            next_stdout = stdoutfile
         else:
-            next_cmd_stdout = subprocess.PIPE
-        sp = subprocess.Popen(cmd, shell=False, stdin=next_cmd_stdin, stdout=next_cmd_stdout)
-        next_cmd_stdin = sp.stdout
+            # this is an intermediate command in the pipeline, so its stdout should go to a pipe
+            next_stdout = subprocess.PIPE
 
-        sig = SigHandler()
+        sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
 
-        # forward signals to the process.
-        signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
-        signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
-        signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
+        # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
+        # consuming process ends prematurely.
+        if sp.stdout:
+            close_streams.append(sp.stdout)
+
+        # Send this process's stdout to the next process's stdin
+        next_stdin = sp.stdout
 
         subprocesses.append(sp)
 
+    # File descriptors have been handed off to the subprocesses, so close them here.
+    for s in close_streams:
+        s.close()
+
+    # Set up signal handling
+    sig = SigHandler()
+
+    # Forward terminate signals to the subprocesses.
+    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
+    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
+
+    active = 1
+    while active > 0:
+        os.waitpid(0, 0)
+        active = sum([1 if s.poll() is None else 0 for s in subprocesses])
+
     # wait for process to complete.
-    rcode = sp.wait()
+    rcode = subprocesses[len(subprocesses)-1].returncode
 
     if sig.sig is not None:
         logger.critical("terminating on signal %s" % sig.sig)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list