[ARVADOS] created: 1.3.0-4-gf2b37ad61
Git user
git at public.curoverse.com
Fri Dec 7 14:37:00 EST 2018
at f2b37ad61f90ca600669d864e140e30ba834fc2b (commit)
commit f2b37ad61f90ca600669d864e140e30ba834fc2b
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Dec 7 14:15:28 2018 -0500
14476: When creating a Runner, check input is valid before submitting
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c49a449b..4f8c0338b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -414,8 +414,8 @@ class RunnerContainer(Runner):
"properties": {}
}
- if self.tool.tool.get("id", "").startswith("keep:"):
- sp = self.tool.tool["id"].split('/')
+ if self.embedded_tool.tool.get("id", "").startswith("keep:"):
+ sp = self.embedded_tool.tool["id"].split('/')
workflowcollection = sp[0][5:]
workflowname = "/".join(sp[1:])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
@@ -424,14 +424,14 @@ class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
- if self.tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+ if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
# --local means execute the workflow instead of submitting a container request
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 9a03372d3..c6616c987 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -296,10 +296,10 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- if self.tool.tool["id"].startswith("keep:"):
- self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
+ if self.embedded_tool.tool["id"].startswith("keep:"):
+ self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
else:
- packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
@@ -388,7 +388,7 @@ class RunnerTemplate(object):
def __init__(self, runner, tool, job_order, enable_reuse, uuid,
submit_runner_ram=0, name=None, merged_map=None):
self.runner = runner
- self.tool = tool
+ self.embedded_tool = tool
self.job = RunnerJob(
runner=runner,
tool=tool,
@@ -420,7 +420,7 @@ class RunnerTemplate(object):
job_params = spec['script_parameters']
spec['script_parameters'] = {}
- for param in self.tool.tool['inputs']:
+ for param in self.embedded_tool.tool['inputs']:
param = copy.deepcopy(param)
# Data type and "required" flag...
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 7512d5bef..61f9cbbe0 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -103,6 +103,7 @@ def run():
arvargs.output_name = output_name
arvargs.output_tags = output_tags
arvargs.thread_count = 1
+ arvargs.collection_cache_size = None
runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 100096ab1..8eeb95117 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -618,11 +618,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -636,7 +633,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
collection_cache_size=runtimeContext.collection_cache_size,
collection_cache_is_default=self.should_estimate_cache_size)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+ tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -654,10 +651,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ runtimeContext)
+
+ if runtimeContext.submit and not runtimeContext.wait:
+ runnerjob = jobiter.next()
+ runnerjob.run(runtimeContext)
return (runnerjob.uuid, "success")
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
@@ -672,14 +675,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
try:
self.workflow_eval_lock.acquire()
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
# Holds the lock while this code runs and releases it when
# it is safe to do so in self.workflow_eval_lock.wait(),
@@ -728,8 +723,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
+ if runtimeContext.submit and isinstance(tool, Runner):
+ runnerjob = tool
+ if runnerjob and runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
@@ -744,8 +741,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
+ if runtimeContext.submit and isinstance(tool, Runner):
+ logger.info("Final output collection %s", tool.final_output)
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 6094cfe24..1d5f98f20 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
@@ -356,25 +356,28 @@ def upload_workflow_collection(arvrunner, name, packed):
return collection.portable_data_hash()
-class Runner(object):
+class Runner(Process):
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, tool, job_order, enable_reuse,
+ def __init__(self, runner, tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
collection_cache_is_default=True):
+
+ super(Runner, self).__init__(tool.tool, loadingContext)
+
self.arvrunner = runner
- self.tool = tool
- self.job_order = job_order
+ self.embedded_tool = tool
+ self.job_order = None
self.running = False
if enable_reuse:
# If reuse is permitted by command line arguments but
# disabled by the workflow itself, disable it.
- reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
@@ -393,7 +396,7 @@ class Runner(object):
self.submit_runner_ram = 1024 # defaut 1 GiB
self.collection_cache_size = collection_cache_size
- runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
if runner_resource_req:
if runner_resource_req.get("coresMin"):
self.submit_runner_cores = runner_resource_req["coresMin"]
@@ -414,6 +417,15 @@ class Runner(object):
self.merged_map = merged_map or {}
+ def job(self,
+ job_order, # type: Mapping[Text, Text]
+ output_callbacks, # type: Callable[[Any, Any], Any]
+ runtimeContext # type: RuntimeContext
+ ): # type: (...) -> Generator[Any, None, None]
+ self.job_order = job_order
+ self._init_job(job_order, runtimeContext)
+ yield self
+
def update_pipeline_component(self, record):
pass
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list