[ARVADOS] created: da32ae2a73012ce55cb89b2de9a4716b2800eee1

Git user git at public.curoverse.com
Thu Apr 13 15:13:48 EDT 2017


        at  da32ae2a73012ce55cb89b2de9a4716b2800eee1 (commit)


commit da32ae2a73012ce55cb89b2de9a4716b2800eee1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Apr 13 15:11:39 2017 -0400

    11462: Store CollectionReader objects in a central cache to avoid redundant
    fetches & storing multiple copies.  On pipeline with large inputs, reduced
    memory usage by up to 70%.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7c24310..1d2fa29 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -32,7 +32,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from ._version import __version__
@@ -80,6 +80,8 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -102,7 +104,8 @@ class ArvCwlRunner(object):
         kwargs["work_api"] = self.work_api
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
-                                                keep_client=self.keep_client)
+                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                                num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -231,7 +234,6 @@ class ArvCwlRunner(object):
                                               keep_client=self.keep_client,
                                               num_retries=self.num_retries)
 
-        srccollections = {}
         for k,v in generatemapper.items():
             if k.startswith("_:"):
                 if v.type == "Directory":
@@ -245,17 +247,7 @@ class ArvCwlRunner(object):
                 raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
-            if srccollection not in srccollections:
-                try:
-                    srccollections[srccollection] = arvados.collection.CollectionReader(
-                        srccollection,
-                        api_client=self.api,
-                        keep_client=self.keep_client,
-                        num_retries=self.num_retries)
-                except arvados.errors.ArgumentError as e:
-                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                    raise
-            reader = srccollections[srccollection]
+            reader = self.collection_cache.get(srccollection)
             try:
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
@@ -331,8 +323,7 @@ class ArvCwlRunner(object):
         self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 api_client=self.api,
-                                                                 keep_client=self.keep_client)
+                                                                 collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
         if not kwargs.get("name"):
@@ -636,7 +627,6 @@ def add_arv_hints():
         "http://commonwl.org/cwltool#LoadListingRequirement"
     ])
 
-
 def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
 
@@ -702,6 +692,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.relax_path_checks = True
     arvargs.validate = None
 
+    make_fs_access = partial(CollectionFsAccess,
+                           collection_cache=runner.collection_cache)
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -709,12 +702,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=partial(CollectionFsAccess,
-                                                    api_client=api_client,
-                                                    keep_client=keep_client),
+                             make_fs_access=make_fs_access,
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
-                                                         keep_client=keep_client,
+                                                         fs_access=make_fs_access(""),
                                                          num_retries=runner.num_retries),
                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                              logger_handler=arvados.log_handler,
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 3a3d160..34d9cea 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -20,24 +20,33 @@ from schema_salad.ref_resolver import DefaultFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+class CollectionCache(object):
+    def __init__(self, api_client, keep_client, num_retries):
+        self.api_client = api_client
+        self.keep_client = keep_client
+        self.collections = {}
+
+    def get(self, pdh):
+        if pdh not in self.collections:
+            logger.debug("Creating collection reader for %s", pdh)
+            self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
+                                                                        keep_client=self.keep_client)
+        return self.collections[pdh]
+
+
 class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
-    def __init__(self, basedir, api_client=None, keep_client=None):
+    def __init__(self, basedir, collection_cache=None):
         super(CollectionFsAccess, self).__init__(basedir)
-        self.api_client = api_client
-        self.keep_client = keep_client
-        self.collections = {}
+        self.collection_cache = collection_cache
 
     def get_collection(self, path):
         sp = path.split("/", 1)
         p = sp[0]
         if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
             pdh = p[5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
-                                                                            keep_client=self.keep_client)
-            return (self.collections[pdh], sp[1] if len(sp) == 2 else None)
+            return (self.collection_cache.get(pdh), sp[1] if len(sp) == 2 else None)
         else:
             return (None, path)
 
@@ -137,10 +146,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
             return os.path.realpath(path)
 
 class CollectionFetcher(DefaultFetcher):
-    def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
+    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
         super(CollectionFetcher, self).__init__(cache, session)
         self.api_client = api_client
-        self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+        self.fsaccess = fs_access
         self.num_retries = num_retries
 
     def fetch_text(self, url):
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 33305d9..08b50e0 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -41,7 +41,8 @@ class TestContainer(unittest.TestCase):
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}]
             })
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}))
             arvtool.formatgraph = None
@@ -107,7 +108,8 @@ class TestContainer(unittest.TestCase):
             }],
             "baseCommand": "ls"
         })
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
                                                  loader=Loader({}))
@@ -208,7 +210,8 @@ class TestContainer(unittest.TestCase):
             }],
             "baseCommand": "ls"
         })
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
                                                  loader=Loader({}))
@@ -300,7 +303,8 @@ class TestContainer(unittest.TestCase):
             "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
             "arguments": [{"valueFrom": "$(runtime.outdir)"}]
         })
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
                                                  basedir="", make_fs_access=make_fs_access, loader=Loader({}))
         arvtool.formatgraph = None
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 076514b..3061e2f 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -41,7 +41,8 @@ class TestJob(unittest.TestCase):
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}]
             })
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}))
             arvtool.formatgraph = None
@@ -108,7 +109,8 @@ class TestJob(unittest.TestCase):
             }],
             "baseCommand": "ls"
         }
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                  make_fs_access=make_fs_access, loader=Loader({}))
         arvtool.formatgraph = None
@@ -264,7 +266,8 @@ class TestWorkflow(unittest.TestCase):
 
         mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
 
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                               makeTool=runner.arv_make_tool, metadata=metadata)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list