[arvados] created: 2.1.0-3008-gd825b0330
git repository hosting
git at public.arvados.org
Wed Nov 2 01:35:10 UTC 2022
at d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0 (commit)
commit d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Nov 1 21:34:35 2022 -0400
19688: Trying a fast-path submit for registered workflows
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 08a05d571..9200c5caa 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -359,6 +359,10 @@ def main(args=sys.argv[1:],
# unit tests.
stdout = None
+ if arvargs.submit and arvargs.wait is False and arvargs.workflow.startswith("arvwf:"):
+ executor.loadingContext.do_validate = False
+ executor.fast_submit = True
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 66fe143e0..57ab7367a 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -523,6 +523,15 @@ class RunnerContainer(Runner):
"kind": "collection",
"portable_data_hash": "%s" % workflowcollection
}
+ elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+ packed = yaml.safe_load(record["definition"])
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "content": packed
+ }
+ container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
workflowpath = "/var/lib/cwl/workflow.json#main"
@@ -530,8 +539,6 @@ class RunnerContainer(Runner):
"kind": "json",
"content": packed
}
- if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 5f3feabf8..02c9c7a97 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -147,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep):
**argv
): # type: (...) -> None
- super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
- self.tool["class"] = "WorkflowStep"
+ if arvrunner.fast_submit:
+ self.tool = toolpath_object
+ self.tool["inputs"] = []
+ self.tool["outputs"] = []
+ else:
+ super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+ self.tool["class"] = "WorkflowStep"
self.arvrunner = arvrunner
def job(self, joborder, output_callback, runtimeContext):
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 694f77baf..f95ac17e1 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -137,6 +137,7 @@ class ArvCwlExecutor(object):
self.fs_access = None
self.secret_store = None
self.stdout = stdout
+ self.fast_submit = False
if keep_client is not None:
self.keep_client = keep_client
@@ -594,7 +595,8 @@ The 'jobs' API is no longer supported.
controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
- updated_tool.visit(self.check_features)
+ if not self.fast_submit:
+ updated_tool.visit(self.check_features)
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
@@ -662,7 +664,7 @@ The 'jobs' API is no longer supported.
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
loadingContext.disable_js_validation = True
- if submitting:
+ if submitting and not self.fast_submit:
loadingContext.do_update = False
# Document may have been auto-updated. Reload the original
# document with updating disabled because we want to
@@ -675,9 +677,12 @@ The 'jobs' API is no longer supported.
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- logger.info("Uploading workflow dependencies")
- with Perf(metrics, "upload_workflow_deps"):
- merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ if not self.fast_submit:
+ logger.info("Uploading workflow dependencies")
+ with Perf(metrics, "upload_workflow_deps"):
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ else:
+ merged_map = {}
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list