[ARVADOS] updated: e4ea50a9fb2b1f44352d3adaee5df209c2a72c96
git at public.curoverse.com
git at public.curoverse.com
Fri Apr 3 11:42:31 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_job.py | 213 +++++++++++++++++++++++++----
sdk/python/arvados/commands/cwl_runner.py | 7 +
sdk/python/bin/{arv-cwl-job => cwl-runner} | 2 +-
sdk/python/setup.py | 3 +-
4 files changed, 198 insertions(+), 27 deletions(-)
create mode 100644 sdk/python/arvados/commands/cwl_runner.py
copy sdk/python/bin/{arv-cwl-job => cwl-runner} (53%)
via e4ea50a9fb2b1f44352d3adaee5df209c2a72c96 (commit)
from 2b08beada8fb933dc178fe5b9a52440af4dc4faa (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e4ea50a9fb2b1f44352d3adaee5df209c2a72c96
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 3 11:46:03 2015 -0400
4685: Adding "job executive" to cwl_job.
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index 2e8a926..ae94eea 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -12,8 +12,13 @@ import os
import pprint
import arvados.events
import threading
+import signal
+import arvados.commands.keepdocker
+import tempfile
EX_TEMPFAIL = 75
+TASK_TEMPFAIL = 111
+TASK_CANCELED = 112
def parse_sinfo(sinfo):
nodes = {}
@@ -57,61 +62,210 @@ def determine_resources(slurm_jobid=None, slurm_nodelist=None):
"nodes": nodes,
"slots": slots}
def run_on_slot(resources, slot, task, task_uuid):
    """Run ``task`` on ``slot`` through a generated shell script.

    The script loads the task's Docker image if it is not already present,
    mounts Keep with ``arv-mount``, runs the task command in a container
    (under ``crunchstat`` when it is installed), and uploads ``job_output``
    with ``arv-put``.

    When ``task_uuid`` is non-empty, the script also records the output and
    a success flag on the job task. If the command succeeded but the upload
    failed, it exits with ``TASK_TEMPFAIL``. Otherwise it exits with the
    command's status, or with ``TASK_CANCELED`` when interrupted by
    INT/QUIT/TERM.

    :param resources: dict with ``have_slurm`` and ``slots``.
    :param slot: key into ``resources["slots"]``.
    :param task: dict with ``docker_hash``, ``docker_locator`` and
        ``command`` (list of argv strings). Optional keys: ``environment``
        (name -> value), ``stdin`` (path under the Keep mount) and
        ``stdout`` (file name in the output directory).
    :param task_uuid: job task UUID to update, or ``""`` to skip the update.
    """
    # Notes on the template:
    # - Only POSIX shell syntax is used: Popen(shell=True) runs /bin/sh,
    #   which may not be bash.
    # - The trap does not include EXIT, so normal exits keep their own status.
    # - "trap ''" first makes the shell ignore the TERM it sends to its own
    #   process group.
    # - put_code captures arv-put's status immediately after it runs.
    # - The JSON passed to arv is single-quoted so its double quotes survive.
    execution_script = Template("""
set -e
set -v

read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
trap "trap '' INT QUIT TERM; kill -TERM -$$pgrp; exit $TASK_CANCELED" INT QUIT TERM

if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
  arv-get $docker_locator/$docker_hash.tar | docker load
fi

rm -rf $tmpdir
mkdir -p $tmpdir/job_output $tmpdir/keep

if which crunchstat ; then
  CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
else
  CRUNCHSTAT=""
fi

set +e

arv-mount --by-id $tmpdir/keep --allow-other --exec \
  $$CRUNCHSTAT \
  docker run --attach=stdout --attach=stderr -i --rm \
  --cidfile=$tmpdir/cidfile --sig-proxy \
  --volume=$tmpdir/keep:/keep:ro --volume=$tmpdir/job_output:/tmp/job_output:rw \
  --workdir=/tmp/job_output --user=$$UID $env $docker_hash $cmd $stdin_redirect $stdout_redirect

code=$$?

OUT=`arv-put --portable-data-hash $tmpdir/job_output`
put_code=$$?

echo "Output is $$OUT"

if [ -n $task_uuid ]; then
  if [ "$$code" = "0" ]; then
    if [ "$$put_code" = "0" ]; then
      arv job_task update --uuid $task_uuid '{"output":"'"$$OUT"'","success":true}'
    else
      code=$TASK_TEMPFAIL
    fi
  else
    arv job_task update --uuid $task_uuid '{"output":"'"$$OUT"'","success":false}'
  fi
fi

rm -rf $tmpdir

exit $$code
""")
    tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))

    environment = task.get("environment") or {}
    env = " ".join("--env=%s=%s" % (pipes.quote(name), pipes.quote(value))
                   for name, value in environment.items())

    stdin_redirect = ""
    if task.get("stdin"):
        # NOTE(review): the shell opens this redirect before arv-mount starts.
        # At that point $tmpdir/keep is still the empty directory created by
        # mkdir, so a Keep path is unlikely to resolve here. Confirm.
        stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdin"]))

    stdout_redirect = ""
    if task.get("stdout"):
        stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))

    script = execution_script.substitute(
        docker_hash=pipes.quote(task["docker_hash"]),
        docker_locator=pipes.quote(task["docker_locator"]),
        tmpdir=pipes.quote(tmpdir),
        env=env,
        cmd=" ".join([pipes.quote(c) for c in task["command"]]),
        # pipes.quote("") yields '' so "[ -n '' ]" is false for no task.
        task_uuid=pipes.quote(task_uuid),
        stdin_redirect=stdin_redirect,
        stdout_redirect=stdout_redirect,
        TASK_CANCELED=TASK_CANCELED,
        TASK_TEMPFAIL=TASK_TEMPFAIL)

    if resources["have_slurm"]:
        # SLURM dispatch is not implemented yet.
        pass
    else:
        resources["slots"][slot]["task"] = task
        # os.setsid puts the script in its own process group. Without it,
        # the trap's "kill -TERM -$pgrp" would also signal this process.
        task["__subprocess"] = subprocess.Popen(script, shell=True, preexec_fn=os.setsid)
class TaskEvents(object):
    """Schedule job tasks onto slots in response to Arvados websocket events.

    Newly created job tasks are queued and started on free slots with
    ``run_on_slot``. A task's slot is freed when an update event carries a
    ``success`` value. When the job is cancelled, running tasks are
    terminated and the websocket is closed.
    """

    def __init__(self, api_config, resources, job_uuid):
        self.resources = resources
        self.slots = resources["slots"]
        # Create the queue before subscribing, so on_event() never runs
        # against an instance without task_queue.
        self.task_queue = []
        self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
        # NOTE(review): this filter matches every job task, not only this
        # job's, and new_task() does not check the task's job. Confirm that
        # tasks of other jobs cannot reach this scheduler.
        self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])

    def next_task(self):
        """Start queued tasks on free slots until the queue or slots run out."""
        for slot in self.slots:
            if not self.task_queue:
                break
            if self.slots[slot].get("task") is None:
                task = self.task_queue[0]
                run_on_slot(self.resources, slot, task, task["uuid"])
                # Dequeue only after run_on_slot succeeded.
                del self.task_queue[0]

    def new_task(self, task):
        """Queue ``task`` (a job task record) and try to start queued work."""
        self.task_queue.append(task)
        self.next_task()

    def finish_task(self, task):
        """Free the slot whose task has ``task["uuid"]``, then start queued work."""
        for slot_info in self.slots.values():
            running = slot_info.get("task")
            if running is not None and running.get("uuid") == task["uuid"]:
                slot_info["task"] = None
        self.next_task()

    def cancel_tasks(self):
        """Terminate each running task subprocess and wait for it to exit."""
        # NOTE(review): a shell runs its trap only after the current
        # foreground command exits. Signalling the whole process group may be
        # needed for prompt cancellation.
        for slot_info in self.slots.values():
            running = slot_info.get("task")
            if running is None:
                continue
            proc = running.get("__subprocess")
            if proc is None:
                continue
            if proc.poll() is None:
                proc.terminate()
            proc.wait()

    def on_event(self, ev):
        """Handle one websocket event for the job or for a job task."""
        kind = ev.get("object_kind")
        event_type = ev.get("event_type")
        if event_type == "update" and kind == "arvados#job":
            state = ev["properties"]["new_attributes"]["state"]
            if state == "Cancelled":
                self.cancel_tasks()
                self.ws.close()
            elif state in ("Complete", "Failed"):
                self.ws.close()
        elif kind == "arvados#jobTask":
            if event_type == "create":
                self.new_task(ev["properties"]["new_attributes"])
            elif event_type == "update":
                attrs = ev["properties"]["new_attributes"]
                # A task reports completion by setting "success".
                if attrs.get("success") is not None:
                    self.finish_task(attrs)
+
def run_executive(api, job, api_config, resources=None):
    """Start the job's crunch script (the "job executive") in Docker.

    A generated shell script does the following:
    - loads the job's Docker image if it is not already present;
    - fetches ``job["script_version"]`` of ``job["repository"]`` over git
      HTTP into a temporary directory and checks it out;
    - runs ``job["script"]`` from that checkout inside the image, passing the
      API credentials from ``api_config`` as environment variables.

    :param api: Arvados API client. Used to read the Docker image collection
        and the ``gitHttpBase`` discovery value.
    :param job: job record. Reads ``uuid``, ``docker_image_locator``,
        ``repository``, ``script`` and ``script_version``.
    :param api_config: dict with ``ARVADOS_API_HOST`` and
        ``ARVADOS_API_TOKEN``, and optionally ``ARVADOS_API_HOST_INSECURE``
        (defaults to ``"0"``).
    :param resources: optional resources dict. When it reports
        ``have_slurm``, nothing is started (SLURM dispatch is not
        implemented yet).
    :returns: the ``subprocess.Popen`` running the script, or ``None`` if
        nothing was started.
    :raises arvados.errors.ArgumentError: if the image collection does not
        hold exactly one ``<image id>.tar`` file.
    """
    # No "set -v" in this template: the script text embeds the API token,
    # and echoing it would leak the token into logs. The trap omits EXIT so
    # normal exits keep their own status.
    execution_script = Template("""
set -e

read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
trap "trap '' INT QUIT TERM; kill -TERM -$$pgrp; exit $TASK_CANCELED" INT QUIT TERM

if ! docker images -q --no-trunc --all | grep -qxF $docker_hash ; then
  arv-get $docker_locator/$docker_hash.tar | docker load
fi

rm -rf $tmpdir
mkdir -p $tmpdir

cd $tmpdir
git init
git config --local credential.$githttp/.helper '!tok(){ echo password=$ARVADOS_API_TOKEN; };tok'
git config --local credential.$githttp/.username none
ARVADOS_API_HOST=$ARVADOS_API_HOST ARVADOS_API_TOKEN=$ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE=$ARVADOS_API_HOST_INSECURE git fetch $githttp/$gitrepo $script_version
git checkout $script_version

docker run \
  --env=CRUNCH_JOB_UUID=$job_uuid \
  --env=ARVADOS_API_HOST=$ARVADOS_API_HOST \
  --env=ARVADOS_API_TOKEN=$ARVADOS_API_TOKEN \
  --env=ARVADOS_API_HOST_INSECURE=$ARVADOS_API_HOST_INSECURE \
  --volume=$tmpdir:/tmp/git:ro \
  --privileged \
  --user=$$UID \
  --rm \
  --workdir=/tmp/git \
  $docker_hash /tmp/git/$script
""")

    tmpdir = "/tmp/%s-%i" % (job["uuid"], random.randint(1, 100000))
    cr = arvados.CollectionReader(job["docker_image_locator"], api_client=api)

    if len(cr) != 1:
        raise arvados.errors.ArgumentError("docker_image_locator must only contain a single file")

    # NOTE(review): the original pattern accepted only 40 hex digits. Image
    # IDs from "docker images --no-trunc" (used in the script above) are
    # normally 64 hex digits, so both lengths are accepted here. Confirm
    # against how the image collection is named.
    docker_image = re.match(r"([0-9a-f]{64}|[0-9a-f]{40})\.tar$", list(cr.keys())[0])
    if docker_image:
        docker_hash = docker_image.group(1)
    else:
        raise arvados.errors.ArgumentError("docker_image_locator must contain a docker image")

    script = execution_script.substitute(
        docker_hash=pipes.quote(docker_hash),
        docker_locator=pipes.quote(job["docker_image_locator"]),
        tmpdir=pipes.quote(tmpdir),
        ARVADOS_API_HOST=pipes.quote(api_config["ARVADOS_API_HOST"]),
        ARVADOS_API_TOKEN=pipes.quote(api_config["ARVADOS_API_TOKEN"]),
        ARVADOS_API_HOST_INSECURE=pipes.quote(api_config.get("ARVADOS_API_HOST_INSECURE") or "0"),
        TASK_CANCELED=TASK_CANCELED,
        githttp=pipes.quote(api._rootDesc.get("gitHttpBase")),
        gitrepo=pipes.quote(job["repository"]),
        script_version=pipes.quote(job["script_version"]),
        script=pipes.quote(job["script"]),
        job_uuid=pipes.quote(job["uuid"]))

    if resources is not None and resources.get("have_slurm"):
        # SLURM dispatch is not implemented yet.
        return None
    # os.setsid puts the script in its own process group. Without it, the
    # trap's "kill -TERM -$pgrp" would also signal this process.
    return subprocess.Popen(script, shell=True, preexec_fn=os.setsid)
+
class SigHandler(object):
    """Signal handler that stops event processing and cancels running tasks.

    Instances are callable, so an instance can be passed directly to
    ``signal.signal``.
    """

    def __init__(self, ts):
        # ts: the TaskEvents instance whose websocket and tasks are managed.
        self.ts = ts

    def send_signal(self, signum, fram):
        """Close the task-event websocket, then terminate running tasks.

        ``signum`` and ``fram`` are the signal number and stack frame passed
        by the ``signal`` module. Both are unused.
        """
        self.ts.ws.close()
        self.ts.cancel_tasks()

    def __call__(self, signum, frame):
        """Allow ``signal.signal(signum, handler)`` with the instance itself."""
        self.send_signal(signum, frame)
def main(arguments=None):
@@ -121,14 +275,13 @@ def main(arguments=None):
parser.add_argument("--job", type=str)
parser.add_argument("--git-dir", type=str)
- print sys.argv
args = parser.parse_args(arguments)
api = None
if os.environ.get("ARVADOS_API_HOST"):
api_config = {"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
"ARVADOS_API_TOKEN": args.job_api_token,
- "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE")}
+ "ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "0")}
api = api_from_config("v1", api_config)
job_uuid = None
@@ -157,8 +310,18 @@ def main(arguments=None):
if job_uuid:
ts = TaskEvents(api_config, resources, job_uuid)
+ run_executive(api, job, api_config["ARVADOS_API_TOKEN"])
+
+ # Set up signal handling
+ sig = SigHandler(ts)
+
+ # Forward terminate signals to the subprocesses.
+ signal.signal(signal.SIGINT, self.sig)
+ signal.signal(signal.SIGTERM, self.sig)
+ signal.signal(signal.SIGQUIT, self.sig)
+
ts.ws.run_forever()
else:
- run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"])
+ run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"], "")
return 0
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
new file mode 100644
index 0000000..cb2b62f
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -0,0 +1,7 @@
+import arvados
+import cwltool
+
# Placeholder: the CWL runner is not implemented yet.

def main(arguments=None):
    """Entry point for the ``cwl-runner`` command (not implemented yet).

    :param arguments: command-line arguments; currently ignored. Defaults to
        ``None`` because the ``cwl-runner`` script calls ``main()`` with no
        arguments.
    :returns: process exit status, always 0 for now.
    """
    return 0
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
new file mode 100755
index 0000000..354b54e
--- /dev/null
+++ b/sdk/python/bin/cwl-runner
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+
+import sys
+from arvados.commands.cwl_runner import main
+
# Use main()'s return value as the process exit status.
raise SystemExit(main())
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index ca28025..7aa6b32 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -41,7 +41,8 @@ setup(name='arvados-python-client',
'httplib2',
'requests>=2.4',
'urllib3',
- 'ws4py'
+ 'ws4py',
+ 'cwltool',
],
test_suite='tests',
tests_require=['mock>=1.0', 'PyYAML'],
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list