[arvados] created: 2.1.0-3008-gd825b0330

git repository hosting git at public.arvados.org
Wed Nov 2 01:35:10 UTC 2022


        at  d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0 (commit)


commit d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Tue Nov 1 21:34:35 2022 -0400

    19688: Trying a fast path submit for registered workflows
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 08a05d571..9200c5caa 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -359,6 +359,10 @@ def main(args=sys.argv[1:],
         # unit tests.
         stdout = None
 
+    if arvargs.submit and arvargs.wait is False and arvargs.workflow.startswith("arvwf:"):
+        executor.loadingContext.do_validate = False
+        executor.fast_submit = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 66fe143e0..57ab7367a 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -523,6 +523,15 @@ class RunnerContainer(Runner):
                 "kind": "collection",
                 "portable_data_hash": "%s" % workflowcollection
             }
+        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
+            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+            packed = yaml.safe_load(record["definition"])
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "content": packed
+            }
+            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
         else:
             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
             workflowpath = "/var/lib/cwl/workflow.json#main"
@@ -530,8 +539,6 @@ class RunnerContainer(Runner):
                 "kind": "json",
                 "content": packed
             }
-            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
 
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 5f3feabf8..02c9c7a97 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -147,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep):
                  **argv
                 ):  # type: (...) -> None
 
-        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
-        self.tool["class"] = "WorkflowStep"
+        if arvrunner.fast_submit:
+            self.tool = toolpath_object
+            self.tool["inputs"] = []
+            self.tool["outputs"] = []
+        else:
+            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+            self.tool["class"] = "WorkflowStep"
         self.arvrunner = arvrunner
 
     def job(self, joborder, output_callback, runtimeContext):
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 694f77baf..f95ac17e1 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -137,6 +137,7 @@ class ArvCwlExecutor(object):
         self.fs_access = None
         self.secret_store = None
         self.stdout = stdout
+        self.fast_submit = False
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -594,7 +595,8 @@ The 'jobs' API is no longer supported.
         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
 
-        updated_tool.visit(self.check_features)
+        if not self.fast_submit:
+            updated_tool.visit(self.check_features)
 
         self.pipeline = None
         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
@@ -662,7 +664,7 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        if submitting:
+        if submitting and not self.fast_submit:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
@@ -675,9 +677,12 @@ The 'jobs' API is no longer supported.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        logger.info("Uploading workflow dependencies")
-        with Perf(metrics, "upload_workflow_deps"):
-            merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        if not self.fast_submit:
+            logger.info("Uploading workflow dependencies")
+            with Perf(metrics, "upload_workflow_deps"):
+                merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        else:
+            merged_map = {}
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list