[arvados] created: 2.5.0-13-g50bd5e3f7
git repository hosting
git at public.arvados.org
Thu Jan 12 22:10:14 UTC 2023
at 50bd5e3f71a1eb18c7721fc44a934413463baa7c (commit)
commit 50bd5e3f71a1eb18c7721fc44a934413463baa7c
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Jan 12 17:09:54 2023 -0500
19385: Fixing tests
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 299ad5c0d..614530c3e 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -216,7 +216,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
prefix = firstfile[:n+1]
- col = arvados.collection.Collection()
+ col = arvados.collection.Collection(api_client=arvRunner.api)
for w in workflow_files | import_files:
# 1. load YAML
@@ -260,6 +260,11 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
"arv:workflowMain": toolfile,
}
+ if git_info:
+ for g in git_info:
+ p = g.split("#", 1)[1]
+ properties["arv:"+p] = git_info[g]
+
col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
adjustDirObjs(job_order, trim_listing)
@@ -292,9 +297,9 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
hints.append(wf_runner_resources)
# uncomment me
- # wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
- # submit_runner_image or "arvados/jobs:"+__version__,
- # runtimeContext)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index c2c992d44..026eb3099 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -714,8 +714,8 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
# commented out for testing only, uncomment me
- #with Perf(metrics, "upload_docker"):
- # upload_docker(arvrunner, tool, runtimeContext)
+ with Perf(metrics, "upload_docker"):
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
@@ -818,6 +818,9 @@ class Runner(Process):
reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
+ reuse_req, _ = self.embedded_tool.get_requirement("WorkReuse")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
self.uuid = None
self.final_output = None
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 4fbca5c05..1280095a8 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -200,14 +200,18 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
stubs.git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
- #expect_packed_workflow.update(stubs.git_info)
+ expect_packed_workflow.update(stubs.git_info)
stubs.git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in stubs.git_info.items()}
- #if wfname == wfpath:
- # container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
- #else:
- container_name = wfname
+ step_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
+ if wfname == wfpath:
+ container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
+ else:
+ container_name = wfname
+
+ expect_packed_workflow["$graph"][0]["steps"][0]["id"] = "#main/"+step_name
+ expect_packed_workflow["$graph"][0]["steps"][0]["label"] = container_name
stubs.expect_container_spec = {
'priority': 500,
@@ -267,11 +271,10 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
'vcpus': 1,
'ram': (1024+256)*1024*1024
},
- 'properties': {},
+ 'properties': stubs.git_props,
'use_existing': False,
'secret_mounts': {}
}
- #'properties': stubs.git_props,
stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
stubs.api.workflows().create().execute.return_value = {
@@ -315,7 +318,7 @@ class TestSubmit(unittest.TestCase):
@stubs()
def test_submit_container(self, stubs):
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-git",
+ ["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
@@ -324,7 +327,7 @@ class TestSubmit(unittest.TestCase):
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_wf.cwl ('+ stubs.git_props["arv:gitDescribe"] +') input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
@@ -389,16 +392,25 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ["--disable-reuse" if v == "--enable-reuse" else v for v in expect_container["command"]]
expect_container["use_existing"] = False
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["hints"] = [
+ {
+ "class": "LoadListingRequirement",
+ "loadListing": "deep_listing"
+ },
+ {
+ "class": "NetworkAccess",
+ "networkAccess": True
+ },
{
- "class": "http://arvados.org/cwl#ReuseRequirement",
+ "class": "WorkReuse",
"enableReuse": False,
},
+ {
+ "acrContainerImage": "999999999999999999999999999999d3+99",
+ "class": "http://arvados.org/cwl#WorkflowRunnerResources"
+ }
]
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
- "arv": "http://arvados.org/cwl#",
- "cwltool": "http://commonwl.org/cwltool#"
- }
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["steps"][0]["run"] = "keep:40eac42ef5535aca47ca5e47b5786f58+137/wf/submit_wf_no_reuse.cwl"
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -666,6 +678,8 @@ class TestSubmit(unittest.TestCase):
@mock.patch("time.sleep")
@stubs()
def test_submit_keepref(self, stubs, tm, reader):
+ raise Exception("broken")
+
with open("tests/wf/expect_arvworkflow.cwl") as f:
reader().open().__enter__().read.return_value = f.read()
@@ -991,18 +1005,28 @@ class TestSubmit(unittest.TestCase):
"vcpus": 2,
"ram": (2000+512) * 2**20
}
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["hints"] = [
+ {
+ "class": "LoadListingRequirement",
+ "loadListing": "deep_listing"
+ },
+ {
+ "class": "NetworkAccess",
+ "networkAccess": True
+ },
{
"class": "http://arvados.org/cwl#WorkflowRunnerResources",
+ "acrContainerImage": "999999999999999999999999999999d3+99",
"coresMin": 2,
"ramMin": 2000,
"keep_cache": 512
}
]
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
- "arv": "http://arvados.org/cwl#",
- }
+ #expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
+ # "arv": "http://arvados.org/cwl#",
+ #}
expect_container["command"] = ["--collection-cache-size=512" if v == "--collection-cache-size=256" else v for v in expect_container["command"]]
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["steps"][0]["run"] = "keep:48136b8a3e0b5768ea179729309a365e+145/wf/submit_wf_runner_resources.cwl"
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -1414,7 +1438,15 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["hints"] = [
+ {
+ "class": "LoadListingRequirement",
+ "loadListing": "deep_listing"
+ },
+ {
+ "class": "NetworkAccess",
+ "networkAccess": True
+ },
{
"class": "http://arvados.org/cwl#ProcessProperties",
"processProperties": [
@@ -1429,11 +1461,17 @@ class TestSubmit(unittest.TestCase):
}
}
],
+ },
+ {
+ "acrContainerImage": "999999999999999999999999999999d3+99",
+ "class": "http://arvados.org/cwl#WorkflowRunnerResources"
}
]
- expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
- "arv": "http://arvados.org/cwl#"
- }
+ #expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
+ # "arv": "http://arvados.org/cwl#"
+ #}
+
+ expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["steps"][0]["run"] = "keep:a068c0b781383b8a27ec5c04a355295b+147/wf/submit_wf_process_properties.cwl"
expect_container["properties"].update({
"baz": "blorp.txt",
commit 55e73facf08cc8651eb71529a2ab4e0657d3c870
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Jan 12 15:53:20 2023 -0500
19385: Work in progress checkpoint, submitting uses wrappers
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
Tested-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 20eed989c..e151dbf7b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -381,7 +381,7 @@ def main(args=sys.argv[1:],
# unit tests.
stdout = None
- if arvargs.submit and (arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow)):
+ if arvargs.workflow.startswith("arvwf:") or workflow_uuid_pattern.match(arvargs.workflow):
executor.loadingContext.do_validate = False
executor.fast_submit = True
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 30e0092e9..e0ee7285a 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -537,19 +537,20 @@ class RunnerContainer(Runner):
}
elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
workflowpath = "/var/lib/cwl/workflow.json#main"
- record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
- packed = yaml.safe_load(record["definition"])
+ #record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+ #packed = yaml.safe_load(record["definition"])
+ packed = self.loadingContext.loader.idx[self.embedded_tool.tool["id"]]
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+ main = self.loadingContext.loader.idx["_:main"]
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "content": packed
+ "content": main
}
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index ef8398df3..9c20c0c50 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -17,9 +17,6 @@ import arvados.commands.keepdocker
logger = logging.getLogger('arvados.cwl-runner')
-cached_lookups = {}
-cached_lookups_lock = threading.Lock()
-
def determine_image_id(dockerImageId):
for line in (
subprocess.check_output( # nosec
@@ -56,10 +53,16 @@ def determine_image_id(dockerImageId):
return None
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, runtimeContext):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+ project_uuid = runtimeContext.project_uuid,
+ force_pull = runtimeContext.force_docker_pull,
+ tmp_outdir_prefix = runtimeContext.tmp_outdir_prefix,
+ match_local_docker = runtimeContext.match_local_docker,
+ copy_deps = runtimeContext.copy_deps
+ cached_lookups = runtimeContext.cached_docker_lookups
+
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
@@ -69,11 +72,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
if hasattr(dockerRequirement, 'lc'):
dockerRequirement.lc.data["dockerImageId"] = dockerRequirement.lc.data["dockerPull"]
- global cached_lookups
- global cached_lookups_lock
- with cached_lookups_lock:
- if dockerRequirement["dockerImageId"] in cached_lookups:
- return cached_lookups[dockerRequirement["dockerImageId"]]
+ if dockerRequirement["dockerImageId"] in cached_lookups:
+ return cached_lookups[dockerRequirement["dockerImageId"]]
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
sp = dockerRequirement["dockerImageId"].split(":")
@@ -154,13 +154,6 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
- with cached_lookups_lock:
- cached_lookups[dockerRequirement["dockerImageId"]] = pdh
+ cached_lookups[dockerRequirement["dockerImageId"]] = pdh
return pdh
-
-def arv_docker_clear_cache():
- global cached_lookups
- global cached_lookups_lock
- with cached_lookups_lock:
- cached_lookups = {}
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 784e1bdb3..299ad5c0d 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -129,7 +129,6 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
def rel_ref(s, baseuri, urlexpander, merged_map):
uri = urlexpander(s, baseuri)
- print("DDD", baseuri, merged_map)
fileuri = urllib.parse.urldefrag(baseuri)[0]
if fileuri in merged_map:
replacements = merged_map[fileuri].resolved
@@ -145,22 +144,24 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
r = os.path.relpath(p2, p1)
if r == ".":
r = ""
- print("AAA", uri, s)
- print("BBBB", p1, p2, p3, r)
return os.path.join(r, p3)
-def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style, runtimeContext):
if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
d.fa.set_block_style()
if isinstance(d, MutableSequence):
for s in d:
- update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(s, baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+ if d.get("class") == "DockerRequirement":
+ dockerImageId = d.get("dockerImageId") or d.get("dockerPull")
+ d["http://arvados.org/cwl#dockerCollectionPDH"] = runtimeContext.cached_docker_lookups.get(dockerImageId)
+
for s in d:
for field in ("$include", "$import", "location", "run"):
if field in d and isinstance(d[field], str):
@@ -170,13 +171,15 @@ def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
for n, s in enumerate(d["$schemas"]):
d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
- update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+ update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style, runtimeContext)
def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
- runtimeContext, uuid=None,
- submit_runner_ram=0, name=None, merged_map=None,
- submit_runner_image=None,
- git_info=None):
+ runtimeContext,
+ uuid=None,
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None,
+ git_info=None,
+ set_defaults=False):
firstfile = None
workflow_files = set()
@@ -215,8 +218,6 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
col = arvados.collection.Collection()
- #print(merged_map.keys())
-
for w in workflow_files | import_files:
# 1. load YAML
@@ -235,7 +236,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
# 2. find $import, $include, $schema, run, location
# 3. update field value
- update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+ update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style, runtimeContext)
with col.open(w[n+1:], "wt") as f:
yamlloader.dump(result, stream=f)
@@ -252,10 +253,19 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
- col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
-
toolfile = tool.tool["id"][n+1:]
+ properties = {
+ "type": "workflow",
+ "arv:workflowMain": toolfile,
+ }
+
+ col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True, properties=properties)
+
+ adjustDirObjs(job_order, trim_listing)
+ adjustFileObjs(job_order, trim_anonymous_location)
+ adjustDirObjs(job_order, trim_anonymous_location)
+
# now construct the wrapper
step = {
@@ -268,6 +278,27 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
main = tool.tool
+ wf_runner_resources = None
+
+ hints = main.get("hints", [])
+ found = False
+ for h in hints:
+ if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+ wf_runner_resources = h
+ found = True
+ break
+ if not found:
+ wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
+ hints.append(wf_runner_resources)
+
+ # uncomment me
+ # wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ # submit_runner_image or "arvados/jobs:"+__version__,
+ # runtimeContext)
+
+ if submit_runner_ram:
+ wf_runner_resources["ramMin"] = submit_runner_ram
+
newinputs = []
for i in main["inputs"]:
inp = {}
@@ -282,6 +313,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
"loadListing", "default"):
if f in i:
inp[f] = i[f]
+
+ if set_defaults:
+ sn = shortname(i["id"])
+ if sn in job_order:
+ inp["default"] = job_order[sn]
+
inp["id"] = "#main/%s" % shortname(i["id"])
newinputs.append(inp)
@@ -309,7 +346,7 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
if main.get("requirements"):
wrapper["requirements"].extend(main["requirements"])
- if main.get("hints"):
+ if hints:
wrapper["hints"] = main["hints"]
doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
@@ -318,7 +355,12 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
for g in git_info:
doc[g] = git_info[g]
- update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+ update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False, runtimeContext)
+
+ return doc
+
+
+def make_workflow_record(arvRunner, doc, name, tool, project_uuid, update_uuid):
wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
@@ -331,8 +373,8 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
- if uuid:
- call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+ if update_uuid:
+ call = arvRunner.api.workflows().update(uuid=update_uuid, body=body)
else:
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 3ce561f66..125527f78 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -42,6 +42,7 @@ class ArvRuntimeContext(RuntimeContext):
self.defer_downloads = False
self.varying_url_params = ""
self.prefer_cached_downloads = False
+ self.cached_docker_lookups = {}
super(ArvRuntimeContext, self).__init__(kwargs)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 1e6344f5e..7e9840db6 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -705,21 +705,34 @@ The 'jobs' API is no longer supported.
#with Perf(metrics, "load_tool"):
# tool = load_tool(tool.tool, loadingContext)
- if runtimeContext.update_workflow or runtimeContext.create_workflow:
- # Create a pipeline template or workflow record and exit.
- if self.work_api == "containers":
- uuid = new_upload_workflow(self, tool, job_order,
- runtimeContext.project_uuid,
- runtimeContext,
- uuid=runtimeContext.update_workflow,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image,
- git_info=git_info)
+ if runtimeContext.update_workflow or runtimeContext.create_workflow or (runtimeContext.submit and not self.fast_submit):
+ # upload workflow and get back the workflow wrapper
+
+ workflow_wrapper = new_upload_workflow(self, tool, job_order,
+ runtimeContext.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ git_info=git_info,
+ set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow))
+
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
+ # Now create a workflow record and exit.
+ uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
+ runtimeContext.project_uuid, runtimeContext.update_workflow)
self.stdout.write(uuid + "\n")
return (None, "success")
+ loadingContext.loader.idx["_:main"] = workflow_wrapper
+
+ # Reload just the wrapper workflow.
+ self.fast_submit = True
+ tool = load_tool(workflow_wrapper, loadingContext)
+
+
self.apply_reqs(job_order, tool)
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 5c09e671f..716cda335 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -17,7 +17,7 @@ import logging
import threading
from collections import OrderedDict
-import ruamel.yaml as yaml
+import ruamel.yaml
import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
@@ -235,7 +235,8 @@ class CollectionFetcher(DefaultFetcher):
return f.read()
if url.startswith("arvwf:"):
record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
- definition = yaml.round_trip_load(record["definition"])
+ yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+ definition = yaml.load(record["definition"])
definition["label"] = record["name"]
return yaml.round_trip_dump(definition)
return super(CollectionFetcher, self).fetch_text(url)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index dc6d0df3f..c2c992d44 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -447,17 +447,15 @@ def upload_dependencies(arvrunner, name, document_loader,
single_collection=True,
optional_deps=optional_deps)
- print("MMM", mapper._pathmap)
-
keeprefs = set()
def addkeepref(k):
if k.startswith("keep:"):
keeprefs.add(collection_pdh_pattern.match(k).group(1))
- def setloc(p):
+
+ def collectloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
- p["location"] = mapper.mapper(p["location"]).resolved
addkeepref(p["location"])
return
@@ -488,12 +486,10 @@ def upload_dependencies(arvrunner, name, document_loader,
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
"Collection uuid %s not found" % uuid)
- p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
- p[collectionUUID] = uuid
- #with Perf(metrics, "setloc"):
- # visit_class(workflowobj, ("File", "Directory"), setloc)
- # visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "collectloc"):
+ visit_class(workflowobj, ("File", "Directory"), collectloc)
+ visit_class(discovered, ("File", "Directory"), collectloc)
if discovered_secondaryfiles is not None:
for d in discovered:
@@ -518,7 +514,6 @@ def upload_dependencies(arvrunner, name, document_loader,
continue
col = col["items"][0]
col["name"] = arvados.util.trim_name(col["name"])
- print("CCC name", col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
@@ -553,20 +548,10 @@ def upload_docker(arvrunner, tool, runtimeContext):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, runtimeContext)
else:
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
@@ -630,6 +615,45 @@ def tag_git_version(packed):
else:
packed["http://schema.org/version"] = githash
+def setloc(mapper, p):
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
+ return
+
+ if not loc:
+ return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ if uuid not in uuid_map:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and uuid_map[uuid] != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ return
+
+ uuid = gp.groups()[0]
+ if uuid not in uuid_map:
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+ p[collectionUUID] = uuid
+
+
+def update_from_mapper(workflowobj, mapper):
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), partial(setloc, mapper))
def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
@@ -680,6 +704,8 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
if "job_order" in job_order:
del job_order["job_order"]
+ update_from_mapper(job_order, jobmapper)
+
return job_order
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
@@ -720,7 +746,6 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
- print("PM", pm.items())
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
@@ -734,12 +759,7 @@ def arvados_jobs_image(arvrunner, img, runtimeContext):
try:
return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
- True,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ True, runtimeContext)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
@@ -783,8 +803,8 @@ class Runner(Process):
collection_cache_is_default=True,
git_info=None):
- loadingContext = loadingContext.copy()
- loadingContext.metadata = updated_tool.metadata.copy()
+ self.loadingContext = loadingContext.copy()
+ self.loadingContext.metadata = updated_tool.metadata.copy()
super(Runner, self).__init__(updated_tool.tool, loadingContext)
@@ -809,9 +829,9 @@ class Runner(Process):
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
- self.enable_dev = loadingContext.enable_dev
+ self.enable_dev = self.loadingContext.enable_dev
self.git_info = git_info
- self.fast_parser = loadingContext.fast_parser
+ self.fast_parser = self.loadingContext.fast_parser
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index bad4d4408..5820b56b5 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -8,7 +8,7 @@ from builtins import object
import arvados_cwl
import arvados_cwl.context
import arvados_cwl.util
-from arvados_cwl.arvdocker import arv_docker_clear_cache
+#from arvados_cwl.arvdocker import arv_docker_clear_cache
import copy
import arvados.config
import logging
@@ -61,7 +61,7 @@ class TestContainer(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')
@@ -128,7 +128,7 @@ class TestContainer(unittest.TestCase):
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_run(self, keepdocker):
for enable_reuse in (True, False):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -929,7 +929,7 @@ class TestContainer(unittest.TestCase):
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_setting_storage_class(self, keepdocker):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -1005,7 +1005,7 @@ class TestContainer(unittest.TestCase):
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_setting_process_properties(self, keepdocker):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -1101,7 +1101,7 @@ class TestContainer(unittest.TestCase):
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_cuda_requirement(self, keepdocker):
arvados_cwl.add_arv_hints()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -1206,7 +1206,7 @@ class TestContainer(unittest.TestCase):
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_match_local_docker(self, keepdocker, determine_image_id):
arvados_cwl.add_arv_hints()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -1280,7 +1280,7 @@ class TestContainer(unittest.TestCase):
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(container_request))
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runtimeContext.match_local_docker = True
container_request['container_image'] = '99999999999999999999999999999993+99'
container_request['name'] = 'test_run_True_2'
@@ -1298,7 +1298,7 @@ class TestContainer(unittest.TestCase):
arvados_cwl.add_arv_hints()
for enable_preemptible in (None, True, False):
for preemptible_hint in (None, True, False):
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
runner = mock.MagicMock()
runner.ignore_docker_for_reuse = False
@@ -1445,7 +1445,7 @@ class TestContainer(unittest.TestCase):
class TestWorkflow(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- arv_docker_clear_cache()
+ #arv_docker_clear_cache()
def helper(self, runner, enable_reuse=True):
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index dcbee726b..4fbca5c05 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -42,7 +42,7 @@ import arvados.keep
from .matcher import JsonDiffMatcher, StripYAMLComments
from .mock_discovery import get_rootDesc
-import ruamel.yaml as yaml
+import ruamel.yaml
_rootDesc = None
@@ -179,12 +179,6 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
- stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- stubs.api.jobs().create().execute.return_value = {
- "uuid": stubs.expect_job_uuid,
- "state": "Queued",
- }
-
stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
stubs.api.container_requests().create().execute.return_value = {
"uuid": stubs.expect_container_request_uuid,
@@ -192,96 +186,11 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
"state": "Queued"
}
- stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.api.pipeline_templates().create().execute.return_value = {
- "uuid": stubs.expect_pipeline_template_uuid,
- }
- stubs.expect_job_spec = {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'x': {
- 'basename': 'blorp.txt',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- 'class': 'File'
- },
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'
- },
- 'z': {
- 'basename': 'anonymous',
- "listing": [{
- "basename": "renamed.txt",
- "class": "File",
- "location": "keep:99999999999999999999999999999998+99/file1.txt",
- "size": 0
- }],
- 'class': 'Directory'
- },
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner'
- }
- stubs.pipeline_component = stubs.expect_job_spec.copy()
- stubs.expect_pipeline_instance = {
- 'name': 'submit_wf.cwl',
- 'state': 'RunningOnServer',
- 'owner_uuid': None,
- "components": {
- "cwl-runner": {
- 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
- 'script_parameters': {
- 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {
- 'basename': 'blorp.txt',
- 'class': 'File',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
- }},
- 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
- 'listing': [
- {
- 'basename': 'renamed.txt',
- 'class': 'File', 'location':
- 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
- 'arv:debug': True,
- 'arv:enable_reuse': True,
- 'arv:on_error': 'continue'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
- }
- }
- }
- stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
- stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
- stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
- stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
- stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
-
cwd = os.getcwd()
- filepath = os.path.join(cwd, "tests/wf/submit_wf_packed.cwl")
+ filepath = os.path.join(cwd, "tests/wf/submit_wf_wrapper.cwl")
with open(filepath) as f:
- expect_packed_workflow = yaml.round_trip_load(f)
+ yaml = ruamel.yaml.YAML(typ='rt', pure=True)
+ expect_packed_workflow = yaml.load(f)
if wfpath is None:
wfpath = wfname
@@ -291,14 +200,14 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
stubs.git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
- expect_packed_workflow.update(stubs.git_info)
+ #expect_packed_workflow.update(stubs.git_info)
stubs.git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in stubs.git_info.items()}
- if wfname == wfpath:
- container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
- else:
- container_name = wfname
+ #if wfname == wfpath:
+ # container_name = "%s (%s)" % (wfpath, stubs.git_props["arv:gitDescribe"])
+ #else:
+ container_name = wfname
stubs.expect_container_spec = {
'priority': 500,
@@ -358,10 +267,11 @@ def stubs(wfdetails=('submit_wf.cwl', None)):
'vcpus': 1,
'ram': (1024+256)*1024*1024
},
+ 'properties': {},
'use_existing': False,
- 'properties': stubs.git_props,
'secret_mounts': {}
}
+ #'properties': stubs.git_props,
stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
stubs.api.workflows().create().execute.return_value = {
@@ -383,7 +293,7 @@ class TestSubmit(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')
@@ -405,7 +315,7 @@ class TestSubmit(unittest.TestCase):
@stubs()
def test_submit_container(self, stubs):
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=containers", "--debug",
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-git",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
@@ -414,7 +324,7 @@ class TestSubmit(unittest.TestCase):
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl ('+ stubs.git_props["arv:gitDescribe"] +') input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
@@ -1104,7 +1014,7 @@ class TestSubmit(unittest.TestCase):
@mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
@mock.patch("arvados.api")
def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
arvrunner = mock.MagicMock()
arvrunner.project_uuid = ""
@@ -1641,7 +1551,7 @@ class TestCreateWorkflow(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- arvados_cwl.arvdocker.arv_docker_clear_cache()
+ #arvados_cwl.arvdocker.arv_docker_clear_cache()
def tearDown(self):
root_logger = logging.getLogger('')
commit b420ec626f8cb5cd7a8b4252dfc2be76ba3ba844
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Jan 9 15:39:54 2023 -0500
19385: new_upload_workflow work in progress
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index f66e50dca..784e1bdb3 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -129,11 +129,16 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
def rel_ref(s, baseuri, urlexpander, merged_map):
uri = urlexpander(s, baseuri)
- if baseuri in merged_map:
- replacements = merged_map[baseuri].resolved
+ print("DDD", baseuri, merged_map)
+ fileuri = urllib.parse.urldefrag(baseuri)[0]
+ if fileuri in merged_map:
+ replacements = merged_map[fileuri].resolved
if uri in replacements:
return replacements[uri]
+ if s.startswith("keep:"):
+ return s
+
p1 = os.path.dirname(uri_file_path(baseuri))
p2 = os.path.dirname(uri_file_path(uri))
p3 = os.path.basename(uri_file_path(uri))
@@ -146,15 +151,13 @@ def rel_ref(s, baseuri, urlexpander, merged_map):
def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
- if isinstance(d, CommentedSeq):
- if set_block_style:
- d.fa.set_block_style()
+ if set_block_style and (isinstance(d, CommentedSeq) or isinstance(d, CommentedMap)):
+ d.fa.set_block_style()
+
+ if isinstance(d, MutableSequence):
for s in d:
update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
- elif isinstance(d, CommentedMap):
- if set_block_style:
- d.fa.set_block_style()
-
+ elif isinstance(d, MutableMapping):
if "id" in d:
baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
@@ -251,7 +254,88 @@ def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
- return col.manifest_locator()
+ toolfile = tool.tool["id"][n+1:]
+
+ # now construct the wrapper
+
+ step = {
+ "id": "#main/" + toolname,
+ "in": [],
+ "out": [],
+ "run": "keep:%s/%s" % (col.portable_data_hash(), toolfile),
+ "label": name
+ }
+
+ main = tool.tool
+
+ newinputs = []
+ for i in main["inputs"]:
+ inp = {}
+ # Make sure to only copy known fields that are meaningful at
+ # the workflow level. In practice this ensures that if we're
+ # wrapping a CommandLineTool we don't grab inputBinding.
+ # Right now also excludes extension fields, which is fine,
+ # Arvados doesn't currently look for any extension fields on
+ # input parameters.
+ for f in ("type", "label", "secondaryFiles", "streamable",
+ "doc", "format", "loadContents",
+ "loadListing", "default"):
+ if f in i:
+ inp[f] = i[f]
+ inp["id"] = "#main/%s" % shortname(i["id"])
+ newinputs.append(inp)
+
+ wrapper = {
+ "class": "Workflow",
+ "id": "#main",
+ "inputs": newinputs,
+ "outputs": [],
+ "steps": [step]
+ }
+
+ for i in main["inputs"]:
+ step["in"].append({
+ "id": "#main/step/%s" % shortname(i["id"]),
+ "source": "#main/%s" % shortname(i["id"])
+ })
+
+ for i in main["outputs"]:
+ step["out"].append({"id": "#main/step/%s" % shortname(i["id"])})
+ wrapper["outputs"].append({"outputSource": "#main/step/%s" % shortname(i["id"]),
+ "type": i["type"],
+ "id": "#main/%s" % shortname(i["id"])})
+
+ wrapper["requirements"] = [{"class": "SubworkflowFeatureRequirement"}]
+
+ if main.get("requirements"):
+ wrapper["requirements"].extend(main["requirements"])
+ if main.get("hints"):
+ wrapper["hints"] = main["hints"]
+
+ doc = {"cwlVersion": "v1.2", "$graph": [wrapper]}
+
+ if git_info:
+ for g in git_info:
+ doc[g] = git_info[g]
+
+ update_refs(wrapper, main["id"], tool.doc_loader.expand_url, merged_map, False)
+
+ wrappertext = json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+
+ body = {
+ "workflow": {
+ "name": name,
+ "description": tool.tool.get("doc", ""),
+ "definition": wrappertext
+ }}
+ if project_uuid:
+ body["workflow"]["owner_uuid"] = project_uuid
+
+ if uuid:
+ call = arvRunner.api.workflows().update(uuid=uuid, body=body)
+ else:
+ call = arvRunner.api.workflows().create(body=body)
+ return call.execute(num_retries=arvRunner.num_retries)["uuid"]
def upload_workflow(arvRunner, tool, job_order, project_uuid,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 7364df334..dc6d0df3f 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -447,6 +447,8 @@ def upload_dependencies(arvrunner, name, document_loader,
single_collection=True,
optional_deps=optional_deps)
+ print("MMM", mapper._pathmap)
+
keeprefs = set()
def addkeepref(k):
if k.startswith("keep:"):
@@ -489,9 +491,9 @@ def upload_dependencies(arvrunner, name, document_loader,
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
- with Perf(metrics, "setloc"):
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ #with Perf(metrics, "setloc"):
+ # visit_class(workflowobj, ("File", "Directory"), setloc)
+ # visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
@@ -515,6 +517,8 @@ def upload_dependencies(arvrunner, name, document_loader,
logger.warning("Cannot find collection with portable data hash %s", kr)
continue
col = col["items"][0]
+ col["name"] = arvados.util.trim_name(col["name"])
+ print("CCC name", col["name"])
try:
arvrunner.api.collections().create(body={"collection": {
"owner_uuid": runtimeContext.project_uuid,
@@ -527,7 +531,7 @@ def upload_dependencies(arvrunner, name, document_loader,
"trash_at": col["trash_at"]
}}, ensure_unique_name=True).execute()
except Exception as e:
- logger.warning("Unable copy collection to destination: %s", e)
+ logger.warning("Unable to copy collection to destination: %s", e)
if "$schemas" in workflowobj:
sch = CommentedSeq()
@@ -683,7 +687,7 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- # testing only
+ # commented out for testing only, uncomment me
#with Perf(metrics, "upload_docker"):
# upload_docker(arvrunner, tool, runtimeContext)
@@ -715,6 +719,8 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles,
cache=tool_dep_cache)
+
+ print("PM", pm.items())
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 96d12f758..ae9f6d0d4 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220913185150',
- 'schema-salad==8.3.20220913105718',
+ 'cwltool==3.1.20221224142944',
+ 'schema-salad>8.3.20220913105718',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index c383d529e..a4b7e64a0 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -500,3 +500,20 @@ def get_vocabulary_once(svc):
if not hasattr(svc, '_cached_vocabulary'):
svc._cached_vocabulary = svc.vocabularies().get().execute()
return svc._cached_vocabulary
+
+def trim_name(collectionname):
+ """
+ trim_name takes a record name (collection name, project name, etc)
+ and trims it to fit the 255 character name limit, with additional
+ space for the timestamp added by ensure_unique_name, by removing
+ excess characters from the middle and inserting an ellipsis
+ """
+
+ max_name_len = 254 - 28
+
+ if len(collectionname) > max_name_len:
+ over = len(collectionname) - max_name_len
+ split = int(max_name_len/2)
+ collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
+
+ return collectionname
commit 62e7af59cbad5577423b844213be7b2f59709602
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Jan 6 17:21:07 2023 -0500
19385: Messy work in progress for uploading workflows to collections
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index cf0b3b9da..ef8398df3 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -121,7 +121,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
if not out_of_project_images:
# Fetch Docker image if necessary.
try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+ dockerjob = cwltool.docker.DockerCommandLineJob(None, None, None, None, None, None)
+ result = dockerjob.get_image(dockerRequirement, pull_image,
force_pull, tmp_outdir_prefix)
if not result:
raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 56226388d..f66e50dca 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -9,6 +9,14 @@ import os
import json
import copy
import logging
+import urllib
+from io import StringIO
+import sys
+
+from typing import (MutableSequence, MutableMapping)
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver
@@ -22,6 +30,8 @@ from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext
+from schema_salad.ref_resolver import file_uri, uri_file_path
+
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
@@ -117,6 +127,133 @@ def make_wrapper_workflow(arvRunner, main, packed, project_uuid, name, git_info,
return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+def rel_ref(s, baseuri, urlexpander, merged_map):
+ uri = urlexpander(s, baseuri)
+ if baseuri in merged_map:
+ replacements = merged_map[baseuri].resolved
+ if uri in replacements:
+ return replacements[uri]
+
+ p1 = os.path.dirname(uri_file_path(baseuri))
+ p2 = os.path.dirname(uri_file_path(uri))
+ p3 = os.path.basename(uri_file_path(uri))
+ r = os.path.relpath(p2, p1)
+ if r == ".":
+ r = ""
+ print("AAA", uri, s)
+ print("BBBB", p1, p2, p3, r)
+ return os.path.join(r, p3)
+
+
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
+ if isinstance(d, CommentedSeq):
+ if set_block_style:
+ d.fa.set_block_style()
+ for s in d:
+ update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+ elif isinstance(d, CommentedMap):
+ if set_block_style:
+ d.fa.set_block_style()
+
+ if "id" in d:
+ baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+
+ for s in d:
+ for field in ("$include", "$import", "location", "run"):
+ if field in d and isinstance(d[field], str):
+ d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+
+ if "$schemas" in d:
+ for n, s in enumerate(d["$schemas"]):
+ d["$schemas"][n] = rel_ref(d["$schemas"][n], baseuri, urlexpander, merged_map)
+
+ update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+
+def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None,
+ git_info=None):
+
+ firstfile = None
+ workflow_files = set()
+ import_files = set()
+ include_files = set()
+
+ for w in tool.doc_loader.idx:
+ if w.startswith("file://"):
+ workflow_files.add(urllib.parse.urldefrag(w)[0])
+ if firstfile is None:
+ firstfile = urllib.parse.urldefrag(w)[0]
+ if w.startswith("import:file://"):
+ import_files.add(urllib.parse.urldefrag(w[7:])[0])
+ if w.startswith("include:file://"):
+ include_files.add(urllib.parse.urldefrag(w[8:])[0])
+
+ all_files = workflow_files | import_files | include_files
+
+ n = 7
+ allmatch = True
+ while allmatch:
+ n += 1
+ for f in all_files:
+ if len(f)-1 < n:
+ n -= 1
+ allmatch = False
+ break
+ if f[n] != firstfile[n]:
+ allmatch = False
+ break
+
+ while firstfile[n] != "/":
+ n -= 1
+
+ prefix = firstfile[:n+1]
+
+ col = arvados.collection.Collection()
+
+ #print(merged_map.keys())
+
+ for w in workflow_files | import_files:
+ # 1. load YAML
+
+ text = tool.doc_loader.fetch_text(w)
+ if isinstance(text, bytes):
+ textIO = StringIO(text.decode('utf-8'))
+ else:
+ textIO = StringIO(text)
+
+ yamlloader = schema_salad.utils.yaml_no_ts()
+ result = yamlloader.load(textIO)
+
+ set_block_style = False
+ if result.fa.flow_style():
+ set_block_style = True
+
+ # 2. find $import, $include, $schema, run, location
+ # 3. update field value
+ update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+
+ with col.open(w[n+1:], "wt") as f:
+ yamlloader.dump(result, stream=f)
+
+ for w in include_files:
+ with col.open(w[n+1:], "wb") as f1:
+ with open(uri_file_path(w), "rb") as f2:
+ dat = f2.read(65536)
+ while dat:
+ f1.write(dat)
+ dat = f2.read(65536)
+
+ toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
+ toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
+
+ col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
+
+ return col.manifest_locator()
+
+
def upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
@@ -139,7 +276,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid,
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False,
+ packed, tool.tool["id"],
runtimeContext)
wf_runner_resources = None
@@ -286,7 +423,6 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False,
runtimeContext)
if self.wf_pdh is None:
@@ -330,7 +466,6 @@ class ArvadosWorkflow(Workflow):
self.doc_loader,
packed,
self.tool["id"],
- False,
runtimeContext)
# Discover files/directories referenced by the
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 382c1643e..1e6344f5e 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -36,7 +36,7 @@ import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow, new_upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
@@ -674,16 +674,16 @@ The 'jobs' API is no longer supported.
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
loadingContext.disable_js_validation = True
- if submitting and not self.fast_submit:
- loadingContext.do_update = False
- # Document may have been auto-updated. Reload the original
- # document with updating disabled because we want to
- # submit the document with its original CWL version, not
- # the auto-updated one.
- with Perf(metrics, "load_tool original"):
- tool = load_tool(updated_tool.tool["id"], loadingContext)
- else:
- tool = updated_tool
+ # if submitting and not self.fast_submit:
+ # loadingContext.do_update = False
+ # # Document may have been auto-updated. Reload the original
+ # # document with updating disabled because we want to
+ # # submit the document with its original CWL version, not
+ # # the auto-updated one.
+ # with Perf(metrics, "load_tool original"):
+ # tool = load_tool(updated_tool.tool["id"], loadingContext)
+ # else:
+ tool = updated_tool
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
@@ -702,13 +702,13 @@ The 'jobs' API is no longer supported.
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
loadingContext.skip_resolve_all = True
- with Perf(metrics, "load_tool"):
- tool = load_tool(tool.tool, loadingContext)
+ #with Perf(metrics, "load_tool"):
+ # tool = load_tool(tool.tool, loadingContext)
if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
- uuid = upload_workflow(self, tool, job_order,
+ uuid = new_upload_workflow(self, tool, job_order,
runtimeContext.project_uuid,
runtimeContext,
uuid=runtimeContext.update_workflow,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 25c7eaf6c..7364df334 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -295,7 +295,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run, runtimeContext,
+ workflowobj, uri, runtimeContext,
include_primary=True, discovered_secondaryfiles=None,
cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
@@ -303,64 +303,27 @@ def upload_dependencies(arvrunner, name, document_loader,
Returns a pathmapper object mapping local paths to keep references. Also
does an in-place update of references in "workflowobj".
- Use scandeps to find $import, $include, $schemas, run, File and Directory
+ Use scandeps to find $schemas, File and Directory
fields that represent external references.
If workflowobj has an "id" field, this will reload the document to ensure
it is scanning the raw document prior to preprocessing.
"""
- loaded = set()
- def loadref(b, u):
- joined = document_loader.fetcher.urljoin(b, u)
- defrg, _ = urllib.parse.urldefrag(joined)
- if defrg not in loaded:
- loaded.add(defrg)
- if cache is not None and defrg in cache:
- return cache[defrg]
- # Use fetch_text to get raw file (before preprocessing).
- text = document_loader.fetch_text(defrg)
- if isinstance(text, bytes):
- textIO = StringIO(text.decode('utf-8'))
- else:
- textIO = StringIO(text)
- yamlloader = YAML(typ='safe', pure=True)
- result = yamlloader.load(textIO)
- if cache is not None:
- cache[defrg] = result
- return result
- else:
- return {}
-
- if loadref_run:
- loadref_fields = set(("$import", "run"))
- else:
- loadref_fields = set(("$import",))
-
scanobj = workflowobj
- if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
- if cache is not None and defrg not in cache:
- # if we haven't seen this file before, want raw file
- # content (before preprocessing) to ensure that external
- # references like $include haven't already been inlined.
- scanobj = loadref("", workflowobj["id"])
-
metadata = scanobj
- with Perf(metrics, "scandeps include, location"):
+ with Perf(metrics, "scandeps"):
sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
+ set(),
+ set(("location",)),
+ None, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
-
- with Perf(metrics, "scandeps $schemas"):
optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ set(),
+ set(("$schemas",)),
+ None, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
if sc_result is None:
sc_result = []
@@ -703,7 +666,6 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
jobloader,
job_order,
job_order.get("id", "#"),
- False,
runtimeContext)
if "id" in job_order:
@@ -721,8 +683,9 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- with Perf(metrics, "upload_docker"):
- upload_docker(arvrunner, tool, runtimeContext)
+ # testing only
+ #with Perf(metrics, "upload_docker"):
+ # upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
@@ -748,7 +711,6 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
document_loader,
deptool,
deptool["id"],
- False,
runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles,
commit 9e1828a4263d7efc8fe7149c8c63ce0477551e8c
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed Jan 4 10:31:04 2023 -0500
19425: Fix rebase error
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c8a9f23e..30e0092e9 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -622,7 +622,7 @@ class RunnerContainer(Runner):
if runtimeContext.prefer_cached_downloads:
command.append("--prefer-cached-downloads")
- if runtimeContext.fast_parser:
+ if self.fast_parser:
command.append("--fast-parser")
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
commit 28b5ebc08c632a7d947883910b565ffdc6f85b68
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Sep 13 16:50:09 2022 -0400
19425: Bump to released cwltool & schema-salad
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index c4255b351..96d12f758 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220910163051',
- 'schema-salad==8.3.20220825114525',
+ 'cwltool==3.1.20220913185150',
+ 'schema-salad==8.3.20220913105718',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
commit 472742c1b4c2abd62325471094f25bbbb4770ede
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Sun Sep 11 23:05:37 2022 -0400
19425: Set loadingContext.skip_resolve_all
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 356050e89..382c1643e 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -701,7 +701,7 @@ The 'jobs' API is no longer supported.
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
- loadingContext.codegen_idx = {}
+ loadingContext.skip_resolve_all = True
with Perf(metrics, "load_tool"):
tool = load_tool(tool.tool, loadingContext)
commit fec959159de7bb7bed944bd15868e11be0ecdcfa
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Sun Sep 11 22:07:50 2022 -0400
19425: Clear codegen_idx before reloading
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 447c14b8b..356050e89 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -697,10 +697,11 @@ The 'jobs' API is no longer supported.
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
# updated by upload_workflow_deps in ways that modify
- # inheritance of hints or requirements.
+ # hints or requirements.
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
+ loadingContext.codegen_idx = {}
with Perf(metrics, "load_tool"):
tool = load_tool(tool.tool, loadingContext)
commit 58287d8af844ed344736db29590b2b06c58c97c8
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Sat Sep 10 12:29:51 2022 -0400
19425: Fix passing --fast-parser
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 486103919..25c7eaf6c 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -843,6 +843,7 @@ class Runner(Process):
self.secret_store = secret_store
self.enable_dev = loadingContext.enable_dev
self.git_info = git_info
+ self.fast_parser = loadingContext.fast_parser
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index e4f4f0f34..c4255b351 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,7 +36,7 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220909193950',
+ 'cwltool==3.1.20220910163051',
'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
commit 0920e90f2454f47f1e4cf97ebbceb69e8c443e59
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Sep 9 17:09:55 2022 -0400
19425: cwltool dev
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 0015aad7f..e4f4f0f34 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,7 +36,7 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220909192237',
+ 'cwltool==3.1.20220909193950',
'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
commit 0eee268be6c2d47f310f49bf74bd356aaf5ad163
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Sep 9 15:26:43 2022 -0400
19425: cwltool dev version bump
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index a15c085da..4c8a9f23e 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -622,6 +622,9 @@ class RunnerContainer(Runner):
if runtimeContext.prefer_cached_downloads:
command.append("--prefer-cached-downloads")
+ if runtimeContext.fast_parser:
+ command.append("--fast-parser")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 8d1cbd237..0015aad7f 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,7 +36,7 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220909180746',
+ 'cwltool==3.1.20220909192237',
'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
commit 889e9fa0fe0faf311770d6bf7639432853544d79
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Sep 9 14:16:44 2022 -0400
19425: Add experimental --fast-parser
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7818ac84f..20eed989c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -212,6 +212,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
action="store_true", default=False,
help=argparse.SUPPRESS)
+ parser.add_argument("--fast-parser", dest="fast_parser",
+ action="store_true", default=False,
+ help=argparse.SUPPRESS)
+
parser.add_argument("--thread-count", type=int,
default=0, help="Number of threads to use for job submit and output collection.")
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index e1a5077fb..8d1cbd237 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220907141119',
- 'schema-salad==8.3.20220913105718',
+ 'cwltool==3.1.20220909180746',
+ 'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list