[ARVADOS] updated: 7fadfb046ace2dbda699037545c5504e99446046
git at public.curoverse.com
Mon Oct 13 16:37:24 EDT 2014
Summary of changes:
 crunch_scripts/run-command         |  88 +++++++++++-----
 sdk/python/arvados/commands/run.py | 209 +++++++++++++++++++++++++++++++++++++
 sdk/python/bin/arv-run             |   4 +
 3 files changed, 272 insertions(+), 29 deletions(-)
create mode 100644 sdk/python/arvados/commands/run.py
create mode 100755 sdk/python/bin/arv-run
       via  7fadfb046ace2dbda699037545c5504e99446046 (commit)
       via  c0cead2ee16700cf14baba7993297e83787aedbb (commit)
       via  fee9a5e5f58a830082374e19787a44185a2a0fdb (commit)
       via  dd131dfc2686e378a7d90f5cc269340ab5adada9 (commit)
       via  3f726dd7022da6e2be816ba6cc493212596087f5 (commit)
       via  c9df4289923a621c929920fe958dbde287f29d73 (commit)
       via  0fdfa049801418ecd1faf33ec1415f3b689ea761 (commit)
       via  4f4ad25bf60751a09e316dca8c29cf3628ad7bdc (commit)
       via  c8a5c25611c964d2af8ba26b88622b70692257e4 (commit)
      from  1ad626b28816840288093d94a12ea7694201364b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 7fadfb046ace2dbda699037545c5504e99446046
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 16:37:14 2014 -0400
3609: Parallelizing on -- works, added tentative support for --group.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 97278c8..7e95083 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -21,11 +21,14 @@ arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
arvrun_parser.add_argument('--git-dir', type=str, default="")
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
-class UploadFile(object):
+class ArvFile(object):
def __init__(self, prefix, fn):
self.prefix = prefix
self.fn = fn
+class UploadFile(ArvFile):
+ pass
+
def is_in_collection(root, branch):
if root == "/":
return (None, None)
@@ -46,7 +49,7 @@ def statfile(prefix, fn):
sp = os.path.split(absfn)
(pdh, branch) = is_in_collection(sp[0], sp[1])
if pdh:
- return "%s$(file %s/%s)" % (prefix, pdh, branch)
+ return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
else:
# trim leading '/' for path prefix test later
return UploadFile(prefix, absfn[1:])
@@ -132,15 +135,17 @@ def main(arguments=None):
else:
pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
- for i in xrange(1, len(slots)):
- slots[i] = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, UploadFile) else c for c in slots[i]]
+ for c in files:
+ c.fn = "$(file %s/%s)" % (pdh, c.fn)
+
+ for i in xrange(1, len(slots)):
+ slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
component = {
"script": "run-command",
"script_version": "3609-arv-run",
"repository": "arvados",
"script_parameters": {
- "command": slots[2:]
},
"runtime_constraints": {
"docker_image": args.docker_image
@@ -148,6 +153,27 @@ def main(arguments=None):
}
task_foreach = []
+ group_parser = argparse.ArgumentParser()
+ group_parser.add_argument('--group', type=str)
+ group_parser.add_argument('args', nargs=argparse.REMAINDER)
+
+ for s in xrange(2, len(slots)):
+ for i in xrange(0, len(slots[s])):
+ if slots[s][i] == '--':
+ inp = "input%i" % s
+ groupargs = group_parser.parse_args(slots[2][i+1:])
+ component["script_parameters"][inp] = groupargs.args
+ if groupargs.group:
+ inpgroups = inp+"_groups"
+ component["script_parameters"][inpgroups] = {"group":inp, "regex":groupargs.group}
+ slots[s] = slots[s][0:i] + [{"foreach": inpgroups, "command": "$(%s)" % inpgroups}]
+ task_foreach.append(inpgroups)
+ else:
+ slots[s] = slots[s][0:i] + ["$(%s)" % inp]
+ task_foreach.append(inp)
+ break
+ if slots[s][i] == '\--':
+ slots[s][i] = '--'
if slots[0]:
component["script_parameters"]["task.stdout"] = slots[0][0]
@@ -159,6 +185,8 @@ def main(arguments=None):
if task_foreach:
component["script_parameters"]["task.foreach"] = task_foreach
+ component["script_parameters"]["command"] = slots[2:]
+
pipeline = {
"name": "",
"components": {
@@ -174,6 +202,7 @@ def main(arguments=None):
else:
api = arvados.api('v1')
pi = api.pipeline_instances().create(body=pipeline).execute()
+ print "Running pipeline %s" % pi["uuid"]
#ws.main(["--pipeline", pi["uuid"]])
if __name__ == '__main__':
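
For readers following the new loop above: each command slot is scanned for a '--' separator; everything after it becomes a list-valued script parameter that the job can iterate over, optionally grouped by a --group regex. A minimal standalone sketch of that splitting (the helper name split_double_dash and the sample arguments are illustrative only, not part of the commit):

    import argparse

    def split_double_dash(slot, param_name):
        # Hypothetical helper mirroring the commit's behavior: tokens after the
        # first '--' become a list-valued parameter; an optional --group regex
        # asks run-command to group that list before iterating.
        group_parser = argparse.ArgumentParser()
        group_parser.add_argument('--group', type=str)
        group_parser.add_argument('args', nargs=argparse.REMAINDER)

        for i, tok in enumerate(slot):
            if tok == '--':
                groupargs = group_parser.parse_args(slot[i+1:])
                params = {param_name: groupargs.args}
                if groupargs.group:
                    params[param_name + "_groups"] = {"group": param_name,
                                                      "regex": groupargs.group}
                # keep only the tokens before '--', plus a substitution marker
                return slot[:i] + ["$(%s)" % param_name], params
            elif tok == '\\--':
                slot[i] = '--'      # an escaped \-- is passed through literally
        return slot, {}

    cmd, params = split_double_dash(["grep", "-H", "FOO", "--", "a.txt", "b.txt"],
                                    "input2")
    # cmd    -> ['grep', '-H', 'FOO', '$(input2)']
    # params -> {'input2': ['a.txt', 'b.txt']}
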
commit c0cead2ee16700cf14baba7993297e83787aedbb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 15:34:25 2014 -0400
Search up path to see if a file is in a collection. Refactor to work towards supporting pipelines.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 6cebb44..97278c8 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -10,79 +10,115 @@ import put
import time
#import arvados.command.ws as ws
import subprocess
+import logging
+
+logger = logging.getLogger('arvados.arv-run')
arvrun_parser = argparse.ArgumentParser()
arvrun_parser.add_argument('--dry-run', action="store_true")
arvrun_parser.add_argument('--local', action="store_true")
arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
arvrun_parser.add_argument('--git-dir', type=str, default="")
-arvrun_parser.add_argument('command')
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
-needupload_files = []
-
-class ArvFile(object):
+class UploadFile(object):
def __init__(self, prefix, fn):
self.prefix = prefix
self.fn = fn
-def statfile(prefix, fn, pattern):
+def is_in_collection(root, branch):
+ if root == "/":
+ return (None, None)
+ fn = os.path.join(root, ".arvados#collection")
+ if os.path.exists(fn):
+ with file(fn, 'r') as f:
+ c = json.load(f)
+ return (c["portable_data_hash"], branch)
+ else:
+ sp = os.path.split(root)
+ return is_in_collection(sp[0], os.path.join(sp[1], branch))
+
+def statfile(prefix, fn):
absfn = os.path.abspath(fn)
if os.path.exists(absfn):
- fn = os.path.abspath(fn)
- st = os.stat(fn)
+ st = os.stat(absfn)
if stat.S_ISREG(st.st_mode):
- mount = os.path.dirname(fn)+"/.arvados#collection"
- if os.path.exists(mount):
- with file(mount, 'r') as f:
- c = json.load(f)
- return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+ sp = os.path.split(absfn)
+ (pdh, branch) = is_in_collection(sp[0], sp[1])
+ if pdh:
+ return "%s$(file %s/%s)" % (prefix, pdh, branch)
else:
- needupload_files.append(fn)
- return ArvFile(prefix, fn[1:])
+ # trim leading '/' for path prefix test later
+ return UploadFile(prefix, absfn[1:])
return prefix+fn
def main(arguments=None):
args = arvrun_parser.parse_args(arguments)
+ reading_into = 2
+
+ slots = [[], [], []]
+ for c in args.args:
+ if c == '>':
+ reading_into = 0
+ elif c == '<':
+ reading_into = 1
+ elif c == '|':
+ reading_into = len(slots)
+ slots.append([])
+ else:
+ slots[reading_into].append(c)
+
+ if slots[0] and len(slots[0]) > 1:
+ logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
+ return
+
patterns = [re.compile("(--[^=]+=)(.*)"),
re.compile("(-[^=]+=)(.*)"),
re.compile("(-.)(.+)")]
- commandargs = []
-
- for a in args.args:
- if a[0] == '-':
- matched = False
- for p in patterns:
- m = p.match(a)
- if m:
- commandargs.append(statfile(m.group(1), m.group(2), p))
- matched = True
- break
- if not matched:
- commandargs.append(a)
- else:
- commandargs.append(statfile('', a, None))
+ for command in slots[1:]:
+ for i in xrange(0, len(command)):
+ a = command[i]
+ if a[0] == '-':
+ # parameter starts with '-' so it might be a command line
+ # parameter with a file name, do some pattern matching
+ matched = False
+ for p in patterns:
+ m = p.match(a)
+ if m:
+ command[i] = statfile(m.group(1), m.group(2))
+ break
+ else:
+ # parameter might be a file, so test it
+ command[i] = statfile('', a)
n = True
pathprefix = "/"
- files = [c for c in commandargs if isinstance(c, ArvFile)]
+ files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
if len(files) > 0:
+ # Find the smallest path prefix that includes all the files that need to be uploaded.
+ # This starts at the root and iteratively removes common parent directory prefixes
+ # until all file pathes no longer have a common parent.
while n:
pathstep = None
for c in files:
if pathstep is None:
sp = c.fn.split('/')
if len(sp) < 2:
+ # no parent directories left
n = False
break
+ # path step takes next directory
pathstep = sp[0] + "/"
else:
+ # check if pathstep is common prefix for all files
if not c.fn.startswith(pathstep):
n = False
break
if n:
+ # pathstep is common parent directory for all files, so remove the prefix
+ # from each path
pathprefix += pathstep
for c in files:
c.fn = c.fn[len(pathstep):]
@@ -96,48 +132,37 @@ def main(arguments=None):
else:
pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
- commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
-
- cut = None
- i = -1
- stdio = [None, None]
- for j in xrange(0, len(commandargs)):
- c = commandargs[j]
- if c == '<':
- stdio[0] = []
- i = 0
- cut = j if cut is None else cut
- elif c == '>':
- stdio[1] = []
- i = 1
- cut = j if cut is None else cut
- elif i > -1:
- stdio[i].append(c)
-
- if cut is not None:
- commandargs = commandargs[:cut]
+ for i in xrange(1, len(slots)):
+ slots[i] = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, UploadFile) else c for c in slots[i]]
component = {
"script": "run-command",
"script_version": "3609-arv-run",
"repository": "arvados",
"script_parameters": {
- "command": [args.command]+commandargs
+ "command": slots[2:]
},
"runtime_constraints": {
"docker_image": args.docker_image
}
}
- if stdio[0]:
- component["script_parameters"]["task.stdin"] = stdio[0][0]
- if stdio[1]:
- component["script_parameters"]["task.stdout"] = stdio[1][0]
+ task_foreach = []
+
+ if slots[0]:
+ component["script_parameters"]["task.stdout"] = slots[0][0]
+ if slots[1]:
+ task_foreach.append("stdin")
+ component["script_parameters"]["stdin"] = slots[1]
+ component["script_parameters"]["task.stdin"] = "$(stdin)"\
+
+ if task_foreach:
+ component["script_parameters"]["task.foreach"] = task_foreach
pipeline = {
"name": "",
"components": {
- args.command: component
+ "command": component
},
"state":"RunningOnServer"
}
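
The commented loop above peels off, one directory at a time, the longest path prefix shared by every file that needs uploading, so arv-put can be run from that directory with short relative paths. For illustration only, an equivalent result using the standard library (the file names are made up):

    import os.path

    # Hypothetical input: paths with the leading '/' already trimmed, as in the commit.
    files = ["home/user/data/sample1.fastq",
             "home/user/data/sample2.fastq",
             "home/user/refs/hg19.fa"]

    # os.path.commonprefix compares characters, so take dirname of the result to
    # land on a whole directory component.
    prefix = os.path.dirname(os.path.commonprefix(files))
    # prefix -> 'home/user'; the upload would run from /home/user and the files
    # keep their remaining relative paths ('data/sample1.fastq', ...).
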
commit fee9a5e5f58a830082374e19787a44185a2a0fdb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 13:38:38 2014 -0400
New wait logic, report all exit codes.
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 609572e..fb4f3c7 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -178,7 +178,6 @@ stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
-rcode = 1
def recursive_foreach(params, fvars):
var = fvars[0]
@@ -309,29 +308,20 @@ try:
signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
active = 1
- while active > 0:
- try:
- os.waitpid(0, 0)
- except OSError as e:
- if e.errno == errno.ECHILD:
- # child already exited
- print "got ECHILD"
- time.sleep(1)
- for s in subprocesses:
- print s.poll()
- else:
- raise
- active = sum([1 if s.poll() is None else 0 for s in subprocesses])
- print "active is %i" % active
-
- # wait for process to complete.
- rcode = subprocesses[len(subprocesses)-1].returncode
+ pids = set([s.pid for s in subprocesses])
+ rcode = {}
+ while len(pids) > 0:
+ (pid, status) = os.wait()
+ pids.discard(pid)
+ rcode[pid] = (status >> 8)
if sig.sig is not None:
logger.critical("terminating on signal %s" % sig.sig)
sys.exit(2)
else:
- logger.info("completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
+ for i in xrange(len(cmd)):
+ r = rcode[subprocesses[i].pid]
+ logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
except Exception as e:
logger.exception("caught exception")
@@ -360,10 +350,12 @@ if "task.vwd" in taskp:
else:
outcollection = robust_put.upload(outdir, logger)
+success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values())
+
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
'output': outcollection,
- 'success': (rcode == 0),
+ 'success': success,
'progress':1.0
}).execute()
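
The reworked wait loop above reaps children with os.wait() and records one exit status per pid instead of polling. A minimal sketch of the same pattern with placeholder commands; os.WEXITSTATUS(status) is the portable equivalent of the status >> 8 shift used in the commit:

    import os
    import subprocess

    # Placeholder children standing in for the run-command pipeline.
    procs = [subprocess.Popen(["true"]), subprocess.Popen(["false"])]

    pids = set(p.pid for p in procs)
    rcode = {}
    while pids:
        pid, status = os.wait()              # blocks until any child exits
        pids.discard(pid)
        rcode[pid] = os.WEXITSTATUS(status)  # equivalent to (status >> 8)

    success = all(rcode[p.pid] == 0 for p in procs)
    for p in procs:
        print("pid %d exited with %d" % (p.pid, rcode[p.pid]))
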
commit dd131dfc2686e378a7d90f5cc269340ab5adada9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 13:18:15 2014 -0400
sleep so it doesn't go haywire
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 2af422f..609572e 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -316,7 +316,9 @@ try:
if e.errno == errno.ECHILD:
# child already exited
print "got ECHILD"
- pass
+ time.sleep(1)
+ for s in subprocesses:
+ print s.poll()
else:
raise
active = sum([1 if s.poll() is None else 0 for s in subprocesses])
commit 3f726dd7022da6e2be816ba6cc493212596087f5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 12:50:33 2014 -0400
Use branch
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 2d966b6..6cebb44 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -119,7 +119,7 @@ def main(arguments=None):
component = {
"script": "run-command",
- "script_version": "0fdfa049801418ecd1faf33ec1415f3b689ea761",
+ "script_version": "3609-arv-run",
"repository": "arvados",
"script_parameters": {
"command": [args.command]+commandargs
commit c9df4289923a621c929920fe958dbde287f29d73
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 12:50:00 2014 -0400
testing
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index edc0f59..2af422f 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -315,10 +315,12 @@ try:
except OSError as e:
if e.errno == errno.ECHILD:
# child already exited
+ print "got ECHILD"
pass
else:
raise
active = sum([1 if s.poll() is None else 0 for s in subprocesses])
+ print "active is %i" % active
# wait for process to complete.
rcode = subprocesses[len(subprocesses)-1].returncode
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 9df0a84..2d966b6 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -119,7 +119,7 @@ def main(arguments=None):
component = {
"script": "run-command",
- "script_version": "4f4ad25bf60751a09e316dca8c29cf3628ad7bdc",
+ "script_version": "0fdfa049801418ecd1faf33ec1415f3b689ea761",
"repository": "arvados",
"script_parameters": {
"command": [args.command]+commandargs
commit 0fdfa049801418ecd1faf33ec1415f3b689ea761
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 12:46:15 2014 -0400
Catch ECHILD from os.waitpid()
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 34419b4..edc0f59 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -28,6 +28,7 @@ import crunchutil.vwd as vwd
import argparse
import json
import tempfile
+import errno
parser = argparse.ArgumentParser()
parser.add_argument('--dry-run', action='store_true')
@@ -309,7 +310,14 @@ try:
active = 1
while active > 0:
- os.waitpid(0, 0)
+ try:
+ os.waitpid(0, 0)
+ except OSError as e:
+ if e.errno == errno.ECHILD:
+ # child already exited
+ pass
+ else:
+ raise
active = sum([1 if s.poll() is None else 0 for s in subprocesses])
# wait for process to complete.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index e27b3df..9df0a84 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -7,12 +7,15 @@ import re
import os
import stat
import put
-import arvados.events
import time
+#import arvados.command.ws as ws
+import subprocess
arvrun_parser = argparse.ArgumentParser()
arvrun_parser.add_argument('--dry-run', action="store_true")
+arvrun_parser.add_argument('--local', action="store_true")
arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
+arvrun_parser.add_argument('--git-dir', type=str, default="")
arvrun_parser.add_argument('command')
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
@@ -116,7 +119,7 @@ def main(arguments=None):
component = {
"script": "run-command",
- "script_version": "bf243e064a7a2ee4e69a87dc3ba46e949a545150",
+ "script_version": "4f4ad25bf60751a09e316dca8c29cf3628ad7bdc",
"repository": "arvados",
"script_parameters": {
"command": [args.command]+commandargs
@@ -141,21 +144,12 @@ def main(arguments=None):
if args.dry_run:
print(json.dumps(pipeline, indent=4))
+ elif args.local:
+ subprocess.call(["arv-crunch-job", "--job", json.dumps(component), "--git-dir", args.git_dir])
else:
api = arvados.api('v1')
pi = api.pipeline_instances().create(body=pipeline).execute()
- ws = None
- def report(x):
- if "event_type" in x:
- print "\n"
- print x
- if x["event_type"] == "stderr":
- print x["properties"]["text"]
- elif x["event_type"] == "update" and x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]:
- ws.close_connection()
-
- ws = arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]], ["event_type", "in", ["stderr", "update"]]], report)
- ws.run_forever()
+ #ws.main(["--pipeline", pi["uuid"]])
if __name__ == '__main__':
main()
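
The new errno check matters because os.waitpid(0, 0) raises OSError with ECHILD once every child has already been reaped; treating that as fatal would abort an otherwise successful run. The pattern in isolation:

    import errno
    import os

    try:
        os.waitpid(0, 0)          # wait for any child of this process
    except OSError as e:
        if e.errno == errno.ECHILD:
            pass                  # no children left to wait for; not an error here
        else:
            raise
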
commit 4f4ad25bf60751a09e316dca8c29cf3628ad7bdc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Sep 16 22:47:44 2014 -0400
3609: Fix trimming redirect parts of the command line.
3609: Collect command line arguments, uploads files, builds arv-run submission. Needs work on event listener.
3609: Now print log messages for submitted pipeline.
3609: Fix trimming redirect parts of the command line.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
new file mode 100644
index 0000000..e27b3df
--- /dev/null
+++ b/sdk/python/arvados/commands/run.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+
+import arvados
+import argparse
+import json
+import re
+import os
+import stat
+import put
+import arvados.events
+import time
+
+arvrun_parser = argparse.ArgumentParser()
+arvrun_parser.add_argument('--dry-run', action="store_true")
+arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
+arvrun_parser.add_argument('command')
+arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
+
+needupload_files = []
+
+class ArvFile(object):
+ def __init__(self, prefix, fn):
+ self.prefix = prefix
+ self.fn = fn
+
+def statfile(prefix, fn, pattern):
+ absfn = os.path.abspath(fn)
+ if os.path.exists(absfn):
+ fn = os.path.abspath(fn)
+ st = os.stat(fn)
+ if stat.S_ISREG(st.st_mode):
+ mount = os.path.dirname(fn)+"/.arvados#collection"
+ if os.path.exists(mount):
+ with file(mount, 'r') as f:
+ c = json.load(f)
+ return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+ else:
+ needupload_files.append(fn)
+ return ArvFile(prefix, fn[1:])
+ return prefix+fn
+
+def main(arguments=None):
+ args = arvrun_parser.parse_args(arguments)
+
+ patterns = [re.compile("(--[^=]+=)(.*)"),
+ re.compile("(-[^=]+=)(.*)"),
+ re.compile("(-.)(.+)")]
+
+ commandargs = []
+
+ for a in args.args:
+ if a[0] == '-':
+ matched = False
+ for p in patterns:
+ m = p.match(a)
+ if m:
+ commandargs.append(statfile(m.group(1), m.group(2), p))
+ matched = True
+ break
+ if not matched:
+ commandargs.append(a)
+ else:
+ commandargs.append(statfile('', a, None))
+
+ n = True
+ pathprefix = "/"
+ files = [c for c in commandargs if isinstance(c, ArvFile)]
+ if len(files) > 0:
+ while n:
+ pathstep = None
+ for c in files:
+ if pathstep is None:
+ sp = c.fn.split('/')
+ if len(sp) < 2:
+ n = False
+ break
+ pathstep = sp[0] + "/"
+ else:
+ if not c.fn.startswith(pathstep):
+ n = False
+ break
+ if n:
+ pathprefix += pathstep
+ for c in files:
+ c.fn = c.fn[len(pathstep):]
+
+ os.chdir(pathprefix)
+
+ if args.dry_run:
+ print("cd %s" % pathprefix)
+ print("arv-put \"%s\"" % '" "'.join([c.fn for c in files]))
+ pdh = "$(input)"
+ else:
+ pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
+
+ commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
+
+ cut = None
+ i = -1
+ stdio = [None, None]
+ for j in xrange(0, len(commandargs)):
+ c = commandargs[j]
+ if c == '<':
+ stdio[0] = []
+ i = 0
+ cut = j if cut is None else cut
+ elif c == '>':
+ stdio[1] = []
+ i = 1
+ cut = j if cut is None else cut
+ elif i > -1:
+ stdio[i].append(c)
+
+ if cut is not None:
+ commandargs = commandargs[:cut]
+
+ component = {
+ "script": "run-command",
+ "script_version": "bf243e064a7a2ee4e69a87dc3ba46e949a545150",
+ "repository": "arvados",
+ "script_parameters": {
+ "command": [args.command]+commandargs
+ },
+ "runtime_constraints": {
+ "docker_image": args.docker_image
+ }
+ }
+
+ if stdio[0]:
+ component["script_parameters"]["task.stdin"] = stdio[0][0]
+ if stdio[1]:
+ component["script_parameters"]["task.stdout"] = stdio[1][0]
+
+ pipeline = {
+ "name": "",
+ "components": {
+ args.command: component
+ },
+ "state":"RunningOnServer"
+ }
+
+ if args.dry_run:
+ print(json.dumps(pipeline, indent=4))
+ else:
+ api = arvados.api('v1')
+ pi = api.pipeline_instances().create(body=pipeline).execute()
+ ws = None
+ def report(x):
+ if "event_type" in x:
+ print "\n"
+ print x
+ if x["event_type"] == "stderr":
+ print x["properties"]["text"]
+ elif x["event_type"] == "update" and x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]:
+ ws.close_connection()
+
+ ws = arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]], ["event_type", "in", ["stderr", "update"]]], report)
+ ws.run_forever()
+
+if __name__ == '__main__':
+ main()
diff --git a/sdk/python/bin/arv-run b/sdk/python/bin/arv-run
new file mode 100755
index 0000000..41f5fd3
--- /dev/null
+++ b/sdk/python/bin/arv-run
@@ -0,0 +1,4 @@
+#!/usr/bin/env python
+
+from arvados.commands.run import main
+main()
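
Because main() in run.py accepts an explicit argument list, the new command can be exercised without installing the arv-run wrapper. A hypothetical dry-run call (the grep command and file names are made up; --dry-run prints the generated pipeline JSON instead of submitting it):

    from arvados.commands.run import main

    # '>' names a stdout file that arv-run turns into the task.stdout
    # script parameter; existing local files would be uploaded with arv-put.
    main(["--dry-run",
          "grep", "-H", "GATTACA",
          "input1.fa", "input2.fa",
          ">", "matches.txt"])
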
commit c8a5c25611c964d2af8ba26b88622b70692257e4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 13 12:02:02 2014 -0400
3609: Piped commands works
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index b9f1ea6..34419b4 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -53,7 +53,7 @@ if not args.dry_run:
taskp = arvados.current_task()['parameters']
else:
outdir = "/tmp"
- jobp = json.loads(args.job_parameters)
+ jobp = json.loads(args.script_parameters)
os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
os.environ['CRUNCH_SRC'] = '/tmp/crunche-src'
@@ -91,8 +91,9 @@ class SigHandler(object):
def __init__(self):
self.sig = None
- def send_signal(self, sp, signum):
- sp.send_signal(signum)
+ def send_signal(self, subprocesses, signum):
+ for sp in subprocesses:
+ sp.send_signal(signum)
self.sig = signum
def add_to_group(gr, match):
@@ -235,23 +236,24 @@ try:
if "task.cwd" in taskp:
os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
- if "piped_commands" in taskp:
- cmd = []
- for c in taskp["piped_commands"]:
- cmd += expand_list(taskp, c)
+ cmd = []
+ if isinstance(taskp["command"][0], list):
+ for c in taskp["command"]:
+ cmd.append(expand_list(taskp, c))
else:
- cmd = [expand_list(taskp, taskp["command"])]
+ cmd.append(expand_list(taskp, taskp["command"]))
- if not args.dry_run:
- if "task.stdin" in taskp:
- stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
+ if "task.stdin" in taskp:
+ stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
+ if not args.dry_run:
stdinfile = open(stdinname, "rb")
- if "task.stdout" in taskp:
- stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
+ if "task.stdout" in taskp:
+ stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
+ if not args.dry_run:
stdoutfile = open(stdoutname, "wb")
- logger.info("{}{}{}".format(' '.join(cmd), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
+ logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
if args.dry_run:
sys.exit(0)
@@ -268,26 +270,50 @@ except Exception as e:
try:
subprocesses = []
- next_cmd_stdin = stdinfile
+ close_streams = []
+ if stdinfile:
+ close_streams.append(stdinfile)
+ next_stdin = stdinfile
+
for i in xrange(len(cmd)):
if i == len(cmd)-1:
- next_cmd_stdout = stdoutfile
+ # this is the last command in the pipeline, so its stdout should go to stdoutfile
+ next_stdout = stdoutfile
else:
- next_cmd_stdout = subprocess.PIPE
- sp = subprocess.Popen(cmd, shell=False, stdin=next_cmd_stdin, stdout=next_cmd_stdout)
- next_cmd_stdin = sp.stdout
+ # this is an intermediate command in the pipeline, so its stdout should go to a pipe
+ next_stdout = subprocess.PIPE
- sig = SigHandler()
+ sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout)
- # forward signals to the process.
- signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
- signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
- signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
+ # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
+ # consuming process ends prematurely.
+ if sp.stdout:
+ close_streams.append(sp.stdout)
+
+ # Send this processes's stdout to to the next process's stdin
+ next_stdin = sp.stdout
subprocesses.append(sp)
+ # File descriptors have been handed off to the subprocesses, so close them here.
+ for s in close_streams:
+ s.close()
+
+ # Set up signal handling
+ sig = SigHandler()
+
+ # Forward terminate signals to the subprocesses.
+ signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
+ signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
+ signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
+
+ active = 1
+ while active > 0:
+ os.waitpid(0, 0)
+ active = sum([1 if s.poll() is None else 0 for s in subprocesses])
+
# wait for process to complete.
- rcode = sp.wait()
+ rcode = subprocesses[len(subprocesses)-1].returncode
if sig.sig is not None:
logger.critical("terminating on signal %s" % sig.sig)
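
The Popen loop above connects each command's stdout to the next command's stdin, the way a shell pipeline does, and closes the parent's copies of the pipe ends so a consumer that exits early delivers SIGPIPE upstream. A self-contained sketch of that chaining with placeholder commands:

    import subprocess

    cmd = [["cat", "/etc/hostname"], ["tr", "a-z", "A-Z"]]   # placeholder pipeline

    subprocesses = []
    next_stdin = None                 # the first command reads the inherited stdin
    for i, c in enumerate(cmd):
        # the last command writes to the inherited stdout; the rest feed a pipe
        next_stdout = subprocess.PIPE if i < len(cmd) - 1 else None
        sp = subprocess.Popen(c, stdin=next_stdin, stdout=next_stdout)
        if sp.stdout:
            next_stdin = sp.stdout    # becomes stdin of the next command
        subprocesses.append(sp)

    # Close the parent's read ends of the intermediate pipes.
    for sp in subprocesses[:-1]:
        if sp.stdout:
            sp.stdout.close()

    rc = subprocesses[-1].wait()      # exit code of the final command
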
-----------------------------------------------------------------------
hooks/post-receive
--