[arvados] created: 2.6.0-566-g8d73d90c86
git repository hosting
git at public.arvados.org
Thu Sep 7 20:31:01 UTC 2023
at 8d73d90c86b8da6bf43af42d8038b16a082ec080 (commit)
commit 8d73d90c86b8da6bf43af42d8038b16a082ec080
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 16:30:14 2023 -0400
20933: Use acrContainerImage where available
also refs #20592
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f37550a4a1..63b3d578eb 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -422,6 +422,8 @@ def main(args=sys.argv[1:],
# unit tests.
stdout = None
+ executor.loadingContext.default_docker_image = arvargs.submit_runner_image or "arvados/jobs:"+__version__
+
if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
executor.loadingContext.do_validate = False
if arvargs.submit and not workflow_op:
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index a94fdac522..11ae66b139 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -560,13 +560,19 @@ class RunnerContainer(Runner):
}
self.job_order[param] = {"$include": mnt}
+ container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+
+ workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+ container_image = workflow_runner_req.get("acrContainerImage")
+
container_req = {
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+ "container_image": container_image,
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index b66e8ad3aa..86fecc0a1d 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -10,6 +10,7 @@ from ._version import __version__
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
+from arvados.util import portable_data_hash_pattern
def validate_cluster_target(arvrunner, runtimeContext):
if (runtimeContext.submit_runner_cluster and
@@ -61,8 +62,12 @@ class ArvadosCommandTool(CommandLineTool):
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if not docker_req:
- self.hints.append({"class": "DockerRequirement",
- "dockerPull": "arvados/jobs:"+__version__})
+ if portable_data_hash_pattern.match(loadingContext.default_docker_image):
+ self.hints.append({"class": "DockerRequirement",
+ "http://arvados.org/cwl#dockerCollectionPDH": loadingContext.default_docker_image})
+ else:
+ self.hints.append({"class": "DockerRequirement",
+ "dockerPull": loadingContext.default_docker_image})
self.arvrunner = arvrunner
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index c5d56fb656..cdce3d643a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -29,7 +29,7 @@ from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.process import shortname, uniquename
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
-from cwltool.context import LoadingContext
+from cwltool.context import LoadingContext, getdefault
from schema_salad.ref_resolver import file_uri, uri_file_path
@@ -412,9 +412,10 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
- submit_runner_image or "arvados/jobs:"+__version__,
- runtimeContext)
+ if "acrContainerImage" not in wf_runner_resources:
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
@@ -594,8 +595,18 @@ class ArvadosWorkflow(Workflow):
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
- self.loadingContext = loadingContext
- super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+ self.loadingContext = loadingContext.copy()
+
+ self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
+ tool_requirements = toolpath_object.get("requirements", [])
+ self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
+ tool_hints = toolpath_object.get("hints", [])
+
+ workflow_runner_req, _ = self.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+ self.loadingContext.default_docker_image = workflow_runner_req.get("acrContainerImage")
+
+ super(ArvadosWorkflow, self).__init__(toolpath_object, self.loadingContext)
self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
def job(self, joborder, output_callback, runtimeContext):
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index c7b9f5284d..dd64879b9f 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -7,6 +7,7 @@ from collections import namedtuple
class ArvLoadingContext(LoadingContext):
def __init__(self, kwargs=None):
+ self.default_docker_image = None
super(ArvLoadingContext, self).__init__(kwargs)
class ArvRuntimeContext(RuntimeContext):
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 0d177137bf..330dba3dbe 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -649,6 +649,10 @@ The 'jobs' API is no longer supported.
runtimeContext.copy_deps = True
runtimeContext.match_local_docker = True
+ if runtimeContext.print_keep_deps:
+ runtimeContext.copy_deps = False
+ runtimeContext.match_local_docker = False
+
if runtimeContext.update_workflow and self.project_uuid is None:
# If we are updating a workflow, make sure anything that
# gets uploaded goes into the same parent project, unless
diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 10fe9d7024..4042674035 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -309,21 +309,16 @@ def copy_workflow(wf_uuid, src, dst, args):
# copy collections and docker images
if args.recursive and wf["definition"]:
- wf_def = yaml.safe_load(wf["definition"])
- if wf_def is not None:
- locations = []
- docker_images = {}
- graph = wf_def.get('$graph', None)
- if graph is not None:
- workflow_collections(graph, locations, docker_images)
- else:
- workflow_collections(wf_def, locations, docker_images)
-
- if locations:
- copy_collections(locations, src, dst, args)
-
- for image in docker_images:
- copy_docker_image(image, docker_images[image], src, dst, args)
+ env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
+ "ARVADOS_API_TOKEN": src.api_token,
+ "PATH": os.environ["PATH"]}
+ result = subprocess.run(["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
+ env=env)
+ print(result)
+ exit()
+
+ #if locations:
+ # copy_collections(locations, src, dst, args)
# copy the workflow itself
del wf['uuid']
commit fd17a167629eff63ebe2bc57454d0007b4600d39
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 7 14:37:05 2023 -0400
20933: Adding --print-keep-deps to assist arv-copy
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 8108934aae..f37550a4a1 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -124,6 +124,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
+ exgroup.add_argument("--print-keep-deps", action="store_true", help="To assist copying, print a list of Keep collections that this workflow depends on.")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
default=True, dest="wait")
@@ -325,7 +327,9 @@ def main(args=sys.argv[1:],
return 1
arvargs.work_api = want_api
- if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+ workflow_op = arvargs.create_workflow or arvargs.update_workflow or arvargs.print_keep_deps
+
+ if workflow_op and not arvargs.job_order:
job_order_object = ({}, "")
add_arv_hints()
@@ -420,7 +424,7 @@ def main(args=sys.argv[1:],
if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow) or arvargs.workflow.startswith("keep:"):
executor.loadingContext.do_validate = False
- if arvargs.submit:
+ if arvargs.submit and not workflow_op:
executor.fast_submit = True
return cwltool.main.main(args=arvargs,
@@ -433,4 +437,4 @@ def main(args=sys.argv[1:],
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
runtimeContext=executor.toplevel_runtimeContext,
- input_required=not (arvargs.create_workflow or arvargs.update_workflow))
+ input_required=not workflow_op)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index cddcd15c54..c5d56fb656 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -292,7 +292,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
# Find the longest common prefix among all the file names. We'll
# use this to recreate the directory structure in a keep
# collection with correct relative references.
- prefix = common_prefix(firstfile, all_files)
+ prefix = common_prefix(firstfile, all_files) if firstfile else ""
+
col = arvados.collection.Collection(api_client=arvRunner.api)
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 125527f783..c7b9f5284d 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -43,6 +43,7 @@ class ArvRuntimeContext(RuntimeContext):
self.varying_url_params = ""
self.prefer_cached_downloads = False
self.cached_docker_lookups = {}
+ self.print_keep_deps = False
super(ArvRuntimeContext, self).__init__(kwargs)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ce8aa42095..0d177137bf 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -34,7 +34,7 @@ from arvados.errors import ApiError
import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
@@ -671,12 +671,10 @@ The 'jobs' API is no longer supported.
# are going to wait for the result, and always_submit_runner
# is false, then we don't submit a runner process.
- submitting = (runtimeContext.update_workflow or
- runtimeContext.create_workflow or
- (runtimeContext.submit and not
+ submitting = (runtimeContext.submit and not
(updated_tool.tool["class"] == "CommandLineTool" and
runtimeContext.wait and
- not runtimeContext.always_submit_runner)))
+ not runtimeContext.always_submit_runner))
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
@@ -702,7 +700,7 @@ The 'jobs' API is no longer supported.
loadingContext.skip_resolve_all = True
workflow_wrapper = None
- if submitting and not self.fast_submit:
+ if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
# upload workflow and get back the workflow wrapper
workflow_wrapper = upload_workflow(self, tool, job_order,
@@ -725,6 +723,11 @@ The 'jobs' API is no longer supported.
self.stdout.write(uuid + "\n")
return (None, "success")
+ if runtimeContext.print_keep_deps:
+ # Just find and print out all the collection dependencies and exit
+ print_keep_deps(tool)
+ return (None, "success")
+
# Did not register a workflow, we're going to submit
# it instead.
loadingContext.loader.idx.clear()
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 4432813f6a..4d211d3ed6 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -946,3 +946,27 @@ class Runner(Process):
self.arvrunner.output_callback({}, "permanentFail")
else:
self.arvrunner.output_callback(outputs, processStatus)
+
+
+def print_keep_deps_visitor(references, doc_loader, tool):
+ def collect_locators(obj):
+ loc = obj.get("location", "") or obj.get("http://arvados.org/cwl#dockerCollectionPDH", "")
+
+ g = arvados.util.keepuri_pattern.match(loc)
+ if g and g[1] not in references:
+ references.append(g[1])
+
+ sc_result = scandeps(tool["id"], tool,
+ set(),
+ set(("location", "id")),
+ None, urljoin=doc_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ visit_class(sc_result, ("File", "Directory"), collect_locators)
+
+
+def print_keep_deps(tool):
+ references = []
+
+ tool.visit(partial(print_keep_deps_visitor, references, tool.doc_loader))
+ print(json.dumps(references))
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 16f2999ca8..c1ba790241 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -35,6 +35,8 @@ link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
job_uuid_pattern = re.compile(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}')
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
+keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+\d+)/(.*)')
+keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+\d+)/(.*)')
def _deprecated(version=None, preferred=None):
"""Mark a callable as deprecated in the SDK
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list