[ARVADOS] updated: 42008041173b0f7dcc4e90023517767396c64f78
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 7 14:30:27 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_job.py | 82 +++++++++++++++++++----------
sdk/python/arvados/commands/keepdocker.py | 9 +++-
sdk/python/arvados/events.py | 4 ++
sdk/python/tests/test_cwl_job.py | 28 +++++++---
services/api/config/application.default.yml | 2 +-
5 files changed, 87 insertions(+), 38 deletions(-)
via 42008041173b0f7dcc4e90023517767396c64f78 (commit)
from 87e7d0850132db2805096b5bffbcd101ee5dbea2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit 42008041173b0f7dcc4e90023517767396c64f78
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Apr 7 14:33:32 2015 -0400
4685: Can now run tasks!
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index 6eea2f1..9350d72 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -15,6 +15,7 @@ import threading
import signal
import arvados.commands.keepdocker
import tempfile
+import pprint
EX_TEMPFAIL = 75
TASK_TEMPFAIL = 111
@@ -99,14 +100,15 @@ class TaskMonitor(threading.Thread):
check = api.job_tasks().get(uuid=self.task["uuid"]).execute()
if check["success"] is None:
# Task didn't set its own success, so mark it failed.
- api.job_tasks().update(uuid=self.task["uuid"], body={"success": False}).execute()
+ check = api.job_tasks().update(uuid=self.task["uuid"], body={"success": False}).execute()
+ print "Task %s completed with success %s" % (self.task["uuid"], check["success"])
-def run_on_slot(resources, slot, task, task_uuid):
+def run_on_slot(resources, api_config, slot, task):
execution_script = Template(script_header + """
mkdir $tmpdir/job_output $tmpdir/keep
if which crunchstat ; then
- CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$tmpdir/cidfile -poll=10000"
+ CRUNCHSTAT="crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000"
else
CRUNCHSTAT=""
fi
@@ -121,7 +123,7 @@ arv-mount --by-id $tmpdir/keep --allow-other --exec \
--attach=stderr \
-i \
--rm \
- --cidfile=$tmpdir/cidfile \
+ --cidfile=$cidfile \
--sig-proxy \
--volume=$tmpdir/keep:/keep:ro \
--volume=$tmpdir/job_output:/tmp/job_output:rw \
@@ -135,19 +137,26 @@ arv-mount --by-id $tmpdir/keep --allow-other --exec \
code=$$?
-OUT=`arv-put --portable-data-hash $tmpdir/job_output`
+echo "Docker exit code is $$code"
+
+OUT=`arv-put --portable-data-hash --batch-progress $tmpdir/job_output`
+arv_put_code=$$?
+echo "arv-put exit code is $$arv_put_code"
echo "Output is $$OUT"
-if [[ -n "$$task_uuid" ]]; then
- if [[ "$$code" == "0" ]]; then
- if [[ "$$?" != "0" ]]; then
- arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": true}"
+if test -n "$task_uuid" ; then
+ if test "$$code" = "0" ; then
+ if test "$$arv_put_code" = "0" ; then
+ echo "Task success"
+ arv job_task update --uuid $task_uuid --job-task "{\\"output\\":\\"$$OUT\\", \\"success\\": true}"
else
+ echo "Task temporary failure"
code=$TASK_TEMPFAIL
fi
else
- arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": false}"
+ echo "Task failed"
+ arv job_task update --uuid $task_uuid --job-task "{\\"output\\":\\"$$OUT\\", \\"success\\": false}"
fi
fi
@@ -156,25 +165,29 @@ rm -rf $tmpdir
exit $$code
""")
+ pprint.pprint(task)
+
tmpdir = "/tmp/%s-%i" % (slot, random.randint(1, 100000))
- env = " ".join(["--env=%s=%s" % (pipes.quote(e), pipes.quote(task["environment"][e])) for e in task["environment"]])
+ params = task["parameters"]
+
+ env = " ".join(["--env=%s=%s" % (pipes.quote(e), pipes.quote(params["environment"][e])) for e in params.get("environment", {})])
stdin_redirect=""
stdout_redirect=""
- if task.get("stdin"):
- stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdin"]))
+ if params.get("stdin"):
+ stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(params["stdin"]))
- if task.get("stdout"):
- stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))
+ if params.get("stdout"):
+ stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(params["stdout"]))
- ex = execution_script.substitute(docker_hash=pipes.quote(task["docker_hash"]),
+ ex = execution_script.substitute(docker_hash=pipes.quote(params["docker_hash"]),
tmpdir=pipes.quote(tmpdir),
+ cidfile=os.path.join(tmpdir, "cidfile"),
env=env,
- cmd=" ".join([pipes.quote(c) for c in task["command"]]),
- task_uuid=pipes.quote(task_uuid),
- project_uuid=pipes.quote(project_uuid),
+ cmd=" ".join([pipes.quote(c) for c in params["command"]]),
+ task_uuid=pipes.quote(task["uuid"]),
stdin_redirect=stdin_redirect,
stdout_redirect=stdout_redirect,
TASK_CANCELED=TASK_CANCELED,
@@ -183,13 +196,16 @@ exit $$code
ARVADOS_API_TOKEN=pipes.quote(api_config["ARVADOS_API_TOKEN"]),
ARVADOS_API_HOST_INSECURE=pipes.quote(api_config.get("ARVADOS_API_HOST_INSECURE", "0")))
+
if resources["have_slurm"]:
pass
else:
slots = resources["slots"]
slots[slot]["task"] = task
- slots[slot]["task"]["__subprocess"] = subprocess.Popen(ex, shell=True)
- TaskMonitor(slots[slot]["task"]["__subprocess"], api_config, task).start()
+ sub = subprocess.Popen(ex, shell=True, stdin=subprocess.PIPE, close_fds=True)
+ sub.stdin.close()
+ slots[slot]["task"]["__subprocess"] = sub
+ TaskMonitor(sub, api_config, task).start()
class TaskEvents(object):
def __init__(self, api_config, resources, job_uuid):
@@ -198,6 +214,7 @@ class TaskEvents(object):
self.ws = arvados.events.subscribe(api_from_config("v1", api_config), [["object_uuid", "=", job_uuid]], self.on_event)
self.ws.subscribe([["object_uuid", "is_a", "arvados#jobTask"]])
self.task_queue = []
+ self.api_config = api_config
def next_task(self):
while self.task_queue:
@@ -205,9 +222,17 @@ class TaskEvents(object):
for slot in self.slots:
if self.slots[slot]["task"] is None:
task = self.task_queue[0]
- run_on_slot(self.resources, slot, task["parameters"], task["uuid"])
- del self.task_queue[0]
- assigned = True
+ try:
+ assigned = True
+ run_on_slot(self.resources, self.api_config, slot, task)
+ del self.task_queue[0]
+ except:
+ api = api_from_config("v1", self.api_config)
+ check = api.job_tasks().get(uuid=task["uuid"]).execute()
+ if check["success"] is None:
+ api.job_tasks().update(uuid=task["uuid"], body={"success": False}).execute()
+ break
+
if not assigned:
break
@@ -217,8 +242,8 @@ class TaskEvents(object):
def finish_task(self, task):
for slot in self.slots:
- if self.slots[slot]["task"]["uuid"] == task["uuid"]:
- slot["task"] = None
+ if self.slots[slot]["task"] and self.slots[slot]["task"]["uuid"] == task["uuid"]:
+ self.slots[slot]["task"] = None
self.next_task()
def cancel_tasks(self):
@@ -307,7 +332,8 @@ docker run \
if resources["have_slurm"]:
pass
else:
- sub = subprocess.Popen(ex, shell=True)
+ sub = subprocess.Popen(ex, shell=True, stdin=subprocess.PIPE, close_fds=True)
+ sub.stdin.close()
JobMonitor(sub, api_config, job).start()
return sub
@@ -380,6 +406,6 @@ def main(arguments=None):
ts.ws.run_forever()
else:
- run_on_slot(resources, resources["slots"].keys()[0], job["script_parameters"], "")
+ run_on_slot(resources, api_config, resources["slots"].keys()[0], {"parameters": job["script_parameters"]})
return 0
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 145deb1..59e0c69 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -196,12 +196,17 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None,
if image_name:
image_link_name = "{}:{}".format(image_name, image_tag or 'latest')
docker_image_filters.append(['name', '=', image_link_name])
- if image_hash:
+ elif image_hash:
docker_image_filters.append(['name', '=', image_hash])
- if image_collection:
+ elif image_collection:
docker_image_filters.append(['head_uuid', '=', image_collection])
existing_links = list_all(api_client.links().list, num_retries, filters=docker_image_filters)
+
+ if image_name or image_hash:
+ existing_links = list_all(api_client.links().list, num_retries,
+ filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']],
+ ['head_uuid', 'in', [u['head_uuid'] for u in existing_links]]])
images = {}
for link in existing_links:
collection_uuid = link["head_uuid"]
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 09f2a87..71697da 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -32,8 +32,10 @@ class EventClient(WebSocketClient):
super(EventClient, self).__init__(url, ssl_options=ssl_options)
self.filters = filters
self.on_event = on_event
+ self.ready = threading.Event()
def opened(self):
+ self.ready.set()
self.subscribe(self.filters)
def received_message(self, m):
@@ -47,12 +49,14 @@ class EventClient(WebSocketClient):
pass
def subscribe(self, filters, last_log_id=None):
+ self.ready.wait()
m = {"method": "subscribe", "filters": filters}
if last_log_id is not None:
m["last_log_id"] = last_log_id
self.send(json.dumps(m))
def unsubscribe(self, filters):
+ self.ready.wait()
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
class PollClient(threading.Thread):
diff --git a/sdk/python/tests/test_cwl_job.py b/sdk/python/tests/test_cwl_job.py
index a633ad1..9de589d 100644
--- a/sdk/python/tests/test_cwl_job.py
+++ b/sdk/python/tests/test_cwl_job.py
@@ -6,6 +6,8 @@ import run_test_server
import subprocess
import os
import stat
+import arvados.commands.keepdocker
+import re
def chmodx(fn):
os.chmod(fn, stat.S_IRUSR|stat.S_IRGRP|stat.S_IROTH|stat.S_IXUSR|stat.S_IXGRP|stat.S_IXOTH)
@@ -20,19 +22,25 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
super(CwlJobTestCase, cls).setUpClass()
run_test_server.authorize_with("admin")
cls.api_client = arvados.api('v1')
+
+ host = re.match("^https?://([^/]+)", cls.api_client._rootDesc.get('rootUrl'))
+ os.environ["ARVADOS_API_HOST"] = host.group(1)
+
os.chdir(os.path.join(run_test_server.ARVADOS_DIR, "docker"))
subprocess.check_call(["./build.sh", "cwl-runner-image"])
arvados.commands.keepdocker.upload_image(cls.api_client, ["arvados/cwl-runner"])
+ arvados.commands.keepdocker.upload_image(cls.api_client, ["arvados/debian", "wheezy"])
- imgs_in_arv = list_images_in_arv(cls.api_client, 1, image_name="arvados/cwl-runner")
+ imgs_in_arv = arvados.commands.keepdocker.list_images_in_arv(cls.api_client, 1, image_name="arvados/debian", image_tag="wheezy")
repo = cls.api_client.repositories().create(body={"repository": {
"owner_uuid": "zzzzz-tpzed-000000000000000",
"name": "testrepo"
}}).execute()
- os.mkdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
- os.chdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
+ testrepo = os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo")
+ os.mkdir(testrepo)
+ os.chdir(testrepo)
subprocess.check_call(["git", "init"])
with open("foo", "w") as f:
@@ -55,24 +63,30 @@ exit 1
"""#!/usr/bin/env python
import arvados
import arvados.events
+import sys
ws = None
+retcode = None
def on_event(ev):
global ws
+ global retcode
if ev.get('object_kind') == "arvados#jobTask":
if ev.get('event_type') == "update":
if ev["properties"]["new_attributes"].get("success") is not None:
+ retcode = 0 if ev["properties"]["new_attributes"]["success"] else 1
ws.close()
-ws = arvados.events.subscribe(arvados.api(), [["object_uuid", "is_a", "arvados#jobTask"]], self.on_event)
+ws = arvados.events.subscribe(arvados.api(), [["object_uuid", "is_a", "arvados#jobTask"]], on_event)
-arvados.api().job_tasks().create(body={
+arvados.api().job_tasks().create(body={"parameters": {
"environment": {},
- "docker_hash": '""" + imgs_in_arv[0].hash + """',
- "command": ["ls", "/"]}).execute()
+ "docker_hash": '""" + imgs_in_arv[0][1]["dockerhash"] + """',
+ "command": ["ls", "/"]} }).execute()
ws.run_forever()
+
+sys.exit(retcode)
""")
chmodx("one_task")
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index 53b787b..6a31986 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -45,7 +45,7 @@ test:
blob_signing_key: zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc
user_profile_notification_address: arvados at example.com
workbench_address: https://localhost:3001/
- git_repositories_dir: <%= "#{Dir.pwd}/tmp/git" %>
+ git_repositories_dir: <%= Rails.root.join("tmp", "git") %>
git_http_base: http://localhost:3005
common:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list