[ARVADOS] created: 8a1a96b62a5c91926ed0bbb01042cdda7b16887c
Git user
git at public.curoverse.com
Wed Feb 8 11:12:20 EST 2017
at 8a1a96b62a5c91926ed0bbb01042cdda7b16887c (commit)
commit 8a1a96b62a5c91926ed0bbb01042cdda7b16887c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Feb 8 11:11:42 2017 -0500
9397: arvados-cwl-runner implementation of InitialWorkDir on container API.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 2842e8a..024e9d0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -33,7 +33,7 @@ from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
-from .pathmapper import FinalOutputPathMapper
+from .pathmapper import NoFollowPathMapper
from ._version import __version__
from cwltool.pack import pack
@@ -200,9 +200,6 @@ class ArvCwlRunner(object):
def check_features(self, obj):
if isinstance(obj, dict):
- if obj.get("class") == "InitialWorkDirRequirement":
- if self.work_api == "containers":
- raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
if obj.get("writable"):
raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
if obj.get("class") == "CommandLineTool":
@@ -233,7 +230,7 @@ class ArvCwlRunner(object):
adjustDirObjs(outputObj, capture)
adjustFileObjs(outputObj, capture)
- generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+ generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
final = arvados.collection.Collection(api_client=self.api,
keep_client=self.keep_client,
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c4db17..1216f2d 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -15,8 +15,11 @@ from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
+from .pathmapper import NoFollowPathMapper
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
class ArvadosContainer(object):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
@@ -50,23 +53,54 @@ class ArvadosContainer(object):
dirs = set()
for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if tp == "Directory" and '/' not in p[6:]:
+ pdh, p, tp = self.pathmapper.mapper(f)
+ if tp == "Directory" and '/' not in pdh:
mounts[p] = {
"kind": "collection",
- "portable_data_hash": p[6:]
+ "portable_data_hash": pdh[5:]
}
- dirs.add(p[6:])
+ dirs.add(pdh[5:])
+
for f in self.pathmapper.files():
- _, p, tp = self.pathmapper.mapper(f)
- if p[6:].split("/")[0] not in dirs:
+ res, p, tp = self.pathmapper.mapper(f)
+ pdh, _, path = res[5:].partition("/")
+ if pdh not in dirs:
mounts[p] = {
"kind": "collection",
- "portable_data_hash": p[6:]
+ "portable_data_hash": pdh
}
-
- if self.generatefiles["listing"]:
- raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
+ if path:
+ mounts[p]["path"] = path
+
+ with Perf(metrics, "generatefiles %s" % self.name):
+ if self.generatefiles["listing"]:
+ vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
+ separateDirs=False)
+
+ with Perf(metrics, "createfiles %s" % self.name):
+ for f, p in generatemapper.items():
+ if not p.target:
+ pass
+ elif p.type in ("File", "Directory"):
+ source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+ vwd.copy(path, p.target, source_collection=source)
+ elif p.type == "CreateFile":
+ with vwd.open(p.target, "w") as n:
+ n.write(p.resolved.encode("utf-8"))
+
+ with Perf(metrics, "generatefiles.save_new %s" % self.name):
+ vwd.save_new()
+
+ for f, p in generatemapper.items():
+ if not p.target:
+ continue
+ mountpoint = "%s/%s" % (self.outdir, p.target)
+ mounts[mountpoint] = {"kind": "collection",
+ "portable_data_hash": vwd.portable_data_hash(),
+ "path": p.target}
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index b7f72a9..04a6295 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -19,7 +19,7 @@ import arvados.collection
from .arvdocker import arv_docker_get_image
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
-from .pathmapper import InitialWorkDirPathMapper
+from .pathmapper import VwdPathMapper
from .perf import Perf
from . import done
from ._version import __version__
@@ -51,8 +51,8 @@ class ArvadosJob(object):
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
script_parameters["task.vwd"] = {}
- generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
- separateDirs=False)
+ generatemapper = VwdPathMapper([self.generatefiles], "", "",
+ separateDirs=False)
with Perf(metrics, "createfiles %s" % self.name):
for f, p in generatemapper.items():
@@ -60,8 +60,9 @@ class ArvadosJob(object):
with vwd.open(p.target, "w") as n:
n.write(p.resolved.encode("utf-8"))
- with Perf(metrics, "generatefiles.save_new %s" % self.name):
- vwd.save_new()
+ if vwd:
+ with Perf(metrics, "generatefiles.save_new %s" % self.name):
+ vwd.save_new()
for f, p in generatemapper.items():
if p.type == "File":
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index a99b2a7..a18388f 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -30,13 +30,13 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
self.collections = {}
def get_collection(self, path):
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
+ p, _, rest = path.partition("/")
+ if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
+ pdh = p[5:]
if pdh not in self.collections:
self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
keep_client=self.keep_client)
- return (self.collections[pdh], "/".join(p[1:]))
+ return (self.collections[pdh], rest)
else:
return (None, path)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 63d36f5..1f6aa57 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -183,7 +183,7 @@ class StagingPathMapper(PathMapper):
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
-class InitialWorkDirPathMapper(StagingPathMapper):
+class VwdPathMapper(StagingPathMapper):
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
@@ -196,7 +196,7 @@ class InitialWorkDirPathMapper(StagingPathMapper):
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
-class FinalOutputPathMapper(StagingPathMapper):
+class NoFollowPathMapper(StagingPathMapper):
_follow_dirs = False
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 45e2c7c..8a0fdf6 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -152,6 +152,102 @@ class TestContainer(unittest.TestCase):
for key in call_body:
self.assertEqual(call_body_expected.get(key), call_body.get(key))
+
+ # Checks that InitialWorkDirRequirement entries are copied into a new collection and mounted under the output dir.
+ # No resource fields are set in this test, so the defaults apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ @mock.patch("arvados.collection.Collection")
+ def test_initial_work_dir(self, collection_mock, keepdocker):
+ arv_docker_clear_cache()
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ sourcemock = mock.MagicMock()
+ runner.fs_access.get_collection.return_value = (sourcemock, "bar")
+
+ vwdmock = mock.MagicMock()
+ collection_mock.return_value = vwdmock
+ vwdmock.portable_data_hash.return_value = "99999999999999999999999999999996+99"
+
+ tool = cmap({
+ "inputs": [],
+ "outputs": [],
+ "hints": [{
+ "class": "InitialWorkDirRequirement",
+ "listing": [{
+ "class": "File",
+ "basename": "foo",
+ "location": "keep:99999999999999999999999999999995+99/bar"
+ },
+ {
+ "class": "Directory",
+ "basename": "foo2",
+ "location": "keep:99999999999999999999999999999997+99"
+ }]
+ }],
+ "baseCommand": "ls"
+ })
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
+ avsc_names=avsc_names, make_fs_access=make_fs_access,
+ loader=Loader({}))
+ arvtool.formatgraph = None
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
+ make_fs_access=make_fs_access, tmpdir="/tmp"):
+ j.run()
+
+ call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+ vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock),
+ mock.call('bar', 'foo2', source_collection=sourcemock)], any_order=True)
+
+ call_body_expected = {
+ 'environment': {
+ 'HOME': '/var/spool/cwl',
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_initial_work_dir',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 1073741824
+ },
+ 'use_existing': True,
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'},
+ '/var/spool/cwl/foo': {
+ 'kind': 'collection',
+ 'path': 'foo',
+ 'portable_data_hash': '99999999999999999999999999999996+99'
+ },
+ '/var/spool/cwl/foo2': {
+ 'kind': 'collection',
+ 'path': 'foo2',
+ 'portable_data_hash': '99999999999999999999999999999996+99'
+ }
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls'],
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {
+ },
+ 'properties': {}
+ }
+
+ call_body = call_kwargs.get('body', None)
+ self.assertNotEqual(None, call_body)
+ for key in call_body:
+ self.assertEqual(call_body_expected.get(key), call_body.get(key))
+
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
api = mock.MagicMock()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list