[ARVADOS] updated: 39189b90bd2f73d4f4938049ffa4441a967ca24c
Git user
git at public.curoverse.com
Mon Jan 23 10:32:11 EST 2017
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 59 +++++--
sdk/cwl/arvados_cwl/arvcontainer.py | 29 ++-
sdk/cwl/arvados_cwl/arvjob.py | 48 ++++-
sdk/cwl/arvados_cwl/arvworkflow.py | 11 +-
sdk/cwl/arvados_cwl/fsaccess.py | 31 +++-
sdk/cwl/arvados_cwl/runner.py | 148 ++++++++++------
sdk/cwl/setup.py | 4 +-
sdk/cwl/tests/test_submit.py | 196 +++++++++++----------
sdk/cwl/tests/wf/expect_arvworkflow.cwl | 22 +--
sdk/cwl/tests/wf/expect_packed.cwl | 4 +-
.../wf/{expect_packed.cwl => submit_wf_packed.cwl} | 8 +-
11 files changed, 341 insertions(+), 219 deletions(-)
copy sdk/cwl/tests/wf/{expect_packed.cwl => submit_wf_packed.cwl} (53%)
via 39189b90bd2f73d4f4938049ffa4441a967ca24c (commit)
via dc5a33cbfc156c839515acb4fa6ea2f9162a0972 (commit)
via cd383b7168d9412f4f097438d590e919ff7a97d6 (commit)
via ddcaafee5dc10ee0104c108c9648f4d5024a83d9 (commit)
via 685af7fb2ae3a8ea162edd89eec61fdd4ca376f0 (commit)
via ae9f71231ed50eb35097c10c84b0070bcdcd22cf (commit)
via 01007e0a2c7cf5461ced83339f6abcfb6f9fac72 (commit)
via 6143bba5421756c78b282ee6c4da793d45a4523e (commit)
via 036c59ea6b19372e74f7ccacb5dcb2f522f99629 (commit)
via 7fa95f2db716ebfdb6312fa67b9b07bebb815b39 (commit)
via f40364c4d42e111b9da3873afcfaed2b49e7f182 (commit)
via 2f953026bc4baeccb78ca82acc4d07cad37625b8 (commit)
via 60d986b8908487c086eb4e402ac69669cb26108b (commit)
from c40389e0f064d4ea379d5f5471116936239a467a (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 39189b90bd2f73d4f4938049ffa4441a967ca24c
Merge: c40389e dc5a33c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 10:32:05 2017 -0500
Merge branch '10812-cwl-separate-dependencies' refs #10812
commit dc5a33cbfc156c839515acb4fa6ea2f9162a0972
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 10:20:46 2017 -0500
10812: Handle the expected NotFoundError when checking whether a file exists in Keep.
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 02e1656..a99b2a7 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -3,6 +3,7 @@ import os
import errno
import urlparse
import re
+import logging
import ruamel.yaml as yaml
@@ -13,9 +14,12 @@ import cwltool.resolver
import arvados.util
import arvados.collection
import arvados.arvfile
+import arvados.errors
from schema_salad.ref_resolver import DefaultFetcher
+logger = logging.getLogger('arvados.cwl-runner')
+
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -146,11 +150,17 @@ class CollectionFetcher(DefaultFetcher):
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
- if url.startswith("keep:"):
- return self.fsaccess.exists(url)
- if url.startswith("arvwf:"):
- if self.fetch_text(url):
- return True
+ try:
+ if url.startswith("keep:"):
+ return self.fsaccess.exists(url)
+ if url.startswith("arvwf:"):
+ if self.fetch_text(url):
+ return True
+ except arvados.errors.NotFoundError:
+ return False
+ except:
+ logger.exception("Got unexpected exception checking if file exists:")
+ return False
return super(CollectionFetcher, self).check_exists(url)
def urljoin(self, base_url, url):
commit cd383b7168d9412f4f097438d590e919ff7a97d6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 09:52:34 2017 -0500
10812: Add num_retries to CollectionFetcher and collectionResolver.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index bf5c197..13135d0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -713,6 +713,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
keep_client=keep_client),
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
- keep_client=keep_client),
- resolver=partial(collectionResolver, api_client),
+ keep_client=keep_client,
+ num_retries=runner.num_retries),
+ resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index b249da7..02e1656 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -129,17 +129,18 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
- def __init__(self, cache, session, api_client=None, keep_client=None):
+ def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
super(CollectionFetcher, self).__init__(cache, session)
self.api_client = api_client
self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+ self.num_retries = num_retries
def fetch_text(self, url):
if url.startswith("keep:"):
with self.fsaccess.open(url, "r") as f:
return f.read()
if url.startswith("arvwf:"):
- record = self.api_client.workflows().get(uuid=url[6:]).execute()
+ record = self.api_client.workflows().get(uuid=url[6:]).execute(num_retries=self.num_retries)
definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
return definition
return super(CollectionFetcher, self).fetch_text(url)
@@ -188,12 +189,12 @@ class CollectionFetcher(DefaultFetcher):
workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
pipeline_template_uuid_pattern = re.compile(r'[a-z0-9]{5}-p5p6p-[a-z0-9]{15}')
-def collectionResolver(api_client, document_loader, uri):
+def collectionResolver(api_client, document_loader, uri, num_retries=4):
if workflow_uuid_pattern.match(uri):
return "arvwf:%s#main" % (uri)
if pipeline_template_uuid_pattern.match(uri):
- pt = api_client.pipeline_templates().get(uuid=uri).execute()
+ pt = api_client.pipeline_templates().get(uuid=uri).execute(num_retries=num_retries)
return "keep:" + pt["components"].values()[0]["script_parameters"]["cwl:tool"]
p = uri.split("/")
commit ddcaafee5dc10ee0104c108c9648f4d5024a83d9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 20 11:36:05 2017 -0500
10812: Fix indentation typo.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 40abdd9..bf5c197 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -177,7 +177,7 @@ class ArvCwlRunner(object):
for p in proc_states["items"]:
self.on_message({
- "object_uuid": p["uuid"],
+ "object_uuid": p["uuid"],
"event_type": "update",
"properties": {
"new_attributes": p
commit 685af7fb2ae3a8ea162edd89eec61fdd4ca376f0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jan 19 13:29:11 2017 -0500
10812: Update cwltool and schema-salad dependencies.
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 55b0485..c3a5bcd 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -48,8 +48,8 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170118141124',
- 'schema-salad==2.2.20170111180227',
+ 'cwltool==1.0.20170119234115',
+ 'schema-salad==2.2.20170119151016',
'ruamel.yaml==0.13.7',
'arvados-python-client>=0.1.20170112173420',
'setuptools'
commit ae9f71231ed50eb35097c10c84b0070bcdcd22cf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 17:46:54 2017 -0500
10812: Handle $schemas references.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 2c028aa..d3e0a0e 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -98,6 +98,10 @@ def upload_dependencies(arvrunner, name, document_loader,
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
+ if "$schemas" in workflowobj:
+ for s in workflowobj["$schemas"]:
+ sc.append({"class": "File", "location": s})
+
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"keep:%s/%s",
@@ -109,6 +113,12 @@ def upload_dependencies(arvrunner, name, document_loader,
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
+ if "$schemas" in workflowobj:
+ sch = []
+ for s in workflowobj["$schemas"]:
+ sch.append(mapper.mapper(s).resolved)
+ workflowobj["$schemas"] = sch
+
return mapper
commit 01007e0a2c7cf5461ced83339f6abcfb6f9fac72
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 18 11:47:07 2017 -0500
10812: Don't use cStringIO which doesn't like unicode strings.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 6978b54..2c028aa 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -6,7 +6,7 @@ import json
import re
import subprocess
-from cStringIO import StringIO
+from StringIO import StringIO
from schema_salad.sourceline import SourceLine
commit 6143bba5421756c78b282ee6c4da793d45a4523e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:51:10 2017 -0500
10812: Check for dockerOutputDirectory and raise UnsupportedRequirement up front.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index e9c6bf1..40abdd9 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -211,6 +211,11 @@ class ArvCwlRunner(object):
raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
if obj.get("stderr"):
raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+ if obj.get("class") == "DockerRequirement":
+ if obj.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 2a220ce..eaddca0 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -180,7 +180,7 @@ def stubs(func):
'kind': 'collection'
},
'/var/lib/cwl/workflow.json': {
- 'json': expect_packed_workflow,
+ 'content': expect_packed_workflow,
'kind': 'json'
},
'stdout': {
@@ -680,7 +680,7 @@ class TestSubmit(unittest.TestCase):
},
'/var/lib/cwl/workflow.json': {
'kind': 'json',
- 'json': {
+ 'content': {
'cwlVersion': 'v1.0',
'$graph': [
{
@@ -1121,7 +1121,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit 036c59ea6b19372e74f7ccacb5dcb2f522f99629
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 16 10:22:30 2017 -0500
10812: Fix imports, reference #main in cwl:tool
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index fc38771..7b31802 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,13 +4,15 @@ import copy
import json
import time
-from cwltool.process import get_feature, shortname
+from cwltool.process import get_feature, shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustDirObjs
+from schema_salad.sourceline import SourceLine
+
import ruamel.yaml as yaml
import arvados.collection
@@ -271,7 +273,7 @@ class RunnerJob(Runner):
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
- self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
+ self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
commit 7fa95f2db716ebfdb6312fa67b9b07bebb815b39
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 16:33:11 2017 -0500
10812: Improve the check for whether an already-packed workflow collection
exists with the same PDH and a similar name. Don't crash when registering a
pipeline template that has array inputs.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 02eca64..e9c6bf1 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 117eff4..235e9b8 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -236,7 +236,7 @@ class RunnerContainer(Runner):
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "json": packed
+ "content": packed
}
if self.tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 30056c7..fc38771 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -241,9 +241,11 @@ class RunnerJob(Runner):
with collection.open("workflow.cwl", "w") as f:
f.write(yaml.round_trip_dump(packed))
- exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]
+ if self.arvrunner.project_uuid:
+ filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
+ exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
@@ -385,10 +387,12 @@ class RunnerTemplate(object):
if not isinstance(types, list):
types = [types]
param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
+ non_null_types = [t for t in types if t != "null"]
if len(non_null_types) == 1:
the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
+ dataclass = None
+ if isinstance(the_type, basestring):
+ dataclass = self.type_to_dataclass.get(the_type)
if dataclass:
param['dataclass'] = dataclass
# Note: If we didn't figure out a single appropriate
commit f40364c4d42e111b9da3873afcfaed2b49e7f182
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 15:51:54 2017 -0500
10812: Don't try to upload embedded tools separately unless they have an
identifier.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index ddeac3a..6978b54 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -184,14 +184,15 @@ def upload_workflow_deps(arvrunner, tool):
document_loader = tool.doc_loader
def upload_tool_deps(deptool):
- upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- include_primary=False)
- document_loader.idx[deptool["id"]] = deptool
+ if "id" in deptool:
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
tool.visit(upload_tool_deps)
commit 2f953026bc4baeccb78ca82acc4d07cad37625b8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 11 08:41:08 2017 -0500
10812: Handle workflow keep references.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 14513c7..117eff4 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,8 +190,6 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- packed = packed_workflow(self.arvrunner, self.tool)
-
adjustDirObjs(self.job_order, trim_listing)
container_req = {
@@ -224,13 +222,24 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowpath = "/var/lib/cwl/workflow.json#main"
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": packed
- }
- if self.tool.tool.get("id", "").startswith("arvwf:"):
- container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+ if self.tool.tool.get("id", "").startswith("keep:"):
+ sp = self.tool.tool["id"].split('/')
+ workflowcollection = sp[0][5:]
+ workflowname = "/".join(sp[1:])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ container_req["mounts"]["/var/lib/cwl/workflow"] = {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ }
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index a4cd5cd..30056c7 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -265,7 +265,7 @@ class RunnerJob(Runner):
"""
if self.tool.tool["id"].startswith("keep:"):
- pass
+ self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
wf_pdh = self.upload_workflow_collection(packed)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 9e918fc..2a220ce 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
+ 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -238,7 +238,7 @@ class TestSubmit(unittest.TestCase):
def test_submit(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug",
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
@@ -272,14 +272,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_no_reuse(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--disable-reuse",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -290,14 +290,14 @@ class TestSubmit(unittest.TestCase):
def test_submit_on_error(self, stubs, tm):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--on-error=stop",
+ ["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -314,9 +314,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -345,9 +345,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -364,9 +364,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["name"] = "hello job 123"
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["name"] = "hello job 123"
+
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
@@ -384,9 +384,9 @@ class TestSubmit(unittest.TestCase):
capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
-
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
+
stubs.api.pipeline_instances().create.assert_called_with(
body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
@@ -456,11 +456,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -479,11 +479,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -503,12 +503,12 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- stubs.expect_container_spec["output_name"] = output_name
+ expect_container["output_name"] = output_name
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -528,11 +528,11 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
- expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -550,9 +550,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["runtime_constraints"]["ram"] = 2048*1024*1024
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -628,6 +628,31 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_jobs_keepref(self, stubs, tm, reader):
+ capture_stdout = cStringIO.StringIO()
+
+ with open("tests/wf/expect_arvworkflow.cwl") as f:
+ reader().open().__enter__().read.return_value = f.read()
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=jobs", "--debug",
+ "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["x"] = "XxX"
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["y"]
+ del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["z"]
+ expect_pipeline["components"]["cwl-runner"]["script_parameters"]["cwl:tool"] = "99999999999999999999999999999994+99/expect_arvworkflow.cwl#main"
+ expect_pipeline["name"] = "expect_arvworkflow.cwl#main"
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=JsonDiffMatcher(expect_pipeline))
+
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
@@ -733,9 +758,9 @@ class TestSubmit(unittest.TestCase):
except:
logging.exception("")
- stubs.expect_container_spec["name"] = "hello container 123"
-
expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["name"] = "hello container 123"
+
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
@@ -1096,8 +1121,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '99999999999999999999999999999991+99/'
- 'wf/inputs_test.cwl',
+ '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
commit 60d986b8908487c086eb4e402ac69669cb26108b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 10:31:08 2017 -0500
10812: Use packed workflows for all run modes.
Conflicts:
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/runner.py
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b766cec..02eca64 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from arvados.errors import ApiError
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
@@ -177,7 +177,7 @@ class ArvCwlRunner(object):
for p in proc_states["items"]:
self.on_message({
- "object_uuid": p["uuid"],
+ "object_uuid": p["uuid"],
"event_type": "update",
"properties": {
"new_attributes": p
@@ -334,23 +334,44 @@ class ArvCwlRunner(object):
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
+ if not kwargs.get("name"):
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ makeTool=self.arv_make_tool,
+ loader=tool.doc_loader,
+ avsc_names=tool.doc_schema,
+ metadata=tool.metadata)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ tool, job_order)
+
existing_uuid = kwargs.get("update_workflow")
if existing_uuid or kwargs.get("create_workflow"):
+ # Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs["name"])
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
- else:
+ elif self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name")), "success")
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"]),
+ "success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
@@ -360,9 +381,6 @@ class ArvCwlRunner(object):
kwargs["tmpdir_prefix"] = "tmp"
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
- if not kwargs["name"]:
- del kwargs["name"]
-
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
@@ -373,13 +391,18 @@ class ArvCwlRunner(object):
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
runnerjob = None
if kwargs.get("submit"):
+ # Submit a runner job to run the workflow for us.
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool":
kwargs["runnerjob"] = tool.tool["id"]
+ upload_dependencies(self,
+ kwargs["name"],
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
@@ -400,7 +423,7 @@ class ArvCwlRunner(object):
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"))
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index a3a26aa..14513c7 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -6,14 +6,14 @@ import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles
+from cwltool.pathmapper import adjustFiles, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
@@ -190,7 +190,9 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = packed_workflow(self.arvrunner, self.tool)
+
+ adjustDirObjs(self.job_order, trim_listing)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +224,13 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- if workflowcollection.startswith("keep:"):
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- container_req["mounts"]["/var/lib/cwl/workflow"] = {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- }
- elif workflowcollection.startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- wfuuid = workflowcollection[6:workflowcollection.index("#")]
- wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
- wfobj = yaml.safe_load(wfrecord["definition"])
- if container_req["name"].startswith("arvwf:"):
- container_req["name"] = wfrecord["name"]
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": wfobj
- }
- container_req["properties"]["template_uuid"] = wfuuid
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 4bbbda2..a4cd5cd 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -9,11 +9,14 @@ from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
+
+import ruamel.yaml as yaml
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
@@ -231,6 +234,28 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+ def upload_workflow_collection(self, packed):
+ collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+ f.write(yaml.round_trip_dump(packed))
+
+ exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=self.name,
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=self.arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
@@ -239,9 +264,14 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ if self.tool.tool["id"].startswith("keep:"):
+ pass
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ wf_pdh = self.upload_workflow_collection(packed)
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+ adjustDirObjs(self.job_order, trim_listing)
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 9e70a6e..cd7e41a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import ruamel.yaml as yaml
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
from .arvtool import ArvadosCommandTool
from .perf import Perf
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None):
- upload_docker(arvRunner, tool)
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
+ packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
@@ -39,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
if not name:
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
- upload_dependencies(arvRunner, name, document_loader,
- packed, uri, False)
+ upload_dependencies(arvRunner, name, tool.doc_loader,
+ packed, tool.tool["id"], False)
# TODO nowhere for submit_runner_ram to go.
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 500ea0f..b249da7 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -139,7 +139,9 @@ class CollectionFetcher(DefaultFetcher):
with self.fsaccess.open(url, "r") as f:
return f.read()
if url.startswith("arvwf:"):
- return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+ record = self.api_client.workflows().get(uuid=url[6:]).execute()
+ definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
+ return definition
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 09ca63b..ddeac3a 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -4,6 +4,8 @@ from functools import partial
import logging
import json
import re
+import subprocess
+
from cStringIO import StringIO
from schema_salad.sourceline import SourceLine
@@ -16,6 +18,7 @@ from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
+from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
@@ -45,7 +48,7 @@ def trim_listing(obj):
del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run):
+ workflowobj, uri, loadref_run, include_primary=True):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
@@ -92,7 +95,7 @@ def upload_dependencies(arvrunner, name, document_loader,
normalizeFilesDirs(sc)
- if "id" in workflowobj:
+ if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
mapper = ArvPathMapper(arvrunner, sc, "",
@@ -110,6 +113,7 @@ def upload_dependencies(arvrunner, name, document_loader,
def upload_docker(arvrunner, tool):
+ """Visitor which uploads Docker images referenced in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
@@ -118,43 +122,78 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
- upload_docker(arvrunner, tool)
-
- for t in tool.tool["inputs"]:
- def setSecondary(fileobj):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
-
- if isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(e)
-
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(job_order[shortname(t["id"])])
-
- workflowmapper = upload_dependencies(arvrunner,
- name,
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
- True)
- jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
- tool.doc_loader,
- job_order,
- job_order.get("id", "#"),
- False)
-
- if "id" in job_order:
- del job_order["id"]
-
- return workflowmapper
+
+def packed_workflow(arvrunner, tool):
+ """Create a packed workflow.
+
+ A "packed" workflow is one where all the components have been combined into a single document."""
+
+ return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+ tool.tool["id"], tool.metadata)
+
+def tag_git_version(packed):
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ except (OSError, subprocess.CalledProcessError):
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
+
+
+def upload_job_order(arvrunner, name, tool, job_order):
+ """Upload local files referenced in the input object and return updated input
+ object with 'location' updated to the proper keep references.
+ """
+
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
+ jobmapper = upload_dependencies(arvrunner,
+ name,
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ # Need to filter this out, gets added by cwltool when providing
+ # parameters on the command line.
+ if "job_order" in job_order:
+ del job_order["job_order"]
+
+ return job_order
+
+def upload_workflow_deps(arvrunner, tool):
+ # Ensure that Docker images needed by this workflow are available
+ tool.visit(partial(upload_docker, arvrunner))
+
+ document_loader = tool.doc_loader
+
+ def upload_tool_deps(deptool):
+ upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ include_primary=False)
+ document_loader.idx[deptool["id"]] = deptool
+
+ tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
@@ -166,6 +205,9 @@ def arvados_jobs_image(arvrunner, img):
return img
class Runner(object):
+ """Base class for runner processes, which submit an instance of
+ arvados-cwl-runner and wait for the final result."""
+
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None):
@@ -193,20 +235,9 @@ class Runner(object):
def update_pipeline_component(self, record):
pass
- def arvados_job_spec(self, *args, **kwargs):
- if self.name is None:
- self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
-
- # Need to filter this out, gets added by cwltool when providing
- # parameters on the command line.
- if "job_order" in self.job_order:
- del self.job_order["job_order"]
-
- workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
- adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
-
def done(self, record):
+ """Base method for handling a completed runner."""
+
try:
if record["state"] == "Complete":
if record.get("exit_code") is not None:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 042272e..9e918fc 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -17,6 +17,8 @@ import arvados.keep
from .matcher import JsonDiffMatcher
from .mock_discovery import get_rootDesc
+import ruamel.yaml as yaml
+
_rootDesc = None
def stubs(func):
@@ -104,7 +106,7 @@ def stubs(func):
'script_parameters': {
'x': {
'basename': 'blorp.txt',
- 'location': 'keep:99999999999999999999999999999994+99/blorp.txt',
+ 'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
'class': 'File'
},
'y': {
@@ -122,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -139,12 +141,12 @@ def stubs(func):
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -167,6 +169,9 @@ def stubs(func):
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
stubs.expect_container_spec = {
'priority': 1,
'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
'writable': True,
'kind': 'collection'
},
- '/var/lib/cwl/workflow': {
- 'portable_data_hash': '99999999999999999999999999999991+99',
- 'kind': 'collection'
+ '/var/lib/cwl/workflow.json': {
+ 'json': expect_packed_workflow,
+ 'kind': 'json'
},
'stdout': {
'path': '/var/spool/cwl/cwl.output.json',
@@ -186,7 +191,7 @@ def stubs(func):
'kind': 'json',
'content': {
'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
- 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}
@@ -198,7 +203,7 @@ def stubs(func):
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
@@ -240,32 +245,24 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'replication_desired': None,
- 'name': 'New collection'
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
- body=expect_pipeline)
+ body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
@@ -426,27 +423,19 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'name': 'New collection',
- 'replication_desired': None,
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -469,7 +458,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -492,7 +481,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -516,7 +505,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_spec["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
@@ -541,7 +530,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
@@ -670,6 +659,20 @@ class TestSubmit(unittest.TestCase):
'cwlVersion': 'v1.0',
'$graph': [
{
+ 'id': '#main',
+ 'inputs': [
+ {'type': 'string', 'id': '#main/x'}
+ ],
+ 'steps': [
+ {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+ 'run': '#submit_tool.cwl',
+ 'id': '#main/step1',
+ 'out': []}
+ ],
+ 'class': 'Workflow',
+ 'outputs': []
+ },
+ {
'inputs': [
{
'inputBinding': {'position': 1},
@@ -683,19 +686,6 @@ class TestSubmit(unittest.TestCase):
'outputs': [],
'baseCommand': 'cat',
'class': 'CommandLineTool'
- }, {
- 'id': '#main',
- 'inputs': [
- {'type': 'string', 'id': '#main/x'}
- ],
- 'steps': [
- {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
- 'run': '#submit_tool.cwl',
- 'id': '#main/step1',
- 'out': []}
- ],
- 'class': 'Workflow',
- 'outputs': []
}
]
}
@@ -747,7 +737,7 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
- body=expect_container)
+ body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
@@ -844,7 +834,7 @@ class TestCreateTemplate(unittest.TestCase):
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999994+99/blorp.txt',
+ 'value': '99999999999999999999999999999992+99/blorp.txt',
}
expect_component['script_parameters']['y'] = {
'dataclass': 'Collection',
@@ -1167,7 +1157,7 @@ class TestTemplateInputs(unittest.TestCase):
expect_template = copy.deepcopy(self.expect_template)
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
+ params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
diff --git a/sdk/cwl/tests/wf/expect_arvworkflow.cwl b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
index 56ce0d5..4a3a037 100644
--- a/sdk/cwl/tests/wf/expect_arvworkflow.cwl
+++ b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
@@ -1,14 +1,5 @@
+cwlVersion: v1.0
$graph:
-- baseCommand: cat
- class: CommandLineTool
- id: '#submit_tool.cwl'
- inputs:
- - id: '#submit_tool.cwl/x'
- inputBinding: {position: 1}
- type: string
- outputs: []
- requirements:
- - {class: DockerRequirement, dockerPull: 'debian:8'}
- class: Workflow
id: '#main'
inputs:
@@ -21,4 +12,13 @@ $graph:
- {id: '#main/step1/x', source: '#main/x'}
out: []
run: '#submit_tool.cwl'
-cwlVersion: v1.0
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
diff --git a/sdk/cwl/tests/wf/expect_packed.cwl b/sdk/cwl/tests/wf/expect_packed.cwl
index f4d60db..d7b9d61 100644
--- a/sdk/cwl/tests/wf/expect_packed.cwl
+++ b/sdk/cwl/tests/wf/expect_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
@@ -19,7 +19,7 @@ $graph:
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
+ default: {class: File, location: 'keep:99999999999999999999999999999992+99/blorp.txt',
basename: blorp.txt}
- id: '#main/y'
type: Directory
diff --git a/sdk/cwl/tests/wf/expect_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
similarity index 53%
copy from sdk/cwl/tests/wf/expect_packed.cwl
copy to sdk/cwl/tests/wf/submit_wf_packed.cwl
index f4d60db..1f7fee0 100644
--- a/sdk/cwl/tests/wf/expect_packed.cwl
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -9,7 +9,7 @@ $graph:
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
@@ -19,16 +19,10 @@ $graph:
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
- basename: blorp.txt}
- id: '#main/y'
type: Directory
- default: {class: Directory, location: 'keep:99999999999999999999999999999998+99',
- basename: 99999999999999999999999999999998+99}
- id: '#main/z'
type: Directory
- default: {class: Directory, basename: anonymous, listing: [{basename: renamed.txt,
- class: File, location: 'keep:99999999999999999999999999999998+99/file1.txt'}]}
outputs: []
steps:
- id: '#main/step1'
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list