[ARVADOS] created: 76e9d2eeb4a9fd43399393a28e2f2aa9b6f5db27

Git user git at public.curoverse.com
Thu Oct 13 08:35:33 EDT 2016


        at  76e9d2eeb4a9fd43399393a28e2f2aa9b6f5db27 (commit)


commit 76e9d2eeb4a9fd43399393a28e2f2aa9b6f5db27
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 12 16:58:25 2016 -0400

    10221: When using --submit with jobs API, create RunningOnServer pipeline instance.  This is more similar to the way it works when you run a workflow from workbench, and ensures that the pipeline will be marked completed even if the arvados-cwl-runner client goes away.

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 8b1a934..8b6642f 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -2,6 +2,7 @@ import logging
 import re
 import copy
 import json
+import time
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
@@ -248,27 +249,27 @@ class RunnerJob(Runner):
 
     def run(self, *args, **kwargs):
         job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
-
-        self.uuid = response["uuid"]
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
+            body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": shortname(self.tool.tool["id"]),
+                "components": {"cwl-runner": job_spec },
+                "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
+        logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
+
+        job = None
+        while not job:
+            time.sleep(2)
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().get(
+                uuid=self.arvrunner.pipeline["uuid"]).execute(
+                    num_retries=self.arvrunner.num_retries)
+            job = self.arvrunner.pipeline["components"]["cwl-runner"].get("job")
+
+        self.uuid = job["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
+        if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(response)
 
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6674efb..baf826a 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,20 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+        stubs.pipeline_component = stubs.expect_job_spec.copy()
+        stubs.pipeline_instance = {
+            "uuid": "zzzzz-d1hrv-zzzzzzzzzzzzzzz",
+            "components": {
+                "cwl-runner": {
+                    "job": {
+                        "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+                        "state": "Queued"
+                    }
+                }
+            }
+        }
+        stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_instance
+        stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_instance
 
         stubs.expect_container_spec = {
             'priority': 1,
@@ -157,11 +171,12 @@ def stubs(func):
 
 
 class TestSubmit(unittest.TestCase):
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit(self, stubs):
+    def test_submit(self, stubs, tm):
         capture_stdout = cStringIO.StringIO()
         exited = arvados_cwl.main(
-            ["--submit", "--no-wait",
+            ["--submit", "--no-wait", "--debug",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -192,16 +207,17 @@ class TestSubmit(unittest.TestCase):
             }, ensure_unique_name=True),
             mock.call().execute()])
 
-        expect_job = copy.deepcopy(stubs.expect_job_spec)
-        expect_job["owner_uuid"] = stubs.fake_user_uuid
-        stubs.api.jobs().create.assert_called_with(
-            body=expect_job,
+        expect_pipeline = copy.deepcopy(stubs.pipeline_instance)
+        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline,
             find_or_create=True)
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_job_uuid + '\n')
 
+    @mock.patch("time.sleep")
     @stubs
-    def test_submit_with_project_uuid(self, stubs):
+    def test_submit_with_project_uuid(self, stubs, tm):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
         exited = arvados_cwl.main(

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list