[ARVADOS] created: 2.1.0-2461-g58ebceb34
Git user
git at public.arvados.org
Thu May 12 18:33:17 UTC 2022
at 58ebceb34531be92f74f593d532276368a87a6de (commit)
commit 58ebceb34531be92f74f593d532276368a87a6de
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu May 12 14:31:57 2022 -0400
19070: Fix --update-workflow
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 8eb12913a..51e7cd8b9 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -307,7 +307,7 @@ class ArvadosWorkflow(Workflow):
if self.wf_pdh is None:
adjustFileObjs(packed, keepmount)
adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
self.loadingContext = self.loadingContext.copy()
self.loadingContext.metadata = self.loadingContext.metadata.copy()
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 70bc0c457..1759e4ac2 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -517,7 +517,6 @@ The 'jobs' API is no longer supported.
updated_tool.visit(self.check_features)
- self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
self.secret_store = runtimeContext.secret_store
@@ -558,7 +557,9 @@ The 'jobs' API is no longer supported.
# gets uploaded goes into the same parent project, unless
# an alternate --project-uuid was provided.
existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
- self.project_uuid = existing_wf["owner_uuid"]
+ runtimeContext.project_uuid = existing_wf["owner_uuid"]
+
+ self.project_uuid = runtimeContext.project_uuid
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
@@ -604,7 +605,7 @@ The 'jobs' API is no longer supported.
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
+ runtimeContext.project_uuid,
runtimeContext,
uuid=runtimeContext.update_workflow,
submit_runner_ram=runtimeContext.submit_runner_ram,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index f39c98d88..50b3bb94d 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -671,7 +671,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
raise Exception("Docker image %s is not available\n%s" % (img, e) )
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
@@ -680,15 +680,15 @@ def upload_workflow_collection(arvrunner, name, packed):
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())
diff --git a/sdk/cwl/tests/scripts/download_all_data.sh b/sdk/cwl/tests/scripts/download_all_data.sh
index d3a9d7876..7c769b584 100755
--- a/sdk/cwl/tests/scripts/download_all_data.sh
+++ b/sdk/cwl/tests/scripts/download_all_data.sh
@@ -1,7 +1,7 @@
+#!/bin/sh
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
-#!/bin/bash
-
echo bubble
diff --git a/sdk/cwl/tests/test_copy_deps.py b/sdk/cwl/tests/test_copy_deps.py
index 2b78db8b0..853a7d360 100644
--- a/sdk/cwl/tests/test_copy_deps.py
+++ b/sdk/cwl/tests/test_copy_deps.py
@@ -10,14 +10,14 @@ api = arvados.api()
def check_contents(group, wf_uuid):
contents = api.groups().contents(uuid=group["uuid"]).execute()
if len(contents["items"]) != 3:
- raise Exception("Expected 3 items")
+ raise Exception("Expected 3 items in "+group["uuid"]+" was "+str(len(contents["items"])))
found = False
for c in contents["items"]:
if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
found = True
if not found:
- raise Exception("Couldn't find workflow")
+ raise Exception("Couldn't find workflow in "+group["uuid"])
found = False
for c in contents["items"]:
@@ -42,7 +42,9 @@ def test_create():
raise Exception("Expected 0 items")
# Create workflow, by default should also copy dependencies
- wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
wf_uuid = wf_uuid.decode("utf-8").strip()
check_contents(group, wf_uuid)
finally:
@@ -57,7 +59,9 @@ def test_update():
raise Exception("Expected 0 items")
# Create workflow, but with --no-copy-deps it shouldn't copy anything
- wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
wf_uuid = wf_uuid.decode("utf-8").strip()
contents = api.groups().contents(uuid=group["uuid"]).execute()
@@ -72,7 +76,9 @@ def test_update():
raise Exception("Couldn't find workflow")
# Updating by default will copy missing items
- wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"])
+ cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
wf_uuid = wf_uuid.decode("utf-8").strip()
check_contents(group, wf_uuid)
@@ -88,7 +94,9 @@ def test_execute():
raise Exception("Expected 0 items")
# Execute workflow, shouldn't copy anything.
- wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
wf_uuid = wf_uuid.decode("utf-8").strip()
contents = api.groups().contents(uuid=group["uuid"]).execute()
@@ -115,7 +123,9 @@ def test_execute():
raise Exception("Didn't expect to find jobs image dependency")
# Execute workflow with --copy-deps
- wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"])
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
wf_uuid = wf_uuid.decode("utf-8").strip()
contents = api.groups().contents(uuid=group["uuid"]).execute()
commit 9ca1b9f63b0fdaac45d04393e413cbc0b7f16d88
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed May 11 17:19:51 2022 -0400
19070: Prefer to use the passed-in runtimeContext
Code cleanup, change most places to use the passed-in runtimeContext
instead of the ArvRunner top level runtimeContext.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3faa510a0..21b629f37 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -219,7 +219,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
- exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave depenencies where they are.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
parser.add_argument(
"--skip-schemas",
@@ -367,5 +367,5 @@ def main(args=sys.argv[1:],
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
+ runtimeContext=executor.toplevel_runtimeContext,
input_required=not (arvargs.create_workflow or arvargs.update_workflow))
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 33b4c90c6..5082cc2f4 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -466,7 +466,7 @@ class RunnerContainer(Runner):
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
@@ -501,7 +501,7 @@ class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
@@ -551,17 +551,17 @@ class RunnerContainer(Runner):
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
command.append("--on-error=" + runtimeContext.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
@@ -582,8 +582,8 @@ class RunnerContainer(Runner):
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster:
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 4fe82a6fe..8eb12913a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -37,11 +37,12 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
submit_runner_image=None):
- packed = packed_workflow(arvRunner, tool, merged_map)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
@@ -57,7 +58,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False)
+ packed, tool.tool["id"], False,
+ runtimeContext)
wf_runner_resources = None
@@ -72,7 +74,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
@@ -194,7 +198,8 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if self.wf_pdh is None:
packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -237,7 +242,8 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
packed,
self.tool["id"],
- False)
+ False,
+ runtimeContext)
# Discover files/directories referenced by the
# workflow (mainly "default" values)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 0daa15d5f..70bc0c457 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -197,11 +197,11 @@ The 'jobs' API is no longer supported.
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
- self.runtimeContext = ArvRuntimeContext(vars(arvargs))
- self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
- validate_cluster_target(self, self.runtimeContext)
+ validate_cluster_target(self, self.toplevel_runtimeContext)
def arv_make_tool(self, toolpath_object, loadingContext):
@@ -535,6 +535,8 @@ The 'jobs' API is no longer supported.
if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+ runtimeContext = runtimeContext.copy()
+
default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
if runtimeContext.storage_classes == "default":
runtimeContext.storage_classes = default_storage_classes
@@ -544,14 +546,14 @@ The 'jobs' API is no longer supported.
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
- if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
# When creating or updating workflow record, by default
# always copy dependencies and ensure Docker images are up
# to date.
- self.runtimeContext.copy_deps = True
- self.runtimeContext.match_local_docker = True
+ runtimeContext.copy_deps = True
+ runtimeContext.match_local_docker = True
- if self.runtimeContext.update_workflow and self.project_uuid is None:
+ if runtimeContext.update_workflow and self.project_uuid is None:
# If we are updating a workflow, make sure anything that
# gets uploaded goes into the same parent project, unless
# an alternate --project-uuid was provided.
@@ -560,7 +562,7 @@ The 'jobs' API is no longer supported.
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order)
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
@@ -587,7 +589,7 @@ The 'jobs' API is no longer supported.
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
@@ -602,12 +604,13 @@ The 'jobs' API is no longer supported.
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=runtimeContext.update_workflow,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ self.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image)
self.stdout.write(uuid + "\n")
return (None, "success")
@@ -616,7 +619,6 @@ The 'jobs' API is no longer supported.
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
- runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 10323c2be..f39c98d88 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -240,7 +240,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
@@ -449,12 +449,12 @@ def upload_dependencies(arvrunner, name, document_loader,
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
- if arvrunner.runtimeContext.copy_deps:
+ if runtimeContext.copy_deps:
# Find referenced collections and copy them into the
# destination project, for easy sharing.
already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
filters=[["portable_data_hash", "in", list(keeprefs)],
- ["owner_uuid", "=", arvrunner.project_uuid]],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
select=["uuid", "portable_data_hash", "created_at"]))
keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
@@ -469,7 +469,7 @@ def upload_dependencies(arvrunner, name, document_loader,
col = col["items"][0]
try:
arvrunner.api.collections().create(body={"collection": {
- "owner_uuid": arvrunner.project_uuid,
+ "owner_uuid": runtimeContext.project_uuid,
"name": col["name"],
"description": col["description"],
"properties": col["properties"],
@@ -491,7 +491,7 @@ def upload_dependencies(arvrunner, name, document_loader,
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
@@ -501,24 +501,26 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
@@ -547,11 +549,11 @@ def packed_workflow(arvrunner, tool, merged_map):
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
@@ -572,7 +574,7 @@ def tag_git_version(packed):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
@@ -608,7 +610,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
@@ -622,10 +625,10 @@ def upload_job_order(arvrunner, name, tool, job_order):
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
@@ -640,6 +643,7 @@ def upload_workflow_deps(arvrunner, tool):
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
@@ -652,15 +656,17 @@ def upload_workflow_deps(arvrunner, tool):
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker,
- arvrunner.runtimeContext.copy_deps)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index ba0557a9c..305d51e14 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1117,7 +1117,7 @@ class TestSubmit(unittest.TestCase):
"portable_data_hash": "9999999999999999999999999999999b+99"}
self.assertEqual("9999999999999999999999999999999b+99",
- arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
@stubs
commit dec2837a7cb9d6c0538e7a4b699fd08435e8353a
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri May 6 15:24:23 2022 -0400
19070: Fix tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/tests/test_copy_deps.py b/sdk/cwl/tests/test_copy_deps.py
index 298a6800f..2b78db8b0 100644
--- a/sdk/cwl/tests/test_copy_deps.py
+++ b/sdk/cwl/tests/test_copy_deps.py
@@ -136,7 +136,7 @@ def test_execute():
finally:
api.groups().delete(uuid=group["uuid"]).execute()
-
-test_create()
-test_update()
-test_execute()
+if __name__ == '__main__':
+ test_create()
+ test_update()
+ test_execute()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 6951a082d..ba0557a9c 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -47,12 +47,14 @@ _rootDesc = None
def stubs(func):
@functools.wraps(func)
+ @mock.patch("arvados_cwl.arvdocker.determine_image_id")
@mock.patch("uuid.uuid4")
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@mock.patch("arvados.collection.KeepClient")
@mock.patch("arvados.keep.KeepClient")
@mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client1, keep_client2, keepdocker, uuid4, *args, **kwargs):
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker,
+ uuid4, determine_image_id, *args, **kwargs):
class Stubs(object):
pass
stubs = Stubs()
@@ -63,6 +65,8 @@ def stubs(func):
"df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
"df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
+ determine_image_id.return_value = None
+
def putstub(p, **kwargs):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
keep_client1().put.side_effect = putstub
commit 7d01c644639aa75500c8cfb2a2b7bfd83941eac3
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri May 6 10:23:03 2022 -0400
19070: --create/update-workflow also implies --match-submitter-images
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index eace9f449..0daa15d5f 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -545,7 +545,11 @@ The 'jobs' API is no longer supported.
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ # When creating or updating workflow record, by default
+ # always copy dependencies and ensure Docker images are up
+ # to date.
self.runtimeContext.copy_deps = True
+ self.runtimeContext.match_local_docker = True
if self.runtimeContext.update_workflow and self.project_uuid is None:
# If we are updating a workflow, make sure anything that
commit a4d6d061cb30cc2f77d4d64283ee998bb3e3a97b
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed May 4 16:57:01 2022 -0400
19070: Add integration test for copying dependencies
Fix python tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ed57c6dae..eace9f449 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -544,10 +544,10 @@ The 'jobs' API is no longer supported.
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
- if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
- runtimeContext.copy_deps = True
+ if self.runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ self.runtimeContext.copy_deps = True
- if runtimeContext.update_workflow and self.project_uuid is None:
+ if self.runtimeContext.update_workflow and self.project_uuid is None:
# If we are updating a workflow, make sure anything that
# gets uploaded goes into the same parent project, unless
# an alternate --project-uuid was provided.
diff --git a/sdk/cwl/tests/19070-copy-deps.cwl b/sdk/cwl/tests/19070-copy-deps.cwl
new file mode 100644
index 000000000..b0d61700e
--- /dev/null
+++ b/sdk/cwl/tests/19070-copy-deps.cwl
@@ -0,0 +1,17 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+baseCommand: echo
+inputs:
+ message:
+ type: File
+ inputBinding:
+ position: 1
+ default:
+ class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/b
+
+outputs: []
diff --git a/sdk/cwl/tests/arvados-tests.sh b/sdk/cwl/tests/arvados-tests.sh
index 9cb5234cf..1bbaa505e 100755
--- a/sdk/cwl/tests/arvados-tests.sh
+++ b/sdk/cwl/tests/arvados-tests.sh
@@ -18,6 +18,15 @@ if ! arv-get 20850f01122e860fb878758ac1320877+71 > /dev/null ; then
arv-put --portable-data-hash samples/sample1_S01_R1_001.fastq.gz
fi
+# Test for #18888
+# This is a standalone test because the bug was observed with this
+# command line and was thought to be due to command line handling.
arvados-cwl-runner 18888-download_def.cwl --scripts scripts/
+# Test for #19070
+# The most effective way to test this seemed to be to write an
+# integration test to check for the expected behavior.
+python test_copy_deps.py
+
+# Run integration tests
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers
diff --git a/sdk/cwl/tests/test_copy_deps.py b/sdk/cwl/tests/test_copy_deps.py
new file mode 100644
index 000000000..298a6800f
--- /dev/null
+++ b/sdk/cwl/tests/test_copy_deps.py
@@ -0,0 +1,142 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def check_contents(group, wf_uuid):
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 3:
+ raise Exception("Expected 3 items")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+
+def test_create():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-1", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, by default should also copy dependencies
+ wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_update():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-2", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, but with --no-copy-deps it shouldn't copy anything
+ wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 1:
+            raise Exception("Expected 1 item")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow")
+
+ # Updating by default will copy missing items
+ wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"])
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-3", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Execute workflow, shouldn't copy anything.
+ wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"])
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ # container request
+ # final output collection
+ # container log
+ # step output collection
+ # container request log
+ if len(contents["items"]) != 5:
+ raise Exception("Expected 5 items")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if found:
+ raise Exception("Didn't expect to find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if found:
+ raise Exception("Didn't expect to find jobs image dependency")
+
+ # Execute workflow with --copy-deps
+ wf_uuid = subprocess.check_output(["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"])
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+test_create()
+test_update()
+test_execute()
diff --git a/sdk/python/tests/test_arv_keepdocker.py b/sdk/python/tests/test_arv_keepdocker.py
index fd3a69cae..8fbfad437 100644
--- a/sdk/python/tests/test_arv_keepdocker.py
+++ b/sdk/python/tests/test_arv_keepdocker.py
@@ -48,11 +48,13 @@ class ArvKeepdockerTestCase(unittest.TestCase, tutil.VersionChecker):
self.run_arv_keepdocker(['--version'], sys.stderr)
self.assertVersionOutput(out, err)
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_image_format_compatibility(self, _1, _2):
+ def test_image_format_compatibility(self, _1, _2, _3):
old_id = hashlib.sha256(b'old').hexdigest()
new_id = 'sha256:'+hashlib.sha256(b'new').hexdigest()
for supported, img_id, expect_ok in [
@@ -152,11 +154,13 @@ class ArvKeepdockerTestCase(unittest.TestCase, tutil.VersionChecker):
self.run_arv_keepdocker(['[::1]/repo/img'], sys.stderr)
find_image_mock.assert_called_with('[::1]/repo/img', 'latest')
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_collection_property_update(self, _1, _2):
+ def test_collection_property_update(self, _1, _2, _3):
image_id = 'sha256:'+hashlib.sha256(b'image').hexdigest()
fakeDD = arvados.api('v1')._rootDesc
fakeDD['dockerImageFormats'] = ['v2']
commit 4af2a63dcd4460b11c8f9f05a51caf1954972c95
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed May 4 14:17:18 2022 -0400
19070: Fix test_arvados_jobs_image
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index b43f0268a..6951a082d 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1077,6 +1077,18 @@ class TestSubmit(unittest.TestCase):
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
{"items": [{"created_at": "",
"head_uuid": "",
"link_class": "docker_image_hash",
@@ -1090,10 +1102,16 @@ class TestSubmit(unittest.TestCase):
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},)
+ }], "items_available": 1, "offset": 0},
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "properties": ""
+ }], "items_available": 1, "offset": 0})
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"portable_data_hash": "9999999999999999999999999999999b+99"}
+
self.assertEqual("9999999999999999999999999999999b+99",
arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
commit 2b0fad4e2c099ebe42ae7cdb20615d9e0da2afc9
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue May 3 18:05:43 2022 -0400
19070: Fix most tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 6b670c73d..10323c2be 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -402,7 +402,8 @@ def upload_dependencies(arvrunner, name, document_loader,
keeprefs = set()
def addkeepref(k):
- keeprefs.add(collection_pdh_pattern.match(k).group(1))
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
def setloc(p):
loc = p.get("location")
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 5092fc457..b43f0268a 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -77,7 +77,7 @@ def stubs(func):
"arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
"arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
}
- def kd(a, b, image_name=None, image_tag=None):
+ def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
stubs.keepdocker.side_effect = kd
@@ -1599,6 +1599,9 @@ class TestCreateWorkflow(unittest.TestCase):
@stubs
def test_update(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug",
@@ -1610,6 +1613,7 @@ class TestCreateWorkflow(unittest.TestCase):
"name": "submit_wf.cwl",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
@@ -1622,6 +1626,9 @@ class TestCreateWorkflow(unittest.TestCase):
@stubs
def test_update_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug", "--name", "testing 123",
@@ -1633,6 +1640,7 @@ class TestCreateWorkflow(unittest.TestCase):
"name": "testing 123",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
commit b5d0e0ad775eb822b9ab4bed5b57c2e9072f4c0b
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue May 3 17:01:26 2022 -0400
19070: Add --copy-deps/--no-copy-deps
Copies dependencies by default using --create-workflow and
--update-workflow
Keeps old behavior by default when running from the command line (can
get new behavior with --copy-deps).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c73b358ec..3faa510a0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -217,6 +217,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
parser.add_argument(
"--skip-schemas",
action="store_true",
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index f75bde81e..33b4c90c6 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -247,7 +247,8 @@ class ArvadosContainer(JobBase):
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index d5295afc2..cf0b3b9da 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -57,7 +57,7 @@ def determine_image_id(dockerImageId):
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker):
+ force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
@@ -85,10 +85,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
image_tag=image_tag,
project_uuid=None)
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
- image_name=image_name,
- image_tag=image_tag,
- project_uuid=project_uuid)
+ if copy_deps:
+ # Only images that are available in the destination project
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag,
+ project_uuid=project_uuid)
+ else:
+ images = out_of_project_images
if match_local_docker:
local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 316250106..64f85e207 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -38,6 +38,7 @@ class ArvRuntimeContext(RuntimeContext):
self.collection_cache_size = 256
self.match_local_docker = False
self.enable_preemptible = None
+ self.copy_deps = None
super(ArvRuntimeContext, self).__init__(kwargs)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ef371b43d..ed57c6dae 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -544,6 +544,16 @@ The 'jobs' API is no longer supported.
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ runtimeContext.copy_deps = True
+
+ if runtimeContext.update_workflow and self.project_uuid is None:
+ # If we are updating a workflow, make sure anything that
+ # gets uploaded goes into the same parent project, unless
+ # an alternate --project-uuid was provided.
+ existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+ self.project_uuid = existing_wf["owner_uuid"]
+
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
updated_tool, job_order)
@@ -571,10 +581,6 @@ The 'jobs' API is no longer supported.
else:
tool = updated_tool
- if runtimeContext.update_workflow and self.project_uuid is None:
- existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
- self.project_uuid = existing_wf["owner_uuid"]
-
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
merged_map = upload_workflow_deps(self, tool)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index dae0541bb..6b670c73d 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -39,6 +39,7 @@ from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
@@ -399,10 +400,15 @@ def upload_dependencies(arvrunner, name, document_loader,
single_collection=True,
optional_deps=optional_deps)
+ keeprefs = set()
+ def addkeepref(k):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
@@ -424,7 +430,10 @@ def upload_dependencies(arvrunner, name, document_loader,
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
@@ -439,6 +448,38 @@ def upload_dependencies(arvrunner, name, document_loader,
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if arvrunner.runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", arvrunner.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": arvrunner.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+ logger.warning("Unable to copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
@@ -462,13 +503,15 @@ def upload_docker(arvrunner, tool):
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
@@ -506,7 +549,8 @@ def packed_workflow(arvrunner, tool, merged_map):
arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
@@ -614,7 +658,8 @@ def arvados_jobs_image(arvrunner, img):
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ arvrunner.runtimeContext.match_local_docker,
+ arvrunner.runtimeContext.copy_deps)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
commit be6d658dd95d3a0d63bb3441e50877a4916a5b8e
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Apr 28 12:53:55 2022 -0400
19070: Always make copy of Docker images with --project-uuid
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 04e2a4cff..d5295afc2 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -80,11 +80,17 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else "latest"
+ out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag,
+ project_uuid=None)
+
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=project_uuid)
- if images and match_local_docker:
+ if match_local_docker:
local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
if local_image_id:
# find it in the list
@@ -98,15 +104,25 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
# force re-upload.
images = []
+ for i in out_of_project_images:
+ if i[1]["dockerhash"] == local_image_id:
+ found = True
+ out_of_project_images = [i]
+ break
+ if not found:
+ # force re-upload.
+ out_of_project_images = []
+
if not images:
- # Fetch Docker image if necessary.
- try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
- force_pull, tmp_outdir_prefix)
- if not result:
- raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
- except OSError as e:
- raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
+ if not out_of_project_images:
+ # Fetch Docker image if necessary.
+ try:
+ result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+ force_pull, tmp_outdir_prefix)
+ if not result:
+ raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
+ except OSError as e:
+ raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
# Upload image to Arvados
args = []
@@ -125,7 +141,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=project_uuid)
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 680ca0b7b..ef371b43d 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -571,6 +571,10 @@ The 'jobs' API is no longer supported.
else:
tool = updated_tool
+ if runtimeContext.update_workflow and self.project_uuid is None:
+ existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+ self.project_uuid = existing_wf["owner_uuid"]
+
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
merged_map = upload_workflow_deps(self, tool)
@@ -584,13 +588,12 @@ The 'jobs' API is no longer supported.
loadingContext.metadata = tool.metadata
tool = load_tool(tool.tool, loadingContext)
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
self.project_uuid,
- uuid=existing_uuid,
+ uuid=runtimeContext.update_workflow,
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
merged_map=merged_map,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 7d4310b0e..dae0541bb 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -456,9 +456,9 @@ def upload_docker(arvrunner, tool):
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
arvrunner.runtimeContext.force_docker_pull,
arvrunner.runtimeContext.tmp_outdir_prefix,
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index c8f6e7808..db4edd2df 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -258,7 +258,7 @@ def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
'tag': tag,
}
-def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
+def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
"""List all Docker images known to the api_client with image_name and
image_tag. If no image_name is given, defaults to listing all
Docker images.
@@ -273,13 +273,18 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
search_filters = []
repo_links = None
hash_links = None
+
+ project_filter = []
+ if project_uuid is not None:
+ project_filter = [["owner_uuid", "=", project_uuid]]
+
if image_name:
# Find images with the name the user specified.
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=',
- '{}:{}'.format(image_name, image_tag or 'latest')]])
+ '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
if search_links:
repo_links = search_links
else:
@@ -287,7 +292,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', 'ilike', image_name + '%']])
+ ['name', 'ilike', image_name + '%']]+project_filter)
hash_links = search_links
# Only list information about images that were found in the search.
search_filters.append(['head_uuid', 'in',
@@ -299,7 +304,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
if hash_links is None:
hash_links = _get_docker_links(
api_client, num_retries,
- filters=search_filters + [['link_class', '=', 'docker_image_hash']])
+ filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
# Each collection may have more than one name (though again, one name
@@ -309,7 +314,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
repo_links = _get_docker_links(
api_client, num_retries,
filters=search_filters + [['link_class', '=',
- 'docker_image_repo+tag']])
+ 'docker_image_repo+tag']]+project_filter)
seen_image_names = collections.defaultdict(set)
images = []
for link in repo_links:
@@ -337,7 +342,7 @@ def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None)
# Remove any image listings that refer to unknown collections.
existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
api_client.collections().list, num_retries,
- filters=[['uuid', 'in', [im['collection'] for im in images]]],
+ filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
select=['uuid'])}
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
commit 9bbb935dc521f16b2b2b3d7c99653fa20914d90d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Apr 28 12:44:10 2022 -0400
19070: keepdocker will use image in arvados
If not available in Docker
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 537ea3a94..c8f6e7808 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -85,7 +85,8 @@ class DockerError(Exception):
def popen_docker(cmd, *args, **kwargs):
manage_stdin = ('stdin' not in kwargs)
kwargs.setdefault('stdin', subprocess.PIPE)
- kwargs.setdefault('stdout', sys.stderr)
+ kwargs.setdefault('stdout', subprocess.PIPE)
+ kwargs.setdefault('stderr', subprocess.PIPE)
try:
docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
except OSError: # No docker in $PATH, try docker.io
@@ -385,18 +386,25 @@ def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None)
if args.pull and not find_image_hashes(args.image):
pull_image(args.image, args.tag)
+ images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
+
+ image_hash = None
try:
image_hash = find_one_image_hash(args.image, args.tag)
+ if not docker_image_compatible(api, image_hash):
+ if args.force_image_format:
+ logger.warning("forcing incompatible image")
+ else:
+ logger.error("refusing to store " \
+ "incompatible format (use --force-image-format to override)")
+ sys.exit(1)
except DockerError as error:
- logger.error(str(error))
- sys.exit(1)
-
- if not docker_image_compatible(api, image_hash):
- if args.force_image_format:
- logger.warning("forcing incompatible image")
+ if images_in_arv:
+ # We don't have Docker / we don't have the image locally,
+ # use image that's already uploaded to Arvados
+ image_hash = images_in_arv[0][1]['dockerhash']
else:
- logger.error("refusing to store " \
- "incompatible format (use --force-image-format to override)")
+ logger.error(str(error))
sys.exit(1)
image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list