[ARVADOS] updated: 87e7d0850132db2805096b5bffbcd101ee5dbea2
git at public.curoverse.com
git at public.curoverse.com
Mon Apr 6 22:46:40 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_job.py | 22 +++++++--
sdk/python/tests/test_cwl_job.py | 90 +++++++++++++++++++++++++++++++---
2 files changed, 101 insertions(+), 11 deletions(-)
via 87e7d0850132db2805096b5bffbcd101ee5dbea2 (commit)
from be4dfebdc550a1d8ce663244afd7481995938712 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 87e7d0850132db2805096b5bffbcd101ee5dbea2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Apr 6 22:50:13 2015 -0400
4685: One task test
diff --git a/sdk/python/arvados/commands/cwl_job.py b/sdk/python/arvados/commands/cwl_job.py
index dd136af..6eea2f1 100644
--- a/sdk/python/arvados/commands/cwl_job.py
+++ b/sdk/python/arvados/commands/cwl_job.py
@@ -86,6 +86,21 @@ def determine_resources(slurm_jobid=None, slurm_nodelist=None):
"nodes": nodes,
"slots": slots}
+class TaskMonitor(threading.Thread):
+ def __init__(self, sub, api_config, task):
+ super(TaskMonitor, self).__init__()
+ self.sub = sub
+ self.api_config = api_config
+ self.task = task
+
+ def run(self):
+ self.sub.wait()
+ api = api_from_config("v1", self.api_config)
+ check = api.job_tasks().get(uuid=self.task["uuid"]).execute()
+ if check["success"] is None:
+ # Task didn't set its own success, so mark it failed.
+ api.job_tasks().update(uuid=self.task["uuid"], body={"success": False}).execute()
+
def run_on_slot(resources, slot, task, task_uuid):
execution_script = Template(script_header + """
mkdir $tmpdir/job_output $tmpdir/keep
@@ -126,7 +141,7 @@ echo "Output is $$OUT"
if [[ -n "$$task_uuid" ]]; then
if [[ "$$code" == "0" ]]; then
- if [[ "$$?" == "0" ]]; then
+ if [[ "$$?" != "0" ]]; then
arv job_task update --uuid $task_uuid "{\"output\":\"$$OUT\", "success": true}"
else
code=$TASK_TEMPFAIL
@@ -148,10 +163,10 @@ exit $$code
stdin_redirect=""
stdout_redirect=""
- if task["stdin"]:
+ if task.get("stdin"):
stdin_redirect = "< %s/keep/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdin"]))
- if task["stdout"]:
+ if task.get("stdout"):
stdout_redirect = "> %s/job_output/%s" % (pipes.quote(tmpdir), pipes.quote(task["stdout"]))
ex = execution_script.substitute(docker_hash=pipes.quote(task["docker_hash"]),
@@ -174,6 +189,7 @@ exit $$code
slots = resources["slots"]
slots[slot]["task"] = task
slots[slot]["task"]["__subprocess"] = subprocess.Popen(ex, shell=True)
+ TaskMonitor(slots[slot]["task"]["__subprocess"], api_config, task).start()
class TaskEvents(object):
def __init__(self, api_config, resources, job_uuid):
diff --git a/sdk/python/tests/test_cwl_job.py b/sdk/python/tests/test_cwl_job.py
index e45bee5..a633ad1 100644
--- a/sdk/python/tests/test_cwl_job.py
+++ b/sdk/python/tests/test_cwl_job.py
@@ -7,6 +7,9 @@ import subprocess
import os
import stat
+def chmodx(fn):
+ os.chmod(fn, stat.S_IRUSR|stat.S_IRGRP|stat.S_IROTH|stat.S_IXUSR|stat.S_IXGRP|stat.S_IXOTH)
+
class CwlJobTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
@@ -21,6 +24,8 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
subprocess.check_call(["./build.sh", "cwl-runner-image"])
arvados.commands.keepdocker.upload_image(cls.api_client, ["arvados/cwl-runner"])
+ imgs_in_arv = list_images_in_arv(cls.api_client, 1, image_name="arvados/cwl-runner")
+
repo = cls.api_client.repositories().create(body={"repository": {
"owner_uuid": "zzzzz-tpzed-000000000000000",
"name": "testrepo"
@@ -29,17 +34,54 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
os.mkdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
os.chdir(os.path.join(run_test_server.ARVADOS_DIR, "services/api/tmp/git/testrepo"))
subprocess.check_call(["git", "init"])
+
with open("foo", "w") as f:
- f.write("""#!/bin/sh
- echo foo
- """)
- os.chmod("foo", stat.S_IRUSR|stat.S_IRGRP|stat.S_IROTH|stat.S_IXUSR|stat.S_IXGRP|stat.S_IXOTH)
- subprocess.check_call(["git", "add", "foo"])
+ f.write(
+"""#!/bin/sh
+echo foo
+""")
+ chmodx("foo")
+
+ with open("bar", "w") as f:
+ f.write(
+"""#!/bin/sh
+echo bar
+exit 1
+""")
+ chmodx("bar")
+
+ with open("one_task", "w") as f:
+ f.write(
+"""#!/usr/bin/env python
+import arvados
+import arvados.events
+
+ws = None
+
+def on_event(ev):
+ global ws
+ if ev.get('object_kind') == "arvados#jobTask":
+ if ev.get('event_type') == "update":
+ if ev["properties"]["new_attributes"].get("success") is not None:
+ ws.close()
+
+ws = arvados.events.subscribe(arvados.api(), [["object_uuid", "is_a", "arvados#jobTask"]], self.on_event)
+
+arvados.api().job_tasks().create(body={
+ "environment": {},
+ "docker_hash": '""" + imgs_in_arv[0].hash + """',
+ "command": ["ls", "/"]}).execute()
+
+ws.run_forever()
+""")
+ chmodx("one_task")
+
+ subprocess.check_call(["git", "add", "foo", "bar", "one_task"])
subprocess.check_call(["git", "commit", "-mTest"])
def test_parse_sinfo(self):
- nodes = cwl_job.parse_sinfo("""
-16 compute0,compute2
+ nodes = cwl_job.parse_sinfo(
+"""16 compute0,compute2
8 compute1,compute[3-5],compute7
""")
self.assertEqual({"compute0": {"slots": 16},
@@ -61,7 +103,7 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
"compute1[3]": {"node": "compute1", "slot": 3, "task": None}},
slots)
- def test_run_job(self):
+ def test_run_job_success(self):
job = CwlJobTestCase.api_client.jobs().create(body={"job": {
"script": "foo",
"script_version": "master",
@@ -76,3 +118,35 @@ class CwlJobTestCase(run_test_server.TestCaseWithServers):
job2 = CwlJobTestCase.api_client.jobs().get(uuid=job["uuid"]).execute()
self.assertEqual(job2["state"], "Complete")
+
+ def test_run_job_fail(self):
+ job = CwlJobTestCase.api_client.jobs().create(body={"job": {
+ "script": "bar",
+ "script_version": "master",
+ "script_parameters": { },
+ "repository": "testrepo",
+ "runtime_constraints": {
+ "docker_image": "arvados/cwl-runner",
+ "cwl_job": True
+ } } }).execute()
+ cwl_job.main(["--job", job["uuid"],
+ "--job-api-token", os.environ["ARVADOS_API_TOKEN"]])
+
+ job2 = CwlJobTestCase.api_client.jobs().get(uuid=job["uuid"]).execute()
+ self.assertEqual(job2["state"], "Failed")
+
+ def test_run_job_one_task(self):
+ job = CwlJobTestCase.api_client.jobs().create(body={"job": {
+ "script": "one_task",
+ "script_version": "master",
+ "script_parameters": { },
+ "repository": "testrepo",
+ "runtime_constraints": {
+ "docker_image": "arvados/cwl-runner",
+ "cwl_job": True
+ } } }).execute()
+ cwl_job.main(["--job", job["uuid"],
+ "--job-api-token", os.environ["ARVADOS_API_TOKEN"]])
+
+ job2 = CwlJobTestCase.api_client.jobs().get(uuid=job["uuid"]).execute()
+ self.assertEqual(job2["state"], "Complete")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list