[ARVADOS] updated: aaa964ac44f923985d6a6eb40c179f62c13ca8ce
Git user
git at public.curoverse.com
Mon Dec 5 17:06:42 EST 2016
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 3 +
sdk/cwl/arvados_cwl/arvcontainer.py | 67 +++++++++------
sdk/cwl/arvados_cwl/fsaccess.py | 20 ++---
sdk/cwl/arvados_cwl/pathmapper.py | 2 +
sdk/cwl/arvados_cwl/runner.py | 4 +-
sdk/cwl/tests/test_submit.py | 139 +++++++++++++++++++++++++++++++-
sdk/cwl/tests/wf/expect_arvworkflow.cwl | 24 ++++++
7 files changed, 223 insertions(+), 36 deletions(-)
create mode 100644 sdk/cwl/tests/wf/expect_arvworkflow.cwl
via aaa964ac44f923985d6a6eb40c179f62c13ca8ce (commit)
from deac6ece295b9c2049f8c91e1492324a97bf1b01 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit aaa964ac44f923985d6a6eb40c179f62c13ca8ce
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 5 17:06:37 2016 -0500
10576: Running jobs from keep: and arv: prefixes WIP. Tests passing, needs some integration testing.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c953b4e..f516a0b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -94,6 +94,9 @@ class ArvCwlRunner(object):
def arv_make_tool(self, toolpath_object, **kwargs):
kwargs["work_api"] = self.work_api
+ kwargs["fetcher_constructor"] = partial(CollectionFetcher,
+ api_client=self.api,
+ keep_client=self.keep_client)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 08da4ca..cea2b10 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -2,6 +2,8 @@ import logging
import json
import os
+import ruamel.yaml as yaml
+
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFiles
@@ -12,6 +14,7 @@ import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image
+from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
@@ -177,28 +180,9 @@ class RunnerContainer(Runner):
json.dump(self.job_order, f, sort_keys=True, indent=4)
jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
- command = ["arvados-cwl-runner", "--local", "--api=containers"]
- if self.output_name:
- command.append("--output-name=" + self.output_name)
-
- if self.output_tags:
- command.append("--output-tags=" + self.output_tags)
-
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
- command.extend([workflowpath, jobpath])
-
- return {
- "command": command,
+ container_req = {
"owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
@@ -207,10 +191,6 @@ class RunnerContainer(Runner):
"state": "Committed",
"container_image": arvados_jobs_image(self.arvrunner),
"mounts": {
- "/var/lib/cwl/workflow": {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- },
jobpath: {
"kind": "collection",
"portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
@@ -231,6 +211,45 @@ class RunnerContainer(Runner):
}
}
+ workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+ if workflowcollection.startswith("keep:"):
+ workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ workflowname = os.path.basename(self.tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ container_req["mounts"]["/var/lib/cwl/workflow"] = {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ }
+ elif workflowcollection.startswith("arvwf:"):
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ fetcher = CollectionFetcher({}, None,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client)
+ wfobj = yaml.safe_load(fetcher.fetch_text(workflowcollection))
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": wfobj
+ }
+
+ command = ["arvados-cwl-runner", "--local", "--api=containers"]
+ if self.output_name:
+ command.append("--output-name=" + self.output_name)
+
+ if self.output_tags:
+ command.append("--output-tags=" + self.output_tags)
+
+ if self.enable_reuse:
+ command.append("--enable-reuse")
+ else:
+ command.append("--disable-reuse")
+
+ command.extend([workflowpath, jobpath])
+
+ container_req["command"] = command
+
+ return container_req
+
+
def run(self, *args, **kwargs):
kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(*args, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 9201ab6..d1a6463 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -4,6 +4,8 @@ import errno
import urlparse
import re
+import ruamel.yaml as yaml
+
import cwltool.stdfsaccess
from cwltool.pathmapper import abspath
import cwltool.resolver
@@ -134,16 +136,16 @@ class CollectionFetcher(DefaultFetcher):
def fetch_text(self, url):
if url.startswith("keep:"):
- with self.fsaccess.open(url) as f:
+ with self.fsaccess.open(url, "r") as f:
return f.read()
- if url.startswith("arv:"):
- return self.api_client.workflows().get(uuid=url[4:]).execute()["definition"]
+ if url.startswith("arvwf:"):
+ return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
if url.startswith("keep:"):
return self.fsaccess.exists(url)
- if url.startswith("arv:"):
+ if url.startswith("arvwf:"):
if self.fetch_text(url):
return True
return super(CollectionFetcher, self).check_exists(url)
@@ -157,7 +159,7 @@ class CollectionFetcher(DefaultFetcher):
return url
basesp = urlparse.urlsplit(base_url)
- if basesp.scheme == "keep":
+ if basesp.scheme in ("keep", "arvwf"):
if not basesp.path:
raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
@@ -166,7 +168,7 @@ class CollectionFetcher(DefaultFetcher):
pdh = baseparts.pop(0)
- if not arvados.util.keep_locator_pattern.match(pdh):
+ if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
if urlsp.path.startswith("/"):
@@ -177,7 +179,7 @@ class CollectionFetcher(DefaultFetcher):
baseparts.pop()
path = "/".join([pdh] + baseparts + urlparts)
- return urlparse.urlunsplit(("keep", "", path, "", urlsp.fragment))
+ return urlparse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
return super(CollectionFetcher, self).urljoin(base_url, url)
@@ -185,11 +187,11 @@ workflow_uuid_pattern = re.compile(r'[a-z0-9]{5}-7fd4e-[a-z0-9]{15}')
def collectionResolver(api_client, document_loader, uri):
if workflow_uuid_pattern.match(uri):
- return "arv:%s" % uri
+ return "arvwf:%s#main" % (uri)
p = uri.split("/")
if arvados.util.keep_locator_pattern.match(p[0]):
- return "keep:" + uri
+ return "keep:%s" % (uri)
if arvados.util.collection_uuid_pattern.match(p[0]):
return "keep:%s%s" % (self.api_client.collections().
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 58500d3..74d9481 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -47,6 +47,8 @@ class ArvPathMapper(PathMapper):
pass
else:
raise WorkflowException("File literal '%s' is missing contents" % src)
+ elif src.startswith("arvwf:"):
+ self._pathmap[src] = MapperEnt(src, src, "File")
else:
raise WorkflowException("Input file path '%s' is invalid" % st)
if "secondaryFiles" in srcobj:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d7858cf..d6497c4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -57,7 +57,7 @@ def upload_dependencies(arvrunner, name, document_loader,
loaded = set()
def loadref(b, u):
- joined = urlparse.urljoin(b, u)
+ joined = document_loader.fetcher.urljoin(b, u)
defrg, _ = urlparse.urldefrag(joined)
if defrg not in loaded:
loaded.add(defrg)
@@ -85,7 +85,7 @@ def upload_dependencies(arvrunner, name, document_loader,
sc = scandeps(uri, scanobj,
loadref_fields,
set(("$include", "$schemas", "location")),
- loadref)
+ loadref, urljoin=document_loader.fetcher.urljoin)
normalizeFilesDirs(sc)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 70397fa..d489e69 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -492,7 +492,7 @@ class TestSubmit(unittest.TestCase):
@mock.patch("arvados.collection.CollectionReader")
@mock.patch("time.sleep")
@stubs
- def test_submit_keepref(self, stubs, tm, collectionReader):
+ def test_submit_file_keepref(self, stubs, tm, collectionReader):
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@@ -501,6 +501,143 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(exited, 0)
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_keepref(self, stubs, tm, reader):
+ capture_stdout = cStringIO.StringIO()
+
+ with open("tests/wf/expect_arvworkflow.cwl") as f:
+ reader().open().__enter__().read.return_value = f.read()
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_container = {
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {
+ 'writable': True,
+ 'kind': 'collection'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/workflow': {
+ 'portable_data_hash': '99999999999999999999999999999994+99',
+ 'kind': 'collection'
+ },
+ '/var/lib/cwl/job/cwl.input.json': {
+ 'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+ 'kind': 'collection'}
+ }, 'state': 'Committed',
+ 'owner_uuid': None,
+ 'output_path': '/var/spool/cwl',
+ 'name': 'expect_arvworkflow.cwl#main',
+ 'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/job/cwl.input.json'],
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'API': True,
+ 'vcpus': 1,
+ 'ram': 1073741824
+ }
+ }
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_arvworkflow(self, stubs, tm):
+ capture_stdout = cStringIO.StringIO()
+
+ with open("tests/wf/expect_arvworkflow.cwl") as f:
+ stubs.api.workflows().get().execute.return_value = {"definition": f.read()}
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "962eh-7fd4e-gkbzl62qqtfig37", "-x", "XxX"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ expect_container = {
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {
+ 'writable': True,
+ 'kind': 'collection'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/workflow.json': {
+ 'kind': 'json',
+ 'json': {
+ 'cwlVersion': 'v1.0',
+ '$graph': [
+ {
+ 'inputs': [
+ {
+ 'inputBinding': {'position': 1},
+ 'type': 'string',
+ 'id': '#submit_tool.cwl/x'}
+ ],
+ 'requirements': [
+ {'dockerPull': 'debian:8', 'class': 'DockerRequirement'}
+ ],
+ 'id': '#submit_tool.cwl',
+ 'outputs': [],
+ 'baseCommand': 'cat',
+ 'class': 'CommandLineTool'
+ }, {
+ 'id': '#main',
+ 'inputs': [
+ {'type': 'string', 'id': '#main/x'}
+ ],
+ 'steps': [
+ {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+ 'run': '#submit_tool.cwl',
+ 'id': '#main/step1',
+ 'out': []}
+ ],
+ 'class': 'Workflow',
+ 'outputs': []
+ }
+ ]
+ }
+ },
+ '/var/lib/cwl/job/cwl.input.json': {
+ 'portable_data_hash': 'e5454f8ca7d5b181e21ecd45841e3373+58/cwl.input.json',
+ 'kind': 'collection'}
+ }, 'state': 'Committed',
+ 'owner_uuid': None,
+ 'output_path': '/var/spool/cwl',
+ 'name': 'arvwf:962eh-7fd4e-gkbzl62qqtfig37#main',
+ 'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/job/cwl.input.json'],
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'API': True,
+ 'vcpus': 1,
+ 'ram': 1073741824
+ }
+ }
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.get_image")
@mock.patch("arvados.api")
diff --git a/sdk/cwl/tests/wf/expect_arvworkflow.cwl b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
new file mode 100644
index 0000000..56ce0d5
--- /dev/null
+++ b/sdk/cwl/tests/wf/expect_arvworkflow.cwl
@@ -0,0 +1,24 @@
+$graph:
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
+- class: Workflow
+ id: '#main'
+ inputs:
+ - id: '#main/x'
+ type: string
+ outputs: []
+ steps:
+ - id: '#main/step1'
+ in:
+ - {id: '#main/step1/x', source: '#main/x'}
+ out: []
+ run: '#submit_tool.cwl'
+cwlVersion: v1.0
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list