[ARVADOS] created: 5e701cdce50d617a40c141a21cd31cfa5d209f66
Git user
git at public.curoverse.com
Fri Jan 20 10:02:04 EST 2017
at 5e701cdce50d617a40c141a21cd31cfa5d209f66 (commit)
commit 5e701cdce50d617a40c141a21cd31cfa5d209f66
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jan 19 13:29:11 2017 -0500
10812: Update cwltool and schema-salad dependencies.
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 55b0485..c3a5bcd 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -48,8 +48,8 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170118141124',
- 'schema-salad==2.2.20170111180227',
+ 'cwltool==1.0.20170119234115',
+ 'schema-salad==2.2.20170119151016',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170112173420',
'setuptools'
commit 64e215fa5f76bf9f4e95d281ae916d2bbd85fba5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 17:46:54 2017 -0500
10812: Handle $schemas references.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 437e925..53a4a6c 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -98,6 +98,10 @@ def upload_dependencies(arvrunner, name, document_loader,
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
+ if "$schemas" in workflowobj:
+ for s in workflowobj["$schemas"]:
+ sc.append({"class": "File", "location": s})
+
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"keep:%s/%s",
@@ -109,6 +113,12 @@ def upload_dependencies(arvrunner, name, document_loader,
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
+ if "$schemas" in workflowobj:
+ sch = []
+ for s in workflowobj["$schemas"]:
+ sch.append(mapper.mapper(s).resolved)
+ workflowobj["$schemas"] = sch
+
return mapper
commit cea2b003e4a5f4c25234cca6a1a86d8e84d267f0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 11:47:07 2017 -0500
10812: Use StringIO instead of cStringIO, which doesn't support unicode strings.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5d2a2d9..437e925 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -6,7 +6,7 @@ import json
import re
import subprocess
-from cStringIO import StringIO
+from StringIO import StringIO
from schema_salad.sourceline import SourceLine
commit 1fe053a25f53372644128839ee70c370622c6707
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:51:10 2017 -0500
10812: Check for dockerOutputDirectory and raise UnsupportedRequirement up front.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index a5fa4e0..a0c9d57 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -211,6 +211,11 @@ class ArvCwlRunner(object):
raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
if obj.get("stderr"):
raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+ if obj.get("class") == "DockerRequirement":
+ if obj.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 1b52fb5..8b3db8f 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -180,7 +180,7 @@ def stubs(func):
'kind': 'collection'
},
'/var/lib/cwl/workflow.json': {
- 'json': expect_packed_workflow,
+ 'content': expect_packed_workflow,
'kind': 'json'
},
'stdout': {
@@ -680,7 +680,7 @@ class TestSubmit(unittest.TestCase):
},
'/var/lib/cwl/workflow.json': {
'kind': 'json',
- 'json': {
+ 'content': {
'cwlVersion': 'v1.0',
'$graph': [
{
@@ -1080,7 +1080,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit 7d68c26a0ecc58e87f923369880f39d1434163bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:22:30 2017 -0500
10812: Fix imports; reference #main in cwl:tool.
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f68d99e..bf1052c 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,13 +4,15 @@ import copy
import json
import time
-from cwltool.process import get_feature, shortname
+from cwltool.process import get_feature, shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustDirObjs
+from schema_salad.sourceline import SourceLine
+
import ruamel.yaml as yaml
import arvados.collection
@@ -271,7 +273,7 @@ class RunnerJob(Runner):
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
- self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
+ self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
commit 01e67e3cd5a99e3c09a42543a0f9ff4b589dc66f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 16:33:11 2017 -0500
10812: Improve the check for an existing packed workflow collection with the
same PDH and a similar name. Don't crash when registering a pipeline template
that has array inputs.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 66ac000..a5fa4e0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 51e5956..6015c1d 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -236,7 +236,7 @@ class RunnerContainer(Runner):
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "json": packed
+ "content": packed
}
if self.tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index b7a6eb0..f68d99e 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -241,9 +241,11 @@ class RunnerJob(Runner):
with collection.open("workflow.cwl", "w") as f:
f.write(yaml.round_trip_dump(packed))
- exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]
+ if self.arvrunner.project_uuid:
+ filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
+ exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
@@ -385,10 +387,12 @@ class RunnerTemplate(object):
if not isinstance(types, list):
types = [types]
param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
+ non_null_types = [t for t in types if t != "null"]
if len(non_null_types) == 1:
the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
+ dataclass = None
+ if isinstance(the_type, basestring):
+ dataclass = self.type_to_dataclass.get(the_type)
if dataclass:
param['dataclass'] = dataclass
# Note: If we didn't figure out a single appropriate
commit 9300aec2ac1790a092823f412c9c577770fda670
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 15:51:54 2017 -0500
10812: Don't try to upload dependencies of embedded tools separately unless
they have an identifier.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5306e90..5d2a2d9 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -184,14 +184,15 @@ def upload_workflow_deps(arvrunner, tool):
document_loader = tool.doc_loader
def upload_tool_deps(deptool):
- upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- include_primary=False)
- document_loader.idx[deptool["id"]] = deptool
+ if "id" in deptool:
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
tool.visit(upload_tool_deps)
commit 75d3a577ab554b3ac22b5f6ccda9bbb8fd984eb7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 08:41:08 2017 -0500
10812: Handle workflow keep references.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 9b38c38..51e5956 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,8 +190,6 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- packed = packed_workflow(self.arvrunner, self.tool)
-
adjustDirObjs(self.job_order, trim_listing)
container_req = {
@@ -224,13 +222,24 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowpath = "/var/lib/cwl/workflow.json#main"
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": packed
- }
- if self.tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+ if self.tool.tool.get("id", "").startswith("keep:"):
+ sp = self.tool.tool["id"].split('/')
+ workflowcollection = sp[0][5:]
+ workflowname = "/".join(sp[1:])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ container_req["mounts"]["/var/lib/cwl/workflow"] = {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ }
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 025225a..b7a6eb0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -265,7 +265,7 @@ class RunnerJob(Runner):
"""
if self.tool.tool["id"].startswith("keep:"):
- pass
+ self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 24e89d1..1b52fb5 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -238,7 +238,7 @@ class TestSubmit(unittest.TestCase):
def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug",
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
@@ -272,14 +272,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_no_reuse(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--disable-reuse",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -290,14 +290,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_on_error(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--on-error=stop",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -314,9 +314,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -345,9 +345,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -364,9 +364,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["name"] = "hello job 123"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["name"] = "hello job 123"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
@@ -384,9 +384,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -456,11 +456,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -479,11 +479,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -503,12 +503,12 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- stubs.expect_container_spec["output_name"] = output_name
+ expect_container["output_name"] = output_name
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -528,11 +528,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -550,9 +550,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["runtime_constraints"]["ram"] = 2048*1024*1024
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -628,6 +628,31 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_jobs_keepref(self, stubs, tm, reader):
+ capture_stdout = cStringIO.StringIO()
+
+ with open("tests/wf/expect_arvworkflow.cwl") as f:
+ reader().open().__enter__().read.return_value = f.read()
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
+ "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["x"] = "XxX"
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["y"]
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["z"]
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["cwl:tool"] = "99999999999999999999999999999994+99/expect_arvworkflow.cwl#main"
+ expect_pipeline["name"] = "expect_arvworkflow.cwl#main"
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=JsonDiffMatcher(expect_pipeline))
+
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
@@ -733,9 +758,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["name"] = "hello container 123"
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["name"] = "hello container 123"
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -1055,8 +1080,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '99999999999999999999999999999991+99/'
- 'wf/inputs_test.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit 32f230728dcef7a94ab94c8e2eba4b944ff06a9a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 10:57:28 2017 -0500
10812: Use packed workflows for all run modes.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3ffc4c7..66ac000 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
@@ -177,7 +177,7 @@ class ArvCwlRunner(object):
for p in proc_states["items"]:
self.on_message({
- "object_uuid": p["uuid"],
+ "object_uuid": p["uuid"],
"event_type": "update",
"properties": {
"new_attributes": p
@@ -334,23 +334,44 @@ class ArvCwlRunner(object):
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
+ if not kwargs.get("name"):
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ makeTool=self.arv_make_tool,
+ loader=tool.doc_loader,
+ avsc_names=tool.doc_schema,
+ metadata=tool.metadata)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ tool, job_order)
+
existing_uuid = kwargs.get("update_workflow")
if existing_uuid or kwargs.get("create_workflow"):
+ # Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs["name"])
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
- else:
+ elif self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name")), "success")
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"]),
+ "success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
@@ -360,9 +381,6 @@ class ArvCwlRunner(object):
kwargs["tmpdir_prefix"] = "tmp"
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
- if not kwargs["name"]:
- del kwargs["name"]
-
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
@@ -373,26 +391,35 @@ class ArvCwlRunner(object):
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
runnerjob = None
if kwargs.get("submit"):
+ # Submit a runner job to run the workflow for us.
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool":
kwargs["runnerjob"] = tool.tool["id"]
+ upload_dependencies(self,
+ kwargs["name"],
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
- else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"], on_error=kwargs.get("on_error"))
+ elif self.work_api == "jobs":
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"], on_error=kwargs.get("on_error"))
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 987b0d6..9b38c38 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -6,14 +6,14 @@ import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles
+from cwltool.pathmapper import adjustFiles, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
@@ -190,7 +190,9 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = packed_workflow(self.arvrunner, self.tool)
+
+ adjustDirObjs(self.job_order, trim_listing)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +224,13 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- if workflowcollection.startswith("keep:"):
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- container_req["mounts"]["/var/lib/cwl/workflow"] = {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- }
- elif workflowcollection.startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- wfuuid = workflowcollection[6:workflowcollection.index("#")]
- wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
- wfobj = yaml.safe_load(wfrecord["definition"])
- if container_req["name"].startswith("arvwf:"):
- container_req["name"] = wfrecord["name"]
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": wfobj
- }
- container_req["properties"]["template_uuid"] = wfuuid
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 11ef653..025225a 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -9,11 +9,14 @@ from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
+
+import ruamel.yaml as yaml
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
@@ -231,6 +234,28 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+ def upload_workflow_collection(self, packed):
+ collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+ f.write(yaml.round_trip_dump(packed))
+
+ exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=self.name,
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=self.arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
@@ -239,9 +264,14 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ if self.tool.tool["id"].startswith("keep:"):
+ pass
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ wf_pdh = self.upload_workflow_collection(packed)
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+ adjustDirObjs(self.job_order, trim_listing)
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 9e70a6e..cd7e41a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import ruamel.yaml as yaml
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
from .arvtool import ArvadosCommandTool
from .perf import Perf
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None):
- upload_docker(arvRunner, tool)
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
+ packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
@@ -39,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
if not name:
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
- upload_dependencies(arvRunner, name, document_loader,
- packed, uri, False)
+ upload_dependencies(arvRunner, name, tool.doc_loader,
+ packed, tool.tool["id"], False)
# TODO nowhere for submit_runner_ram to go.
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 500ea0f..b249da7 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -139,7 +139,9 @@ class CollectionFetcher(DefaultFetcher):
with self.fsaccess.open(url, "r") as f:
return f.read()
if url.startswith("arvwf:"):
- return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+ record = self.api_client.workflows().get(uuid=url[6:]).execute()
+ definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
+ return definition
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 1c3625e..5306e90 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -4,6 +4,8 @@ from functools import partial
import logging
import json
import re
+import subprocess
+
from cStringIO import StringIO
from schema_salad.sourceline import SourceLine
@@ -16,6 +18,7 @@ from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
+from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
@@ -45,7 +48,7 @@ def trim_listing(obj):
del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run):
+ workflowobj, uri, loadref_run, include_primary=True):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
@@ -92,7 +95,7 @@ def upload_dependencies(arvrunner, name, document_loader,
normalizeFilesDirs(sc)
- if "id" in workflowobj:
+ if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
mapper = ArvPathMapper(arvrunner, sc, "",
@@ -110,6 +113,7 @@ def upload_dependencies(arvrunner, name, document_loader,
def upload_docker(arvrunner, tool):
+ """Visitor which uploads Docker images referenced in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
@@ -118,45 +122,82 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
- upload_docker(arvrunner, tool)
-
- for t in tool.tool["inputs"]:
- def setSecondary(fileobj):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
-
- if isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(e)
-
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(job_order[shortname(t["id"])])
-
- workflowmapper = upload_dependencies(arvrunner,
- name,
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
- True)
- jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
- tool.doc_loader,
- job_order,
- job_order.get("id", "#"),
- False)
-
- if "id" in job_order:
- del job_order["id"]
-
- return workflowmapper
+
+def packed_workflow(arvrunner, tool):
+ """Create a packed workflow.
+
+ A "packed" workflow is one where all the components have been combined into a single document."""
+
+ return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+ tool.tool["id"], tool.metadata)
+
+def tag_git_version(packed):
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ except (OSError, subprocess.CalledProcessError):
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
+
+
+def upload_job_order(arvrunner, name, tool, job_order):
+ """Upload local files referenced in the input object and return updated input
+ object with 'location' updated to the proper keep references.
+ """
+
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
+ jobmapper = upload_dependencies(arvrunner,
+ name,
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ # Need to filter this out, gets added by cwltool when providing
+ # parameters on the command line.
+ if "job_order" in job_order:
+ del job_order["job_order"]
+
+ return job_order
+
+def upload_workflow_deps(arvrunner, tool):
+ # Ensure that Docker images needed by this workflow are available
+ tool.visit(partial(upload_docker, arvrunner))
+
+ document_loader = tool.doc_loader
+
+ def upload_tool_deps(deptool):
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
+
+ tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner):
+ """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+
img = "arvados/jobs:"+__version__
try:
arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
@@ -165,6 +206,9 @@ def arvados_jobs_image(arvrunner):
return img
class Runner(object):
+ """Base class for runner processes, which submit an instance of
+ arvados-cwl-runner and wait for the final result."""
+
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None):
@@ -191,20 +235,9 @@ class Runner(object):
def update_pipeline_component(self, record):
pass
- def arvados_job_spec(self, *args, **kwargs):
- if self.name is None:
- self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
-
- # Need to filter this out, gets added by cwltool when providing
- # parameters on the command line.
- if "job_order" in self.job_order:
- del self.job_order["job_order"]
-
- workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
- adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
-
def done(self, record):
+ """Base method for handling a completed runner."""
+
try:
if record["state"] == "Complete":
if record.get("exit_code") is not None:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 0a01fb4..24e89d1 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -17,6 +17,8 @@ import arvados.keep
from .matcher import JsonDiffMatcher
from .mock_discovery import get_rootDesc
+import ruamel.yaml as yaml
+
_rootDesc = None
def stubs(func):
@@ -104,7 +106,7 @@ def stubs(func):
'script_parameters': {
'x': {
'basename': 'blorp.txt',
- 'location': 'keep:99999999999999999999999999999994+99/blorp.txt',
+ 'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
'class': 'File'
},
'y': {
@@ -122,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -139,12 +141,12 @@ def stubs(func):
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -167,6 +169,9 @@ def stubs(func):
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
stubs.expect_container_spec = {
'priority': 1,
'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
'writable': True,
'kind': 'collection'
},
- '/var/lib/cwl/workflow': {
- 'portable_data_hash': '99999999999999999999999999999991+99',
- 'kind': 'collection'
+ '/var/lib/cwl/workflow.json': {
+ 'json': expect_packed_workflow,
+ 'kind': 'json'
},
'stdout': {
'path': '/var/spool/cwl/cwl.output.json',
@@ -186,7 +191,7 @@ def stubs(func):
'kind': 'json',
'content': {
'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
- 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}
@@ -198,7 +203,7 @@ def stubs(func):
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
@@ -240,32 +245,24 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'replication_desired': None,
- 'name': 'New collection'
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
- body=expect_pipeline)
+ body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
@@ -426,27 +423,19 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'name': 'New collection',
- 'replication_desired': None,
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -469,7 +458,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -492,7 +481,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -516,7 +505,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_spec["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -541,7 +530,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -670,6 +659,20 @@ class TestSubmit(unittest.TestCase):
'cwlVersion': 'v1.0',
'$graph': [
{
+ 'id': '#main',
+ 'inputs': [
+ {'type': 'string', 'id': '#main/x'}
+ ],
+ 'steps': [
+ {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+ 'run': '#submit_tool.cwl',
+ 'id': '#main/step1',
+ 'out': []}
+ ],
+ 'class': 'Workflow',
+ 'outputs': []
+ },
+ {
'inputs': [
{
'inputBinding': {'position': 1},
@@ -683,19 +686,6 @@ class TestSubmit(unittest.TestCase):
'outputs': [],
'baseCommand': 'cat',
'class': 'CommandLineTool'
- }, {
- 'id': '#main',
- 'inputs': [
- {'type': 'string', 'id': '#main/x'}
- ],
- 'steps': [
- {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
- 'run': '#submit_tool.cwl',
- 'id': '#main/step1',
- 'out': []}
- ],
- 'class': 'Workflow',
- 'outputs': []
}
]
}
@@ -747,7 +737,7 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
- body=expect_container)
+ body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
@@ -803,7 +793,7 @@ class TestCreateTemplate(unittest.TestCase):
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999994+99/blorp.txt',
+ 'value': '99999999999999999999999999999992+99/blorp.txt',
}
expect_component['script_parameters']['y'] = {
'dataclass': 'Collection',
@@ -1126,7 +1116,7 @@ class TestTemplateInputs(unittest.TestCase):
expect_template = copy.deepcopy(self.expect_template)
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
+ params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
diff --git a/sdk/cwl/tests/wf/expect_arvworkflow.cwl b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
index 56ce0d5..4a3a037 100644
--- a/sdk/cwl/tests/wf/expect_arvworkflow.cwl
+++ b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
@@ -1,14 +1,5 @@
+cwlVersion: v1.0
$graph:
-- baseCommand: cat
- class: CommandLineTool
- id: '#submit_tool.cwl'
- inputs:
- - id: '#submit_tool.cwl/x'
- inputBinding: {position: 1}
- type: string
- outputs: []
- requirements:
- - {class: DockerRequirement, dockerPull: 'debian:8'}
- class: Workflow
id: '#main'
inputs:
@@ -21,4 +12,13 @@ $graph:
- {id: '#main/step1/x', source: '#main/x'}
out: []
run: '#submit_tool.cwl'
-cwlVersion: v1.0
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
diff --git a/sdk/cwl/tests/wf/expect_packed.cwl b/sdk/cwl/tests/wf/expect_packed.cwl
index f4d60db..d7b9d61 100644
--- a/sdk/cwl/tests/wf/expect_packed.cwl
+++ b/sdk/cwl/tests/wf/expect_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
@@ -19,7 +19,7 @@ $graph:
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
+ default: {class: File, location: 'keep:99999999999999999999999999999992+99/blorp.txt',
basename: blorp.txt}
- id: '#main/y'
type: Directory
diff --git a/sdk/cwl/tests/wf/expect_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
similarity index 53%
copy from sdk/cwl/tests/wf/expect_packed.cwl
copy to sdk/cwl/tests/wf/submit_wf_packed.cwl
index f4d60db..1f7fee0 100644
--- a/sdk/cwl/tests/wf/expect_packed.cwl
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
@@ -19,16 +19,10 @@ $graph:
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
- basename: blorp.txt}
- id: '#main/y'
type: Directory
- default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
- basename: 99999999999999999999999999999998+99}
- id: '#main/z'
type: Directory
- default: {class: Directory, basename: anonymous, listing: [{basename: renamed.txt,
- class: File, location: 'keep:99999999999999999999999999999998+99/file1.txt'}]}
outputs: []
steps:
- id: '#main/step1'
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list