[ARVADOS] created: da32ae2a73012ce55cb89b2de9a4716b2800eee1
Git user
git at public.curoverse.com
Thu Apr 13 15:13:48 EDT 2017
at da32ae2a73012ce55cb89b2de9a4716b2800eee1 (commit)
commit da32ae2a73012ce55cb89b2de9a4716b2800eee1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Apr 13 15:11:39 2017 -0400
11462: Store CollectionReader objects in a central cache to avoid redundant
fetches and storing multiple copies. On pipelines with large inputs, this
reduced memory usage by up to 70%.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7c24310..1d2fa29 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -32,7 +32,7 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__
@@ -80,6 +80,8 @@ class ArvCwlRunner(object):
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
self.work_api = None
expected_api = ["jobs", "containers"]
for api in expected_api:
@@ -102,7 +104,8 @@ class ArvCwlRunner(object):
kwargs["work_api"] = self.work_api
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
- keep_client=self.keep_client)
+ fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+ num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -231,7 +234,6 @@ class ArvCwlRunner(object):
keep_client=self.keep_client,
num_retries=self.num_retries)
- srccollections = {}
for k,v in generatemapper.items():
if k.startswith("_:"):
if v.type == "Directory":
@@ -245,17 +247,7 @@ class ArvCwlRunner(object):
raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
- if srccollection not in srccollections:
- try:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- reader = srccollections[srccollection]
+ reader = self.collection_cache.get(srccollection)
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
@@ -331,8 +323,7 @@ class ArvCwlRunner(object):
self.project_uuid = kwargs.get("project_uuid")
self.pipeline = None
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- api_client=self.api,
- keep_client=self.keep_client)
+ collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
if not kwargs.get("name"):
@@ -636,7 +627,6 @@ def add_arv_hints():
"http://commonwl.org/cwltool#LoadListingRequirement"
])
-
def main(args, stdout, stderr, api_client=None, keep_client=None):
parser = arg_parser()
@@ -702,6 +692,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
arvargs.relax_path_checks = True
arvargs.validate = None
+ make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
@@ -709,12 +702,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=partial(CollectionFsAccess,
- api_client=api_client,
- keep_client=keep_client),
+ make_fs_access=make_fs_access,
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
- keep_client=keep_client,
+ fs_access=make_fs_access(""),
num_retries=runner.num_retries),
resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler,
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 3a3d160..34d9cea 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -20,24 +20,33 @@ from schema_salad.ref_resolver import DefaultFetcher
logger = logging.getLogger('arvados.cwl-runner')
+class CollectionCache(object):
+ def __init__(self, api_client, keep_client, num_retries):
+ self.api_client = api_client
+ self.keep_client = keep_client
+ self.collections = {}
+
+ def get(self, pdh):
+ if pdh not in self.collections:
+ logger.debug("Creating collection reader for %s", pdh)
+ self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
+ keep_client=self.keep_client)
+ return self.collections[pdh]
+
+
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
- def __init__(self, basedir, api_client=None, keep_client=None):
+ def __init__(self, basedir, collection_cache=None):
super(CollectionFsAccess, self).__init__(basedir)
- self.api_client = api_client
- self.keep_client = keep_client
- self.collections = {}
+ self.collection_cache = collection_cache
def get_collection(self, path):
sp = path.split("/", 1)
p = sp[0]
if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
pdh = p[5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client)
- return (self.collections[pdh], sp[1] if len(sp) == 2 else None)
+ return (self.collection_cache.get(pdh), sp[1] if len(sp) == 2 else None)
else:
return (None, path)
@@ -137,10 +146,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
- def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
+ def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
super(CollectionFetcher, self).__init__(cache, session)
self.api_client = api_client
- self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+ self.fsaccess = fs_access
self.num_retries = num_retries
def fetch_text(self, url):
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 33305d9..08b50e0 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -41,7 +41,8 @@ class TestContainer(unittest.TestCase):
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
@@ -107,7 +108,8 @@ class TestContainer(unittest.TestCase):
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
@@ -208,7 +210,8 @@ class TestContainer(unittest.TestCase):
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
@@ -300,7 +303,8 @@ class TestContainer(unittest.TestCase):
"stdin": "/keep/99999999999999999999999999999996+99/file.txt",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 076514b..3061e2f 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -41,7 +41,8 @@ class TestJob(unittest.TestCase):
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
@@ -108,7 +109,8 @@ class TestJob(unittest.TestCase):
}],
"baseCommand": "ls"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
@@ -264,7 +266,8 @@ class TestWorkflow(unittest.TestCase):
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=document_loader,
makeTool=runner.arv_make_tool, metadata=metadata)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list