[ARVADOS] updated: 2.1.0-2392-g437b5da04
Git user
git at public.arvados.org
Wed May 11 21:21:00 UTC 2022
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 4 +--
sdk/cwl/arvados_cwl/arvcontainer.py | 20 +++++------
sdk/cwl/arvados_cwl/arvworkflow.py | 18 ++++++----
sdk/cwl/arvados_cwl/executor.py | 34 +++++++++---------
sdk/cwl/arvados_cwl/runner.py | 70 ++++++++++++++++++++-----------------
sdk/cwl/tests/test_submit.py | 2 +-
6 files changed, 81 insertions(+), 67 deletions(-)
via 437b5da0464b83659def04df995a31d88668f3d6 (commit)
from 8b4b7e7edb68a9c252bcc57b2d291b0f6e7a7ce9 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 437b5da0464b83659def04df995a31d88668f3d6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed May 11 17:19:51 2022 -0400
19070: Prefer to use the passed-in runtimeContext
Code cleanup: change most places to use the passed-in runtimeContext
instead of the ArvRunner top-level runtimeContext.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
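In a nutshell, the refactor replaces reads of the shared ArvRunner
context with an explicit parameter threaded through each call. A minimal
runnable sketch of that pattern (the helper names, SimpleNamespace
stand-ins, and dummy UUID below are illustrative, not the real
arvados_cwl code):

    from types import SimpleNamespace

    def pull_image(img, project_uuid):
        # Stand-in for the real lookup/upload in arvados_cwl.arvdocker.
        return "%s -> %s" % (img, project_uuid)

    # Before: helpers reached back into shared state on the runner object.
    def jobs_image_before(arvrunner, img):
        return pull_image(img, arvrunner.runtimeContext.project_uuid)

    # After: the caller's context travels as an explicit argument, so the
    # copy the executor prepared for this run is the one consulted.
    def jobs_image_after(arvrunner, img, runtimeContext):
        return pull_image(img, runtimeContext.project_uuid)

    ctx = SimpleNamespace(project_uuid="zzzzz-j7d0g-000000000000000")  # dummy
    runner = SimpleNamespace(runtimeContext=ctx)
    assert (jobs_image_before(runner, "arvados/jobs:2.1.0")
            == jobs_image_after(runner, "arvados/jobs:2.1.0", ctx))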
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3faa510a0..21b629f37 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -219,7 +219,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
- exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave depenencies where they are.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
parser.add_argument(
"--skip-schemas",
@@ -367,5 +367,5 @@ def main(args=sys.argv[1:],
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
+ runtimeContext=executor.toplevel_runtimeContext,
input_required=not (arvargs.create_workflow or arvargs.update_workflow))
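Both flags in the hunk above share one dest with default=None, giving a
tri-state value: True, False, or "not specified". The executor change
further down relies on the unspecified case (copy_deps is None) to apply
its own default when creating or updating a workflow. A minimal sketch of
the same argparse pattern:

    import argparse

    parser = argparse.ArgumentParser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--copy-deps", dest="copy_deps", default=None,
                         action="store_true")
    exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None,
                         action="store_false")

    assert parser.parse_args([]).copy_deps is None          # unspecified
    assert parser.parse_args(["--copy-deps"]).copy_deps is True
    assert parser.parse_args(["--no-copy-deps"]).copy_deps is False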
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 33b4c90c6..5082cc2f4 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -466,7 +466,7 @@ class RunnerContainer(Runner):
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
@@ -501,7 +501,7 @@ class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
@@ -551,17 +551,17 @@ class RunnerContainer(Runner):
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
command.append("--on-error=" + self.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
@@ -582,8 +582,8 @@ class RunnerContainer(Runner):
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster:
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 4fe82a6fe..8eb12913a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -37,11 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
submit_runner_image=None):
- packed = packed_workflow(arvRunner, tool, merged_map)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
@@ -57,7 +58,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False)
+ packed, tool.tool["id"], False,
+ runtimeContext)
wf_runner_resources = None
@@ -72,7 +74,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
@@ -194,7 +198,8 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if self.wf_pdh is None:
packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -237,7 +242,8 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
packed,
self.tool["id"],
- False)
+ False,
+ runtimeContext)
# Discover files/directories referenced by the
# workflow (mainly "default" values)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 0daa15d5f..70bc0c457 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -197,11 +197,11 @@ The 'jobs' API is no longer supported.
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
- self.runtimeContext = ArvRuntimeContext(vars(arvargs))
- self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
- validate_cluster_target(self, self.runtimeContext)
+ validate_cluster_target(self, self.toplevel_runtimeContext)
def arv_make_tool(self, toolpath_object, loadingContext):
@@ -535,6 +535,8 @@ The 'jobs' API is no longer supported.
if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+ runtimeContext = runtimeContext.copy()
+
default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
if runtimeContext.storage_classes == "default":
runtimeContext.storage_classes = default_storage_classes
@@ -544,14 +546,14 @@ The 'jobs' API is no longer supported.
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
- if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
# When creating or updating workflow record, by default
# always copy dependencies and ensure Docker images are up
# to date.
- self.runtimeContext.copy_deps = True
- self.runtimeContext.match_local_docker = True
+ runtimeContext.copy_deps = True
+ runtimeContext.match_local_docker = True
- if self.runtimeContext.update_workflow and self.project_uuid is None:
+ if runtimeContext.update_workflow and self.project_uuid is None:
# If we are updating a workflow, make sure anything that
# gets uploaded goes into the same parent project, unless
# an alternate --project-uuid was provided.
@@ -560,7 +562,7 @@ The 'jobs' API is no longer supported.
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order)
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
@@ -587,7 +589,7 @@ The 'jobs' API is no longer supported.
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
@@ -602,12 +604,13 @@ The 'jobs' API is no longer supported.
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=runtimeContext.update_workflow,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ self.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image)
self.stdout.write(uuid + "\n")
return (None, "success")
@@ -616,7 +619,6 @@ The 'jobs' API is no longer supported.
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
- runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
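The executor hunks above move runtimeContext = runtimeContext.copy() to
the top of the submit path, so the per-run adjustments that follow
(storage classes, copy_deps, match_local_docker, and so on) mutate a
private copy rather than writing through to the caller's context. A
minimal sketch of why the early copy matters (the Context class here is a
stand-in for ArvRuntimeContext, not the real thing):

    import copy

    class Context:
        def __init__(self, copy_deps=None):
            self.copy_deps = copy_deps

        def copy(self):
            # cwltool-style contexts expose copy(); shallow suffices here.
            return copy.copy(self)

    toplevel = Context()

    def run_once(runtimeContext):
        runtimeContext = runtimeContext.copy()  # mutate a private copy...
        runtimeContext.copy_deps = True         # ...so this stays local
        return runtimeContext.copy_deps

    assert run_once(toplevel) is True
    assert toplevel.copy_deps is None  # top-level context is untouched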
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 10323c2be..f39c98d88 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -240,7 +240,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
@@ -449,12 +449,12 @@ def upload_dependencies(arvrunner, name, document_loader,
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
- if arvrunner.runtimeContext.copy_deps:
+ if runtimeContext.copy_deps:
# Find referenced collections and copy them into the
# destination project, for easy sharing.
already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
filters=[["portable_data_hash", "in", list(keeprefs)],
- ["owner_uuid", "=", arvrunner.project_uuid]],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
select=["uuid", "portable_data_hash", "created_at"]))
keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
@@ -469,7 +469,7 @@ def upload_dependencies(arvrunner, name, document_loader,
col = col["items"][0]
try:
arvrunner.api.collections().create(body={"collection": {
- "owner_uuid": arvrunner.project_uuid,
+ "owner_uuid": runtimeContext.project_uuid,
"name": col["name"],
"description": col["description"],
"properties": col["properties"],
@@ -491,7 +491,7 @@ def upload_dependencies(arvrunner, name, document_loader,
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
@@ -501,24 +501,26 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
@@ -547,11 +549,11 @@ def packed_workflow(arvrunner, tool, merged_map):
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
@@ -572,7 +574,7 @@ def tag_git_version(packed):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
@@ -608,7 +610,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
@@ -622,10 +625,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
@@ -640,6 +643,7 @@ def upload_workflow_deps(arvrunner, tool):
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
@@ -652,15 +656,17 @@ def upload_workflow_deps(arvrunner, tool):
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index ba0557a9c..305d51e14 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1117,7 +1117,7 @@ class TestSubmit(unittest.TestCase):
"portable_data_hash": "9999999999999999999999999999999b+99"}
self.assertEqual("9999999999999999999999999999999b+99",
- arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
@stubs
-----------------------------------------------------------------------
hooks/post-receive
--