[ARVADOS] created: cc9aed221c506cfc28f1b6ca37675dba6543d824
Git user
git at public.curoverse.com
Thu Oct 13 09:56:11 EDT 2016
at cc9aed221c506cfc28f1b6ca37675dba6543d824 (commit)
commit cc9aed221c506cfc28f1b6ca37675dba6543d824
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 12 16:58:25 2016 -0400
10259: When using --submit with jobs API, create RunningOnServer pipeline
instance. This is more similar to the way it works when you run a workflow
from workbench, and ensures that the pipeline will be marked completed even if
the arvados-cwl-runner client goes away.
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a934..8b6642f 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -2,6 +2,7 @@ import logging
import re
import copy
import json
+import time
from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
@@ -248,27 +249,30 @@ class RunnerJob(Runner):
def run(self, *args, **kwargs):
job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": shortname(self.tool.tool["id"]),
+ "components": {"cwl-runner": job_spec },
+ "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+ logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+ # Poll until the server-side pipeline runner creates the cwl-runner job; give up if the pipeline stops first.
+ job = None
+ while not job:
+ time.sleep(2)
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+ uuid=self.arvrunner.pipeline["uuid"]).execute(
+ num_retries=self.arvrunner.num_retries)
+ job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+ if not job and self.arvrunner.pipeline["state"] in ("Failed", "Complete", "Paused"):
+ raise WorkflowException("Pipeline %s is %s but never created the cwl-runner job" % (self.arvrunner.pipeline["uuid"], self.arvrunner.pipeline["state"]))
+
+ self.uuid = job["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
+ if job["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(job)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6674efb..f5fe2c0 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,37 @@ def stubs(func):
'script_version': 'master',
'script': 'cwl-runner'
}
+ stubs.pipeline_component = stubs.expect_job_spec.copy()
+ stubs.expect_pipeline_instance = {
+ 'name': 'submit_wf.cwl',
+ 'state': 'RunningOnServer',
+ "components": {
+ "cwl-runner": {
+ 'runtime_constraints': {'docker_image': 'arvados/jobs'},
+ 'script_parameters': {
+ 'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
+ 'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'z': {'basename': 'anonymous', 'class': 'Directory',
+ 'listing': [
+ {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+ ]},
+ 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ },
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'cwl-runner'
+ }
+ }
+ }
+ stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.pipeline_create["uuid"] = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+ stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+ stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+ stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
stubs.expect_container_spec = {
'priority': 1,
@@ -157,11 +188,12 @@ def stubs(func):
class TestSubmit(unittest.TestCase):
+ @mock.patch("time.sleep")
@stubs
- def test_submit(self, stubs):
+ def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait",
+ ["--submit", "--no-wait", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
@@ -192,16 +224,16 @@ class TestSubmit(unittest.TestCase):
}, ensure_unique_name=True),
mock.call().execute()])
- expect_job = copy.deepcopy(stubs.expect_job_spec)
- expect_job["owner_uuid"] = stubs.fake_user_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_job,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_job_uuid + '\n')
+ @mock.patch("time.sleep")
@stubs
- def test_submit_with_project_uuid(self, stubs):
+ def test_submit_with_project_uuid(self, stubs, tm):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
exited = arvados_cwl.main(
@@ -211,11 +243,10 @@ class TestSubmit(unittest.TestCase):
sys.stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- expect_body = copy.deepcopy(stubs.expect_job_spec)
- expect_body["owner_uuid"] = project_uuid
- stubs.api.jobs().create.assert_called_with(
- body=expect_body,
- find_or_create=True)
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = project_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
@stubs
def test_submit_container(self, stubs):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list