[ARVADOS] created: 8a1a96b62a5c91926ed0bbb01042cdda7b16887c

Git user git at public.curoverse.com
Wed Feb 8 11:12:20 EST 2017


        at  8a1a96b62a5c91926ed0bbb01042cdda7b16887c (commit)


commit 8a1a96b62a5c91926ed0bbb01042cdda7b16887c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Feb 8 11:11:42 2017 -0500

    9397: arvados-cwl-runner implementation of InitialWorkDir on container API.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 2842e8a..024e9d0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -33,7 +33,7 @@ from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
 from .perf import Perf
-from .pathmapper import FinalOutputPathMapper
+from .pathmapper import NoFollowPathMapper
 from ._version import __version__
 
 from cwltool.pack import pack
@@ -200,9 +200,6 @@ class ArvCwlRunner(object):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("class") == "InitialWorkDirRequirement":
-                if self.work_api == "containers":
-                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
             if obj.get("class") == "CommandLineTool":
@@ -233,7 +230,7 @@ class ArvCwlRunner(object):
         adjustDirObjs(outputObj, capture)
         adjustFileObjs(outputObj, capture)
 
-        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
 
         final = arvados.collection.Collection(api_client=self.api,
                                               keep_client=self.keep_client,
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c4db17..1216f2d 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -15,8 +15,11 @@ from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
 from .fsaccess import CollectionFetcher
+from .pathmapper import NoFollowPathMapper
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 class ArvadosContainer(object):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
@@ -50,23 +53,54 @@ class ArvadosContainer(object):
 
         dirs = set()
         for f in self.pathmapper.files():
-            _, p, tp = self.pathmapper.mapper(f)
-            if tp == "Directory" and '/' not in p[6:]:
+            pdh, p, tp = self.pathmapper.mapper(f)
+            if tp == "Directory" and '/' not in pdh:
                 mounts[p] = {
                     "kind": "collection",
-                    "portable_data_hash": p[6:]
+                    "portable_data_hash": pdh[5:]
                 }
-                dirs.add(p[6:])
+                dirs.add(pdh)
+
         for f in self.pathmapper.files():
-            _, p, tp = self.pathmapper.mapper(f)
-            if p[6:].split("/")[0] not in dirs:
+            res, p, tp = self.pathmapper.mapper(f)
+            pdh, path = res[5:].split("/", 1)
+            if pdh not in dirs:
                 mounts[p] = {
                     "kind": "collection",
-                    "portable_data_hash": p[6:]
+                    "portable_data_hash": pdh
                 }
-
-        if self.generatefiles["listing"]:
-            raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
+                if rest:
+                    mounts[p]["path"] = path
+
+        with Perf(metrics, "generatefiles %s" % self.name):
+            if self.generatefiles["listing"]:
+                vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                    keep_client=self.arvrunner.keep_client,
+                                                    num_retries=self.arvrunner.num_retries)
+                generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
+                                                    separateDirs=False)
+
+                with Perf(metrics, "createfiles %s" % self.name):
+                    for f, p in generatemapper.items():
+                        if not p.target:
+                            pass
+                        elif p.type in ("File", "Directory"):
+                            source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+                            vwd.copy(path, p.target, source_collection=source)
+                        elif p.type == "CreateFile":
+                            with vwd.open(p.target, "w") as n:
+                                n.write(p.resolved.encode("utf-8"))
+
+                with Perf(metrics, "generatefiles.save_new %s" % self.name):
+                    vwd.save_new()
+
+                for f, p in generatemapper.items():
+                    if not p.target:
+                        continue
+                    mountpoint = "%s/%s" % (self.outdir, p.target)
+                    mounts[mountpoint] = {"kind": "collection",
+                                          "portable_data_hash": vwd.portable_data_hash(),
+                                          "path": p.target}
 
         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
         if self.environment:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index b7f72a9..04a6295 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -19,7 +19,7 @@ import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
-from .pathmapper import InitialWorkDirPathMapper
+from .pathmapper import VwdPathMapper
 from .perf import Perf
 from . import done
 from ._version import __version__
@@ -51,8 +51,8 @@ class ArvadosJob(object):
                                                     keep_client=self.arvrunner.keep_client,
                                                     num_retries=self.arvrunner.num_retries)
                 script_parameters["task.vwd"] = {}
-                generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
-                                                          separateDirs=False)
+                generatemapper = VwdPathMapper([self.generatefiles], "", "",
+                                               separateDirs=False)
 
                 with Perf(metrics, "createfiles %s" % self.name):
                     for f, p in generatemapper.items():
@@ -60,8 +60,9 @@ class ArvadosJob(object):
                             with vwd.open(p.target, "w") as n:
                                 n.write(p.resolved.encode("utf-8"))
 
-                with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                    vwd.save_new()
+                if vwd:
+                    with Perf(metrics, "generatefiles.save_new %s" % self.name):
+                        vwd.save_new()
 
                 for f, p in generatemapper.items():
                     if p.type == "File":
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index a99b2a7..a18388f 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -30,13 +30,13 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
         self.collections = {}
 
     def get_collection(self, path):
-        p = path.split("/")
-        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
-            pdh = p[0][5:]
+        p, _, rest = path.partition("/")  # partition() never raises: a bare "keep:<pdh>" with no "/" gives rest == ""
+        if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
+            pdh = p[5:]  # drop the "keep:" prefix
             if pdh not in self.collections:
                 self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
                                                                             keep_client=self.keep_client)
-            return (self.collections[pdh], "/".join(p[1:]))
+            return (self.collections[pdh], rest)  # (cached CollectionReader, path inside that collection)
         else:
             return (None, path)
 
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 63d36f5..1f6aa57 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -183,7 +183,7 @@ class StagingPathMapper(PathMapper):
                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
 
-class InitialWorkDirPathMapper(StagingPathMapper):
+class VwdPathMapper(StagingPathMapper):
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
 
@@ -196,7 +196,7 @@ class InitialWorkDirPathMapper(StagingPathMapper):
                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
 
 
-class FinalOutputPathMapper(StagingPathMapper):
+class NoFollowPathMapper(StagingPathMapper):
     _follow_dirs = False
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 45e2c7c..8a0fdf6 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -152,6 +152,102 @@ class TestContainer(unittest.TestCase):
         for key in call_body:
             self.assertEqual(call_body_expected.get(key), call_body.get(key))
 
+
+    # Exercises InitialWorkDirRequirement with work_api="containers": a File and a Directory are staged via the hint.
+    # No resource fields are set, so the defaults apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    @mock.patch("arvados.collection.Collection")
+    def test_initial_work_dir(self, collection_mock, keepdocker):  # patch decorators inject bottom-up: Collection mock first
+        arv_docker_clear_cache()
+        runner = mock.MagicMock()
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
+        sourcemock = mock.MagicMock()
+        runner.fs_access.get_collection.return_value = (sourcemock, "bar")  # every get_collection() call returns path "bar"
+
+        vwdmock = mock.MagicMock()
+        collection_mock.return_value = vwdmock  # any arvados.collection.Collection(...) call returns vwdmock
+        vwdmock.portable_data_hash.return_value = "99999999999999999999999999999996+99"
+
+        tool = cmap({
+            "inputs": [],
+            "outputs": [],
+            "hints": [{
+                "class": "InitialWorkDirRequirement",
+                "listing": [{
+                    "class": "File",
+                    "basename": "foo",
+                    "location": "keep:99999999999999999999999999999995+99/bar"
+                },
+                {
+                    "class": "Directory",
+                    "basename": "foo2",
+                    "location": "keep:99999999999999999999999999999997+99"
+                }]
+            }],
+            "baseCommand": "ls"
+        })
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
+                                                 avsc_names=avsc_names, make_fs_access=make_fs_access,
+                                                 loader=Loader({}))
+        arvtool.formatgraph = None
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
+                             make_fs_access=make_fs_access, tmpdir="/tmp"):
+            j.run()
+
+        call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+        vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock)],
+                                      [mock.call('', 'foo2', source_collection=sourcemock)])  # NOTE(review): this second list binds to assert_has_calls' any_order parameter, so the ('', 'foo2') call is never verified
+
+        call_body_expected = {
+                'environment': {
+                    'HOME': '/var/spool/cwl',
+                    'TMPDIR': '/tmp'
+                },
+                'name': 'test_initial_work_dir',
+                'runtime_constraints': {
+                    'vcpus': 1,
+                    'ram': 1073741824
+                },
+                'use_existing': True,
+                'priority': 1,
+                'mounts': {
+                    '/var/spool/cwl': {'kind': 'tmp'},
+                    '/var/spool/cwl/foo': {
+                        'kind': 'collection',
+                        'path': 'foo',
+                        'portable_data_hash': '99999999999999999999999999999996+99'
+                    },
+                    '/var/spool/cwl/foo2': {
+                        'kind': 'collection',
+                        'path': 'foo2',
+                        'portable_data_hash': '99999999999999999999999999999996+99'
+                    }
+                },
+                'state': 'Committed',
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                'output_path': '/var/spool/cwl',
+                'container_image': '99999999999999999999999999999993+99',
+                'command': ['ls'],
+                'cwd': '/var/spool/cwl',
+                'scheduling_parameters': {
+                },
+                'properties': {}
+        }
+
+        call_body = call_kwargs.get('body', None)
+        self.assertNotEqual(None, call_body)
+        for key in call_body:  # only keys present in the actual request body are compared
+            self.assertEqual(call_body_expected.get(key), call_body.get(key))
+
     @mock.patch("arvados.collection.Collection")
     def test_done(self, col):
         api = mock.MagicMock()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list