[arvados] created: 2.6.0-566-g8d73d90c86

git repository hosting git at public.arvados.org
Thu Sep 7 20:31:01 UTC 2023


        at  8d73d90c86b8da6bf43af42d8038b16a082ec080 (commit)


commit 8d73d90c86b8da6bf43af42d8038b16a082ec080
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 16:30:14 2023 -0400

    20933: Use acrContainerImage where available
    
    also refs #20592
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f37550a4a1..63b3d578eb 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -422,6 +422,8 @@ def main(args=sys.argv[1:],
         # unit tests.
         stdout = None
 
+    executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__
+
     if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
         executor.loadingContext.do_validate = False
         if arvargs.submit and not workflow_op:
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index a94fdac522..11ae66b139 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -560,13 +560,19 @@ class RunnerContainer(Runner):
                 }
                 self.job_order[param] = {"$include": mnt}
 
+        container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+
+        workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+            container_image = workflow_runner_req.get("acrContainerImage")
+
         container_req = {
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+            "container_image": container_image,
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index b66e8ad3aa..86fecc0a1d 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -10,6 +10,7 @@ from ._version import __version__
 from functools import partial
 from schema_salad.sourceline import SourceLine
 from cwltool.errors import WorkflowException
+from arvados.util import portable_data_hash_pattern
 
 def validate_cluster_target(arvrunner, runtimeContext):
     if (runtimeContext.submit_runner_cluster and
@@ -61,8 +62,12 @@ class ArvadosCommandTool(CommandLineTool):
 
         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
         if not docker_req:
-            self.hints.append({"class": "DockerRequirement",
-                               "dockerPull": "arvados/jobs:"+__version__})
+            if portable_data_hash_pattern.match(loadingContext.default_docker_image):
+                self.hints.append({"class": "DockerRequirement",
+                                   "http://arvados.org/cwl#dockerCollectionPDH": loadingContext.default_docker_image})
+            else:
+                self.hints.append({"class": "DockerRequirement",
+                                   "dockerPull": loadingContext.default_docker_image})
 
         self.arvrunner = arvrunner
 
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index c5d56fb656..cdce3d643a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -29,7 +29,7 @@ from cwltool.load_tool import fetch_document, resolve_and_validate_document
 from cwltool.process import shortname, uniquename
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
-from cwltool.context import LoadingContext
+from cwltool.context import LoadingContext, getdefault
 
 from schema_salad.ref_resolver import file_uri, uri_file_path
 
@@ -412,9 +412,10 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
         hints.append(wf_runner_resources)
 
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
-                                                                  submit_runner_image or "arvados/jobs:"+__version__,
-                                                                  runtimeContext)
+    if "acrContainerImage" not in wf_runner_resources:
+        wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                      submit_runner_image or "arvados/jobs:"+__version__,
+                                                                      runtimeContext)
 
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
@@ -594,8 +595,18 @@ class ArvadosWorkflow(Workflow):
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
-        self.loadingContext = loadingContext
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        self.loadingContext = loadingContext.copy()
+
+        self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
+        tool_requirements = toolpath_object.get("requirements", [])
+        self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
+        tool_hints = toolpath_object.get("hints", [])
+
+        workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+            self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")
+
+        super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
         self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index c7b9f5284d..dd64879b9f 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -7,6 +7,7 @@ from collections import namedtuple
 
 class ArvLoadingContext(LoadingContext):
     def __init__(self, kwargs=None):
+        self.default_docker_image = None
         super(ArvLoadingContext, self).__init__(kwargs)
 
 class ArvRuntimeContext(RuntimeContext):
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 0d177137bf..330dba3dbe 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -649,6 +649,10 @@ The 'jobs' API is no longer supported.
             runtimeContext.copy_deps = True
             runtimeContext.match_local_docker = True
 
+        if runtimeContext.print_keep_deps:
+            runtimeContext.copy_deps = False
+            runtimeContext.match_local_docker = False
+
         if runtimeContext.update_workflow and self.project_uuid is None:
             # If we are updating a workflow, make sure anything that
             # gets uploaded goes into the same parent project, unless
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 10fe9d7024..4042674035 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -309,21 +309,16 @@ def copy_workflow(wf_uuid, src, dst, args):
 
     # copy collections and docker images
     if args.recursive and wf["definition"]:
-        wf_def = yaml.safe_load(wf["definition"])
-        if wf_def is not None:
-            locations = []
-            docker_images = {}
-            graph = wf_def.get('$graph', None)
-            if graph is not None:
-                workflow_collections(graph, locations, docker_images)
-            else:
-                workflow_collections(wf_def, locations, docker_images)
-
-            if locations:
-                copy_collections(locations, src, dst, args)
-
-            for image in docker_images:
-                copy_docker_image(image, docker_images[image], src, dst, args)
+        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+               "ARVADOS_API_TOKEN": src.api_token,
+               "PATH": os.environ["PATH"]}
+        result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+                                env=env)
+        print(result)
+        exit()
+
+        #if locations:
+        #        copy_collections(locations, src, dst, args)
 
     # copy the workflow itself
     del wf['uuid']

commit fd17a167629eff63ebe2bc57454d0007b4600d39
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Sep 7 14:37:05 2023 -0400

    20933: Adding --print-keep-deps to assist arv-copy
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 8108934aae..f37550a4a1 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -124,6 +124,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
     exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
 
+    exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
                         default=True, dest="wait")
@@ -325,7 +327,9 @@ def main(args=sys.argv[1:],
             return 1
         arvargs.work_api = want_api
 
-    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+    workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps
+
+    if workflow_op and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -420,7 +424,7 @@ def main(args=sys.argv[1:],
 
     if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
         executor.loadingContext.do_validate = False
-        if arvargs.submit:
+        if arvargs.submit and not workflow_op:
             executor.fast_submit = True
 
     return cwltool.main.main(args=arvargs,
@@ -433,4 +437,4 @@ def main(args=sys.argv[1:],
                              custom_schema_callback=add_arv_hints,
                              loadingContext=executor.loadingContext,
                              runtimeContext=executor.toplevel_runtimeContext,
-                             input_required=not (arvargs.create_workflow or arvargs.update_workflow))
+                             input_required=not workflow_op)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index cddcd15c54..c5d56fb656 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -292,7 +292,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
     # Find the longest common prefix among all the file names.  We'll
     # use this to recreate the directory structure in a keep
     # collection with correct relative references.
-    prefix = common_prefix(firstfile, all_files)
+    prefix = common_prefix(firstfile, all_files) if firstfile else ""
+
 
     col = arvados.collection.Collection(api_client=arvRunner.api)
 
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 125527f783..c7b9f5284d 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -43,6 +43,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.varying_url_params = ""
         self.prefer_cached_downloads = False
         self.cached_docker_lookups = {}
+        self.print_keep_deps = False
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ce8aa42095..0d177137bf 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -34,7 +34,7 @@ from arvados.errors import ApiError
 
 import arvados_cwl.util
 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
@@ -671,12 +671,10 @@ The 'jobs' API is no longer supported.
         # are going to wait for the result, and always_submit_runner
         # is false, then we don't submit a runner process.
 
-        submitting = (runtimeContext.update_workflow or
-                      runtimeContext.create_workflow or
-                      (runtimeContext.submit and not
+        submitting = (runtimeContext.submit and not
                        (updated_tool.tool["class"] == "CommandLineTool" and
                         runtimeContext.wait and
-                        not runtimeContext.always_submit_runner)))
+                        not runtimeContext.always_submit_runner))
 
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
@@ -702,7 +700,7 @@ The 'jobs' API is no longer supported.
         loadingContext.skip_resolve_all = True
 
         workflow_wrapper = None
-        if submitting and not self.fast_submit:
+        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
             # upload workflow and get back the workflow wrapper
 
             workflow_wrapper = upload_workflow(self, tool, job_order,
@@ -725,6 +723,11 @@ The 'jobs' API is no longer supported.
                 self.stdout.write(uuid + "\n")
                 return (None, "success")
 
+            if runtimeContext.print_keep_deps:
+                # Just find and print out all the collection dependencies and exit
+                print_keep_deps(tool)
+                return (None, "success")
+
             # Did not register a workflow, we're going to submit
             # it instead.
             loadingContext.loader.idx.clear()
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 4432813f6a..4d211d3ed6 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -946,3 +946,27 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
+
+
+def print_keep_deps_visitor(references, doc_loader, tool):
+    def collect_locators(obj):
+        loc = obj.get("location", "") or obj.get("http://arvados.org/cwl#dockerCollectionPDH", "")
+
+        g = arvados.util.keepuri_pattern.match(loc)
+        if g and g[1] not in references:
+            references.append(g[1])
+
+    sc_result = scandeps(tool["id"], tool,
+                         set(),
+                         set(("location", "id")),
+                         None, urljoin=doc_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    visit_class(sc_result, ("File", "Directory"), collect_locators)
+
+
+def print_keep_deps(tool):
+    references = []
+
+    tool.visit(partial(print_keep_deps_visitor, references, tool.doc_loader))
+    print(json.dumps(references))
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 16f2999ca8..c1ba790241 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -35,6 +35,8 @@ link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
 job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
+keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+\d+)/(.*)')
+keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+\d+)/(.*)')
 
 def _deprecated(version=None, preferred=None):
     """Mark a callable as deprecated in the SDK

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list