[ARVADOS] created: cc9aed221c506cfc28f1b6ca37675dba6543d824

Git user git at public.curoverse.com
Thu Oct 13 09:56:11 EDT 2016


        at  cc9aed221c506cfc28f1b6ca37675dba6543d824 (commit)


commit cc9aed221c506cfc28f1b6ca37675dba6543d824
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 12 16:58:25 2016 -0400

    10259: When using --submit with jobs API, create RunningOnServer pipeline
    instance.  This is more similar to the way it works when you run a workflow
    from workbench, and ensures that the pipeline will be marked completed even if
    the arvados-cwl-runner client goes away.

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a934..8b6642f 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -248,27 +249,27 @@ class RunnerJob(Runner):
 
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
-
-        self.uuid = response["uuid"]
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec },
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(response)
 
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6674efb..f5fe2c0 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,37 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.expect_pipeline_instance = {
+            'name': 'submit_wf.cwl',
+            'state': 'RunningOnServer',
+            "components": {
+                "cwl-runner": {
+                    'runtime_constraints': {'docker_image': 'arvados/jobs'},
+                    'script_parameters': {
+                        'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
+                        'x': {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'},
+                        'z': {'basename': 'anonymous', 'class': 'Directory',
+                              'listing': [
+                                  {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
+                              ]},
+                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                    },
+                    'repository': 'arvados',
+                    'script_version': 'master',
+                    'script': 'cwl-runner'
+                }
+            }
+        }
+        stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.pipeline_create["uuid"] = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+        stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -157,11 +188,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -192,16 +224,16 @@ class TestSubmit(unittest.TestCase):
             }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_job_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(
@@ -211,11 +243,10 @@ class TestSubmit(unittest.TestCase):
             sys.stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_body = copy.deepcopy(stubs.expect_job_spec)
-        expect_body["owner_uuid"] = project_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_body,
-            find_or_create=True)
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        expect_pipeline["owner_uuid"] = project_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
 
     @stubs
     def test_submit_container(self, stubs):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list