[ARVADOS] created: 1.3.0-9-gfc53ce7d5

Git user git at public.curoverse.com
Fri Dec 7 16:05:55 EST 2018


        at  fc53ce7d5a7bd7974e8dac163238073e26e8b5d6 (commit)


commit fc53ce7d5a7bd7974e8dac163238073e26e8b5d6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Dec 7 14:55:55 2018 -0500

    14476: Fix RunnerTemplate
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index c6616c987..2e4ef5501 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -386,19 +386,21 @@ class RunnerTemplate(object):
     }
 
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
-                 submit_runner_ram=0, name=None, merged_map=None):
+                 submit_runner_ram=0, name=None, merged_map=None,
+                 loadingContext=None):
         self.runner = runner
         self.embedded_tool = tool
         self.job = RunnerJob(
             runner=runner,
             tool=tool,
-            job_order=job_order,
             enable_reuse=enable_reuse,
             output_name=None,
             output_tags=None,
             submit_runner_ram=submit_runner_ram,
             name=name,
-            merged_map=merged_map)
+            merged_map=merged_map,
+            loadingContext=loadingContext)
+        self.job.job_order = job_order
         self.uuid = uuid
 
     def pipeline_component_spec(self):
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 8eeb95117..195eda695 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -559,7 +559,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                       uuid=existing_uuid,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
-                                      merged_map=merged_map)
+                                      merged_map=merged_map,
+                                      loadingContext=loadingContext)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")

commit d48d4acf9ac040aab9d961506e91f862d67850b7
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Dec 7 14:15:28 2018 -0500

    14476: When creating a Runner, check input is valid before submitting
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c49a449b..4f8c0338b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -414,8 +414,8 @@ class RunnerContainer(Runner):
             "properties": {}
         }
 
-        if self.tool.tool.get("id", "").startswith("keep:"):
-            sp = self.tool.tool["id"].split('/')
+        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
+            sp = self.embedded_tool.tool["id"].split('/')
             workflowcollection = sp[0][5:]
             workflowname = "/".join(sp[1:])
             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
@@ -424,14 +424,14 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
-            if self.tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
 
         # --local means execute the workflow instead of submitting a container request
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 9a03372d3..c6616c987 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -296,10 +296,10 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        if self.tool.tool["id"].startswith("keep:"):
-            self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
+        if self.embedded_tool.tool["id"].startswith("keep:"):
+            self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
         else:
-            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
             wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
@@ -388,7 +388,7 @@ class RunnerTemplate(object):
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
                  submit_runner_ram=0, name=None, merged_map=None):
         self.runner = runner
-        self.tool = tool
+        self.embedded_tool = tool
         self.job = RunnerJob(
             runner=runner,
             tool=tool,
@@ -420,7 +420,7 @@ class RunnerTemplate(object):
         job_params = spec['script_parameters']
         spec['script_parameters'] = {}
 
-        for param in self.tool.tool['inputs']:
+        for param in self.embedded_tool.tool['inputs']:
             param = copy.deepcopy(param)
 
             # Data type and "required" flag...
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 100096ab1..8eeb95117 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -618,11 +618,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             if self.work_api == "containers":
                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                     runtimeContext.runnerjob = tool.tool["id"]
-                    runnerjob = tool.job(job_order,
-                                         self.output_callback,
-                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -636,7 +633,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                                 collection_cache_size=runtimeContext.collection_cache_size,
                                                 collection_cache_is_default=self.should_estimate_cache_size)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -654,10 +651,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not runtimeContext.wait:
-            submitargs = runtimeContext.copy()
-            submitargs.submit = False
-            runnerjob.run(submitargs)
+        if runtimeContext.cwl_runner_job is not None:
+            self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+        jobiter = tool.job(job_order,
+                           self.output_callback,
+                           runtimeContext)
+
+        if runtimeContext.submit and not runtimeContext.wait:
+            runnerjob = jobiter.next()
+            runnerjob.run(runtimeContext)
             return (runnerjob.uuid, "success")
 
         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
@@ -672,14 +675,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         try:
             self.workflow_eval_lock.acquire()
-            if runnerjob:
-                jobiter = iter((runnerjob,))
-            else:
-                if runtimeContext.cwl_runner_job is not None:
-                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
-                jobiter = tool.job(job_order,
-                                   self.output_callback,
-                                   runtimeContext)
 
             # Holds the lock while this code runs and releases it when
             # it is safe to do so in self.workflow_eval_lock.wait(),
@@ -728,8 +723,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and runnerjob.uuid and self.work_api == "containers":
-                self.api.container_requests().update(uuid=runnerjob.uuid,
+            if runtimeContext.submit and isinstance(tool, Runner):
+                runnerjob = tool
+                if runnerjob and runnerjob.uuid and self.work_api == "containers":
+                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.workflow_eval_lock.release()
@@ -744,8 +741,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if runtimeContext.submit and isinstance(runnerjob, Runner):
-            logger.info("Final output collection %s", runnerjob.final_output)
+        if runtimeContext.submit and isinstance(tool, Runner):
+            logger.info("Final output collection %s", tool.final_output)
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 6094cfe24..1d5f98f20 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
@@ -356,25 +356,28 @@ def upload_workflow_collection(arvrunner, name, packed):
     return collection.portable_data_hash()
 
 
-class Runner(object):
+class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, job_order, enable_reuse,
+    def __init__(self, runner, tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
                  collection_cache_is_default=True):
+
+        super(Runner, self).__init__(tool.tool, loadingContext)
+
         self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
+        self.embedded_tool = tool
+        self.job_order = None
         self.running = False
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
@@ -393,7 +396,7 @@ class Runner(object):
         self.submit_runner_ram = 1024  # defaut 1 GiB
         self.collection_cache_size = collection_cache_size
 
-        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
         if runner_resource_req:
             if runner_resource_req.get("coresMin"):
                 self.submit_runner_cores = runner_resource_req["coresMin"]
@@ -414,6 +417,15 @@ class Runner(object):
 
         self.merged_map = merged_map or {}
 
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Any, None, None]
+        self.job_order = job_order
+        self._init_job(job_order, runtimeContext)
+        yield self
+
     def update_pipeline_component(self, record):
         pass
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list