[ARVADOS] created: c3e3d4c2a9dbfae4f4025e5b55f272c956180519
Git user
git at public.curoverse.com
Wed Jan 18 10:59:14 EST 2017
at c3e3d4c2a9dbfae4f4025e5b55f272c956180519 (commit)
commit c3e3d4c2a9dbfae4f4025e5b55f272c956180519
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jan 17 16:54:13 2017 -0500
10812: Test fixes
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 1b52fb5..8b3db8f 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -180,7 +180,7 @@ def stubs(func):
'kind': 'collection'
},
'/var/lib/cwl/workflow.json': {
- 'json': expect_packed_workflow,
+ 'content': expect_packed_workflow,
'kind': 'json'
},
'stdout': {
@@ -680,7 +680,7 @@ class TestSubmit(unittest.TestCase):
},
'/var/lib/cwl/workflow.json': {
'kind': 'json',
- 'json': {
+ 'content': {
'cwlVersion': 'v1.0',
'$graph': [
{
@@ -1080,7 +1080,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit a5913ae533e164ee36dfd801f62d4c8879e40ef4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:51:10 2017 -0500
10812: Check for dockerOutputDirectory and raise UnsupportedRequirement up front.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index a5fa4e0..a0c9d57 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -211,6 +211,11 @@ class ArvCwlRunner(object):
raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
if obj.get("stderr"):
raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+ if obj.get("class") == "DockerRequirement":
+ if obj.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
commit 41ca3403d368bb2817144ce6f0f08ccd3fbaaa88
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:22:30 2017 -0500
10812: Fix imports, reference #main in cwl:tool
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f68d99e..bf1052c 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,13 +4,15 @@ import copy
import json
import time
-from cwltool.process import get_feature, shortname
+from cwltool.process import get_feature, shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustDirObjs
+from schema_salad.sourceline import SourceLine
+
import ruamel.yaml as yaml
import arvados.collection
@@ -271,7 +273,7 @@ class RunnerJob(Runner):
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
- self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
+ self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
commit 04cfc72469ff59ddcc6a860708f6f6d735e12ecc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jan 12 17:20:05 2017 -0500
10812: Bugfixes
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 66ac000..a5fa4e0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 51e5956..6015c1d 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -236,7 +236,7 @@ class RunnerContainer(Runner):
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "json": packed
+ "content": packed
}
if self.tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
commit 5e565c5ee9d62050ef0064c2a67aeba92dbdb1e8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jan 12 17:05:23 2017 -0500
10812: Perform merge of logs based on timestamp instead of sorting log lines.
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index 87908c2..15068b8 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -48,21 +48,39 @@ crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
def logtail(logcollection, logger, header, maxlen=25):
- logtail = deque([], maxlen*len(logcollection))
containersapi = ("crunch-run.txt" in logcollection)
+ mergelogs = {}
for log in logcollection.keys():
if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
logname = log[:-4]
+ logt = deque([], maxlen)
+ mergelogs[logname] = logt
with logcollection.open(log) as f:
for l in f:
if containersapi:
g = timestamp_re.match(l)
- logtail.append("%s %s %s" % (g.group(1), logname, g.group(2)))
+ logt.append((g.group(1), g.group(2)))
elif not crunchstat_re.match(l):
- logtail.append(l)
- if len(logcollection) > 1:
- logtail = sorted(logtail)[-maxlen:]
+ logt.append(l)
+
+ if len(mergelogs) > 1:
+ keys = mergelogs.keys()
+ logtail = []
+ while True:
+ earliest = None
+ for k in keys:
+ if mergelogs[k]:
+ if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
+ earliest = k
+ if earliest is None:
+ break
+ ts, msg = mergelogs[earliest].popleft()
+ logtail.append("%s %s %s" % (ts, earliest, msg))
+ logtail = logtail[-maxlen:]
+ else:
+ logtail = mergelogs.values()[0]
+
logtxt = "\n ".join(l.strip() for l in logtail)
logger.info(header)
logger.info("\n %s", logtxt)
commit d7ebd1a4ee9723836ac9ee4d2706958b7d012ed6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 16:33:11 2017 -0500
10812: Improve check that already packed workflow collection exists with same
PDH and similar name. Don't crash when registering pipeline template that has
array inputs.
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index b7a6eb0..f68d99e 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -241,9 +241,11 @@ class RunnerJob(Runner):
with collection.open("workflow.cwl", "w") as f:
f.write(yaml.round_trip_dump(packed))
- exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]
+ if self.arvrunner.project_uuid:
+ filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
+ exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
@@ -385,10 +387,12 @@ class RunnerTemplate(object):
if not isinstance(types, list):
types = [types]
param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
+ non_null_types = [t for t in types if t != "null"]
if len(non_null_types) == 1:
the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
+ dataclass = None
+ if isinstance(the_type, basestring):
+ dataclass = self.type_to_dataclass.get(the_type)
if dataclass:
param['dataclass'] = dataclass
# Note: If we didn't figure out a single appropriate
commit a1f5efeeaaaf11960909354cf16c8cf3a60a80ff
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 15:51:54 2017 -0500
10812: Don't try to upload embedded tools separately unless they have an
identifier.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5306e90..5d2a2d9 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -184,14 +184,15 @@ def upload_workflow_deps(arvrunner, tool):
document_loader = tool.doc_loader
def upload_tool_deps(deptool):
- upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- include_primary=False)
- document_loader.idx[deptool["id"]] = deptool
+ if "id" in deptool:
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
tool.visit(upload_tool_deps)
commit 4cf19ce7508533b77632733a6ae22e5bf5d01f93
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 08:41:08 2017 -0500
10812: Handle workflow keep references.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 9b38c38..51e5956 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,8 +190,6 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- packed = packed_workflow(self.arvrunner, self.tool)
-
adjustDirObjs(self.job_order, trim_listing)
container_req = {
@@ -224,13 +222,24 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowpath = "/var/lib/cwl/workflow.json#main"
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": packed
- }
- if self.tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+ if self.tool.tool.get("id", "").startswith("keep:"):
+ sp = self.tool.tool["id"].split('/')
+ workflowcollection = sp[0][5:]
+ workflowname = "/".join(sp[1:])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ container_req["mounts"]["/var/lib/cwl/workflow"] = {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ }
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 025225a..b7a6eb0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -265,7 +265,7 @@ class RunnerJob(Runner):
"""
if self.tool.tool["id"].startswith("keep:"):
- pass
+ self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 24e89d1..1b52fb5 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -238,7 +238,7 @@ class TestSubmit(unittest.TestCase):
def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug",
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
@@ -272,14 +272,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_no_reuse(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--disable-reuse",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -290,14 +290,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_on_error(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--on-error=stop",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -314,9 +314,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -345,9 +345,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -364,9 +364,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["name"] = "hello job 123"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["name"] = "hello job 123"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
@@ -384,9 +384,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -456,11 +456,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -479,11 +479,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -503,12 +503,12 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- stubs.expect_container_spec["output_name"] = output_name
+ expect_container["output_name"] = output_name
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -528,11 +528,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -550,9 +550,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["runtime_constraints"]["ram"] = 2048*1024*1024
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -628,6 +628,31 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_jobs_keepref(self, stubs, tm, reader):
+ capture_stdout = cStringIO.StringIO()
+
+ with open("tests/wf/expect_arvworkflow.cwl") as f:
+ reader().open().__enter__().read.return_value = f.read()
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
+ "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["x"] = "XxX"
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["y"]
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["z"]
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["cwl:tool"] = "99999999999999999999999999999994+99/expect_arvworkflow.cwl#main"
+ expect_pipeline["name"] = "expect_arvworkflow.cwl#main"
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=JsonDiffMatcher(expect_pipeline))
+
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
@@ -733,9 +758,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["name"] = "hello container 123"
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["name"] = "hello container 123"
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -1055,8 +1080,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '99999999999999999999999999999991+99/'
- 'wf/inputs_test.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit af8b9895d81c93a9340ef9d8f461034e5efbc791
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jan 10 16:44:58 2017 -0500
10812: More WIP
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b32cb6d..66ac000 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
@@ -177,7 +177,7 @@ class ArvCwlRunner(object):
for p in proc_states["items"]:
self.on_message({
- "object_uuid": p["uuid"],
+ "object_uuid": p["uuid"],
"event_type": "update",
"properties": {
"new_attributes": p
@@ -335,29 +335,43 @@ class ArvCwlRunner(object):
self.fs_access = make_fs_access(kwargs["basedir"])
if not kwargs.get("name"):
- kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(self.tool.tool["id"])
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
- tool.visit(partial(upload_docker, self))
- upload_job_order(self, "%s input" % kwargs.get("name", shortname(tool.tool["id"])),
- tool, job_order)
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ makeTool=self.arv_make_tool,
+ loader=tool.doc_loader,
+ avsc_names=tool.doc_schema,
+ metadata=tool.metadata)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ tool, job_order)
existing_uuid = kwargs.get("update_workflow")
if existing_uuid or kwargs.get("create_workflow"):
+ # Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs["name"])
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
- else:
+ elif self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name")), "success")
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"]),
+ "success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
@@ -379,6 +393,7 @@ class ArvCwlRunner(object):
runnerjob = None
if kwargs.get("submit"):
+ # Submit a runner job to run the workflow for us.
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool":
kwargs["runnerjob"] = tool.tool["id"]
@@ -392,16 +407,19 @@ class ArvCwlRunner(object):
self.output_callback,
**kwargs).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs["name"], on_error=kwargs.get("on_error"))
- else:
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+ elif self.work_api == "jobs":
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs["name"], on_error=kwargs.get("on_error"))
-
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ca221d0..9b38c38 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -6,14 +6,14 @@ import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles
+from cwltool.pathmapper import adjustFiles, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
@@ -191,6 +191,7 @@ class RunnerContainer(Runner):
"""
packed = packed_workflow(self.arvrunner, self.tool)
+
adjustDirObjs(self.job_order, trim_listing)
container_req = {
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 9bb15d8..025225a 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -9,13 +9,14 @@ from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
import ruamel.yaml as yaml
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index f602ea4..5306e90 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -128,45 +128,24 @@ def packed_workflow(arvrunner, tool):
A "packed" workflow is one where all the components have been combined into a single document."""
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
-
- # if tool.tool["id"].startswith("file://"):
- # path = os.path.dirname(tool.tool["id"][7:])
- # try:
- # githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
- # except (OSError, subprocess.CalledProcessError):
- # pass
- # else:
- # packed["http://schema.org/version"] = githash
-
- mapper = {}
- def upload_tool_deps(deptool):
- workflowmapper = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- include_primary=False)
- for k,v in workflowmapper.items():
- mapper[k] = v.resolved
-
- tool.visit(upload_tool_deps)
-
- def setloc(p):
- if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
- p["location"] = mapper[p["location"]]
- adjustFileObjs(packed, setloc)
- adjustDirObjs(packed, setloc)
+ return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+ tool.tool["id"], tool.metadata)
- #print yaml.round_trip_dump(packed)
+def tag_git_version(packed):
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ except (OSError, subprocess.CalledProcessError):
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
- return packed
def upload_job_order(arvrunner, name, tool, job_order):
- """Upload local files referenced in the input object and update 'location' to
- the proper keep references."""
+ """Upload local files referenced in the input object and return updated input
+ object with 'location' updated to the proper keep references.
+ """
for t in tool.tool["inputs"]:
def setSecondary(fileobj):
@@ -193,11 +172,29 @@ def upload_job_order(arvrunner, name, tool, job_order):
# Need to filter this out, gets added by cwltool when providing
# parameters on the command line.
- if "job_order" in self.job_order:
- del self.job_order["job_order"]
+ if "job_order" in job_order:
+ del job_order["job_order"]
return job_order
+def upload_workflow_deps(arvrunner, tool):
+ # Ensure that Docker images needed by this workflow are available
+ tool.visit(partial(upload_docker, arvrunner))
+
+ document_loader = tool.doc_loader
+
+ def upload_tool_deps(deptool):
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
+
+ tool.visit(upload_tool_deps)
+
def arvados_jobs_image(arvrunner):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 52248dc..24e89d1 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -256,7 +256,7 @@ class TestSubmit(unittest.TestCase):
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
+ 'name': 'submit_wf.cwl input',
}), ensure_unique_name=True),
mock.call().execute()])
@@ -434,7 +434,7 @@ class TestSubmit(unittest.TestCase):
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
+ 'name': 'submit_wf.cwl input',
}), ensure_unique_name=True),
mock.call().execute()])
commit 9b042b8229e2b12d602a98bf8ff40eff84336131
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jan 10 09:06:07 2017 -0500
10812: Dependency uploading refactor WIP.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3ffc4c7..b32cb6d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from. runner import Runner, upload_docker, upload_job_order
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
@@ -334,6 +334,13 @@ class ArvCwlRunner(object):
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
+ if not kwargs.get("name"):
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(self.tool.tool["id"])
+
+ tool.visit(partial(upload_docker, self))
+ upload_job_order(self, "%s input" % kwargs.get("name", shortname(tool.tool["id"])),
+ tool, job_order)
+
existing_uuid = kwargs.get("update_workflow")
if existing_uuid or kwargs.get("create_workflow"):
if self.work_api == "jobs":
@@ -360,9 +367,6 @@ class ArvCwlRunner(object):
kwargs["tmpdir_prefix"] = "tmp"
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
- if not kwargs["name"]:
- del kwargs["name"]
-
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
@@ -373,24 +377,29 @@ class ArvCwlRunner(object):
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
runnerjob = None
if kwargs.get("submit"):
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool":
kwargs["runnerjob"] = tool.tool["id"]
+ upload_dependencies(self,
+ kwargs["name"],
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
else:
runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
+ name=kwargs["name"], on_error=kwargs.get("on_error"))
else:
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"), on_error=kwargs.get("on_error"))
+ name=kwargs["name"], on_error=kwargs.get("on_error"))
+
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 72eed37..ca221d0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,7 +190,8 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = packed_workflow(self.arvrunner, self.tool)
+ adjustDirObjs(self.job_order, trim_listing)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
@@ -227,8 +228,8 @@ class RunnerContainer(Runner):
"kind": "json",
"json": packed
}
- if packed.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = packed["id"][6:]
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 2fccb57..9bb15d8 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -263,10 +263,14 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
- wf_pdh = self.upload_workflow_collection(packed)
+ if self.tool.tool["id"].startswith("keep:"):
+ pass
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ wf_pdh = self.upload_workflow_collection(packed)
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
- self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
+ adjustDirObjs(self.job_order, trim_listing)
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 500ea0f..b249da7 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -139,7 +139,9 @@ class CollectionFetcher(DefaultFetcher):
with self.fsaccess.open(url, "r") as f:
return f.read()
if url.startswith("arvwf:"):
- return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+ record = self.api_client.workflows().get(uuid=url[6:]).execute()
+ definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
+ return definition
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 00e5132..f602ea4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -48,7 +48,7 @@ def trim_listing(obj):
del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run):
+ workflowobj, uri, loadref_run, include_primary=True):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
@@ -95,7 +95,7 @@ def upload_dependencies(arvrunner, name, document_loader,
normalizeFilesDirs(sc)
- if "id" in workflowobj:
+ if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
mapper = ArvPathMapper(arvrunner, sc, "",
@@ -113,6 +113,7 @@ def upload_dependencies(arvrunner, name, document_loader,
def upload_docker(arvrunner, tool):
+ """Visitor which uploads Docker images referenced in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
@@ -123,7 +124,9 @@ def upload_docker(arvrunner, tool):
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
def packed_workflow(arvrunner, tool):
- tool.visit(partial(upload_docker, arvrunner))
+ """Create a packed workflow.
+
+ A "packed" workflow is one where all the components have been combined into a single document."""
document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
packed = pack(document_loader, workflowobj, uri, tool.metadata)
@@ -140,11 +143,12 @@ def packed_workflow(arvrunner, tool):
mapper = {}
def upload_tool_deps(deptool):
workflowmapper = upload_dependencies(arvrunner,
- shortname(deptool["id"]),
+ "%s dependencies" % (shortname(deptool["id"])),
document_loader,
deptool,
deptool["id"],
- False)
+ False,
+ include_primary=False)
for k,v in workflowmapper.items():
mapper[k] = v.resolved
@@ -160,8 +164,9 @@ def packed_workflow(arvrunner, tool):
return packed
-def upload_instance(arvrunner, name, tool, job_order):
- packed = packed_workflow(arvrunner, tool)
+def upload_job_order(arvrunner, name, tool, job_order):
+ """Upload local files referenced in the input object and update 'location' to
+ the proper keep references."""
for t in tool.tool["inputs"]:
def setSecondary(fileobj):
@@ -177,7 +182,7 @@ def upload_instance(arvrunner, name, tool, job_order):
setSecondary(job_order[shortname(t["id"])])
jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
+ name,
tool.doc_loader,
job_order,
job_order.get("id", "#"),
@@ -186,9 +191,16 @@ def upload_instance(arvrunner, name, tool, job_order):
if "id" in job_order:
del job_order["id"]
- return packed
+ # Need to filter this out, gets added by cwltool when providing
+ # parameters on the command line.
+ if "job_order" in self.job_order:
+ del self.job_order["job_order"]
+
+ return job_order
def arvados_jobs_image(arvrunner):
+ """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+
img = "arvados/jobs:"+__version__
try:
arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
@@ -197,6 +209,9 @@ def arvados_jobs_image(arvrunner):
return img
class Runner(object):
+ """Base class for runner processes, which submit an instance of
+ arvados-cwl-runner and wait for the final result."""
+
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None):
@@ -223,20 +238,9 @@ class Runner(object):
def update_pipeline_component(self, record):
pass
- def upload_instance(self, **kwargs):
- if self.name is None:
- self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
-
- # Need to filter this out, gets added by cwltool when providing
- # parameters on the command line.
- if "job_order" in self.job_order:
- del self.job_order["job_order"]
-
- packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
- adjustDirObjs(self.job_order, trim_listing)
- return packed
-
def done(self, record):
+ """Base method for handling a completed runner."""
+
try:
if record["state"] == "Complete":
if record.get("exit_code") is not None:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 7b2a0ec..52248dc 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -106,7 +106,7 @@ def stubs(func):
'script_parameters': {
'x': {
'basename': 'blorp.txt',
- 'location': 'keep:99999999999999999999999999999994+99/blorp.txt',
+ 'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
'class': 'File'
},
'y': {
@@ -141,12 +141,12 @@ def stubs(func):
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -191,7 +191,7 @@ def stubs(func):
'kind': 'json',
'content': {
'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
- 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}
@@ -245,32 +245,24 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'replication_desired': None,
- 'name': 'New collection'
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
'name': '#',
- }, ensure_unique_name=True),
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
- body=expect_pipeline)
+ body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
@@ -431,27 +423,19 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'name': 'New collection',
- 'replication_desired': None,
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
'name': '#',
- }, ensure_unique_name=True),
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -474,7 +458,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -497,7 +481,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -521,7 +505,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_spec["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -546,7 +530,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -675,6 +659,20 @@ class TestSubmit(unittest.TestCase):
'cwlVersion': 'v1.0',
'$graph': [
{
+ 'id': '#main',
+ 'inputs': [
+ {'type': 'string', 'id': '#main/x'}
+ ],
+ 'steps': [
+ {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+ 'run': '#submit_tool.cwl',
+ 'id': '#main/step1',
+ 'out': []}
+ ],
+ 'class': 'Workflow',
+ 'outputs': []
+ },
+ {
'inputs': [
{
'inputBinding': {'position': 1},
@@ -688,19 +686,6 @@ class TestSubmit(unittest.TestCase):
'outputs': [],
'baseCommand': 'cat',
'class': 'CommandLineTool'
- }, {
- 'id': '#main',
- 'inputs': [
- {'type': 'string', 'id': '#main/x'}
- ],
- 'steps': [
- {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
- 'run': '#submit_tool.cwl',
- 'id': '#main/step1',
- 'out': []}
- ],
- 'class': 'Workflow',
- 'outputs': []
}
]
}
@@ -808,7 +793,7 @@ class TestCreateTemplate(unittest.TestCase):
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999994+99/blorp.txt',
+ 'value': '99999999999999999999999999999992+99/blorp.txt',
}
expect_component['script_parameters']['y'] = {
'dataclass': 'Collection',
@@ -1131,7 +1116,7 @@ class TestTemplateInputs(unittest.TestCase):
expect_template = copy.deepcopy(self.expect_template)
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
+ params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
diff --git a/sdk/cwl/tests/wf/expect_arvworkflow.cwl b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
index 56ce0d5..4a3a037 100644
--- a/sdk/cwl/tests/wf/expect_arvworkflow.cwl
+++ b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
@@ -1,14 +1,5 @@
+cwlVersion: v1.0
$graph:
-- baseCommand: cat
- class: CommandLineTool
- id: '#submit_tool.cwl'
- inputs:
- - id: '#submit_tool.cwl/x'
- inputBinding: {position: 1}
- type: string
- outputs: []
- requirements:
- - {class: DockerRequirement, dockerPull: 'debian:8'}
- class: Workflow
id: '#main'
inputs:
@@ -21,4 +12,13 @@ $graph:
- {id: '#main/step1/x', source: '#main/x'}
out: []
run: '#submit_tool.cwl'
-cwlVersion: v1.0
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
diff --git a/sdk/cwl/tests/wf/expect_packed.cwl b/sdk/cwl/tests/wf/expect_packed.cwl
index f4d60db..d7b9d61 100644
--- a/sdk/cwl/tests/wf/expect_packed.cwl
+++ b/sdk/cwl/tests/wf/expect_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
@@ -19,7 +19,7 @@ $graph:
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
+ default: {class: File, location: 'keep:99999999999999999999999999999992+99/blorp.txt',
basename: blorp.txt}
- id: '#main/y'
type: Directory
diff --git a/sdk/cwl/tests/wf/submit_wf_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
index a1ada94..1f7fee0 100644
--- a/sdk/cwl/tests/wf/submit_wf_packed.cwl
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999992+99/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
commit 732b2fe1a673787cb7930f287080bbf37055da36
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 6 16:40:51 2017 -0500
10812: Fixing up tests.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ff72e14..72eed37 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -227,7 +227,7 @@ class RunnerContainer(Runner):
"kind": "json",
"json": packed
}
- if packed["id"].startswith("arvwf:"):
+ if packed.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = packed["id"][6:]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index eee4ef6..2fccb57 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -240,7 +240,7 @@ class RunnerJob(Runner):
with collection.open("workflow.cwl", "w") as f:
f.write(yaml.round_trip_dump(packed))
- exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 01853e6..cd7e41a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -36,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
if not name:
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
- upload_dependencies(arvRunner, name, document_loader,
- packed, uri, False)
+ upload_dependencies(arvRunner, name, tool.doc_loader,
+ packed, tool.tool["id"], False)
# TODO nowhere for submit_runner_ram to go.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d8630d4..00e5132 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -18,6 +18,7 @@ from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
+from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
@@ -127,22 +128,22 @@ def packed_workflow(arvrunner, tool):
document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
packed = pack(document_loader, workflowobj, uri, tool.metadata)
- if tool.tool["id"].startswith("file://"):
- path = os.path.dirname(tool.tool["id"][7:])
- try:
- githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', 'path']).strip()
- except OSError, subprocess.CalledProcessError:
- pass
- else:
- packed["http://schema.org/version"] = githash
+ # if tool.tool["id"].startswith("file://"):
+ # path = os.path.dirname(tool.tool["id"][7:])
+ # try:
+ # githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ # except (OSError, subprocess.CalledProcessError):
+ # pass
+ # else:
+ # packed["http://schema.org/version"] = githash
mapper = {}
- def upload_tool_deps(tool):
+ def upload_tool_deps(deptool):
workflowmapper = upload_dependencies(arvrunner,
- shortname(tool.tool["id"]),
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
+ shortname(deptool["id"]),
+ document_loader,
+ deptool,
+ deptool["id"],
False)
for k,v in workflowmapper.items():
mapper[k] = v.resolved
@@ -155,6 +156,8 @@ def packed_workflow(arvrunner, tool):
adjustFileObjs(packed, setloc)
adjustDirObjs(packed, setloc)
+ #print yaml.round_trip_dump(packed)
+
return packed
def upload_instance(arvrunner, name, tool, job_order):
@@ -231,7 +234,7 @@ class Runner(object):
packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
+ return packed
def done(self, record):
try:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 0a01fb4..7b2a0ec 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -17,6 +17,8 @@ import arvados.keep
from .matcher import JsonDiffMatcher
from .mock_discovery import get_rootDesc
+import ruamel.yaml as yaml
+
_rootDesc = None
def stubs(func):
@@ -122,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -144,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'cwl:tool': 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -167,6 +169,9 @@ def stubs(func):
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
stubs.expect_container_spec = {
'priority': 1,
'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
'writable': True,
'kind': 'collection'
},
- '/var/lib/cwl/workflow': {
- 'portable_data_hash': '99999999999999999999999999999991+99',
- 'kind': 'collection'
+ '/var/lib/cwl/workflow.json': {
+ 'json': expect_packed_workflow,
+ 'kind': 'json'
},
'stdout': {
'path': '/var/spool/cwl/cwl.output.json',
@@ -198,7 +203,7 @@ def stubs(func):
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
@@ -747,7 +752,7 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
- body=expect_container)
+ body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
diff --git a/sdk/cwl/tests/wf/submit_wf_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
new file mode 100644
index 0000000..a1ada94
--- /dev/null
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -0,0 +1,33 @@
+cwlVersion: v1.0
+$graph:
+- class: CommandLineTool
+ requirements:
+ - class: DockerRequirement
+ dockerPull: debian:8
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ type: File
+ default:
+ class: File
+ location: keep:99999999999999999999999999999992+99/blub.txt
+ inputBinding:
+ position: 1
+ outputs: []
+ baseCommand: cat
+ id: '#submit_tool.cwl'
+- class: Workflow
+ inputs:
+ - id: '#main/x'
+ type: File
+ - id: '#main/y'
+ type: Directory
+ - id: '#main/z'
+ type: Directory
+ outputs: []
+ steps:
+ - id: '#main/step1'
+ in:
+ - {id: '#main/step1/x', source: '#main/x'}
+ out: []
+ run: '#submit_tool.cwl'
+ id: '#main'
commit 15b5b54e438106afa0bf62ab69cafaf90b0b5d19
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 10:57:28 2017 -0500
10812: Use packed workflows for all run modes.
Conflicts:
sdk/python/arvados/commands/run.py
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 987b0d6..ff72e14 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,7 +190,7 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +222,13 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- if workflowcollection.startswith("keep:"):
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- container_req["mounts"]["/var/lib/cwl/workflow"] = {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- }
- elif workflowcollection.startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- wfuuid = workflowcollection[6:workflowcollection.index("#")]
- wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
- wfobj = yaml.safe_load(wfrecord["definition"])
- if container_req["name"].startswith("arvwf:"):
- container_req["name"] = wfrecord["name"]
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": wfobj
- }
- container_req["properties"]["template_uuid"] = wfuuid
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if packed["id"].startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = packed["id"][6:]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 11ef653..eee4ef6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -10,6 +10,8 @@ from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+import ruamel.yaml as yaml
+
import arvados.collection
from .arvdocker import arv_docker_get_image
@@ -231,6 +233,28 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+ def upload_workflow_collection(self, packed):
+ collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+ f.write(yaml.round_trip_dump(packed))
+
+ exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=self.name,
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=self.arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
@@ -239,9 +263,10 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ wf_pdh = self.upload_workflow_collection(packed)
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 9e70a6e..01853e6 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import ruamel.yaml as yaml
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
from .arvtool import ArvadosCommandTool
from .perf import Perf
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None):
- upload_docker(arvRunner, tool)
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
+ packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 1c3625e..d8630d4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -4,6 +4,8 @@ from functools import partial
import logging
import json
import re
+import subprocess
+
from cStringIO import StringIO
from schema_salad.sourceline import SourceLine
@@ -118,43 +120,70 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
- upload_docker(arvrunner, tool)
- for t in tool.tool["inputs"]:
- def setSecondary(fileobj):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+def packed_workflow(arvrunner, tool):
+ tool.visit(partial(upload_docker, arvrunner))
- if isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(e)
+ document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
+ packed = pack(document_loader, workflowobj, uri, tool.metadata)
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(job_order[shortname(t["id"])])
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', path]).strip()
+ except (OSError, subprocess.CalledProcessError):
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
+ mapper = {}
+ def upload_tool_deps(tool):
workflowmapper = upload_dependencies(arvrunner,
- name,
+ shortname(tool.tool["id"]),
tool.doc_loader,
tool.tool,
tool.tool["id"],
- True)
- jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
- tool.doc_loader,
- job_order,
- job_order.get("id", "#"),
- False)
+ False)
+ for k,v in workflowmapper.items():
+ mapper[k] = v.resolved
- if "id" in job_order:
- del job_order["id"]
+ tool.visit(upload_tool_deps)
- return workflowmapper
+ def setloc(p):
+ if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ p["location"] = mapper[p["location"]]
+ adjustFileObjs(packed, setloc)
+ adjustDirObjs(packed, setloc)
+
+ return packed
+
+def upload_instance(arvrunner, name, tool, job_order):
+ packed = packed_workflow(arvrunner, tool)
+
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
+ jobmapper = upload_dependencies(arvrunner,
+ os.path.basename(job_order.get("id", "#")),
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ return packed
def arvados_jobs_image(arvrunner):
img = "arvados/jobs:"+__version__
@@ -191,7 +220,7 @@ class Runner(object):
def update_pipeline_component(self, record):
pass
- def arvados_job_spec(self, *args, **kwargs):
+ def upload_instance(self, **kwargs):
if self.name is None:
self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
@@ -200,7 +229,7 @@ class Runner(object):
if "job_order" in self.job_order:
del self.job_order["job_order"]
- workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+ packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
+ return packed
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list