[arvados] created: 2.1.0-2962-g67e56f190
git repository hosting
git at public.arvados.org
Tue Oct 18 18:45:45 UTC 2022
at 67e56f190b9a78e3c45cc7d90510fc631e0d04b6 (commit)
commit 67e56f190b9a78e3c45cc7d90510fc631e0d04b6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Oct 17 20:21:06 2022 -0400
18842: Add disk cache, test python sdk/fuse disk_cache=true/false
Off by default in the SDK but enabled by default with arv-mount
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 08a05d571..aa38d7383 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -319,7 +319,8 @@ def main(args=sys.argv[1:],
# Make an API object now so errors are reported early.
api_client.users().current().execute()
if keep_client is None:
- keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+ keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4, block_cache=block_cache)
executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
except WorkflowException as e:
logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
new file mode 100644
index 000000000..24f249f1d
--- /dev/null
+++ b/sdk/python/arvados/diskcache.py
@@ -0,0 +1,113 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import threading
+import mmap
+import os
+import traceback
+import stat
+import tempfile
+import hashlib
+
+class DiskCacheSlot(object):
+ __slots__ = ("locator", "ready", "content", "cachedir")
+
+ def __init__(self, locator, cachedir):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+ self.cachedir = cachedir
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ try:
+ if value is None:
+ self.content = None
+ return
+
+ if len(value) == 0:
+ # Can't mmap a 0 length file
+ self.content = b''
+ return
+
+ if self.content is not None:
+ # Has been set already
+ return
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ os.makedirs(blockdir, mode=0o700, exist_ok=True)
+
+ final = os.path.join(blockdir, self.locator)
+
+ f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+ tmpfile = f.name
+ os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+ f.write(value)
+ f.flush()
+ os.rename(tmpfile, final)
+
+ self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+ except Exception as e:
+ traceback.print_exc()
+ finally:
+ self.ready.set()
+
+ def size(self):
+ if self.content is None:
+ return 0
+ else:
+ return len(self.content)
+
+ def evict(self):
+ if self.content is not None and len(self.content) > 0:
+ # The mmap region might be in use when we decided to evict
+ # it. This can happen if the cache is too small.
+ #
+ # If we call close() now, it'll throw an error if
+ # something tries to access it.
+ #
+ # However, we don't need to explicitly call mmap.close()
+ #
+ # I confirmed in mmapmodule.c that both close
+ # and deallocate do the same thing:
+ #
+ # a) close the file descriptor
+ # b) unmap the memory range
+ #
+ # So we can forget it in the cache and delete the file on
+ # disk, and it will tear it down after any other
+ # lingering Python references to the mapped memory are
+ # gone.
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ final = os.path.join(blockdir, self.locator)
+ try:
+ os.remove(final)
+ except OSError:
+ pass
+
+ @staticmethod
+ def get_from_disk(locator, cachedir):
+ # Get it, check it, return it
+ blockdir = os.path.join(cachedir, locator[0:3])
+ final = os.path.join(blockdir, locator)
+
+ try:
+ filehandle = open(final, "rb")
+ content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
+ disk_md5 = hashlib.md5(content).hexdigest()
+ if disk_md5 == locator:
+ dc = DiskCacheSlot(locator, cachedir)
+ dc.content = content
+ dc.ready.set()
+ return dc
+ except FileNotFoundError:
+ pass
+ except Exception as e:
+ traceback.print_exc()
+
+ return None
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 44e915776..7f316c153 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -26,6 +26,7 @@ import socket
import ssl
import sys
import threading
+import resource
from . import timer
import urllib.parse
@@ -39,6 +40,7 @@ import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
+import arvados.diskcache
_logger = logging.getLogger('arvados.keep')
global_client_object = None
@@ -174,11 +176,36 @@ class Keep(object):
return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
- # Default RAM cache is 256MiB
- def __init__(self, cache_max=(256 * 1024 * 1024)):
+ def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
self._cache = []
self._cache_lock = threading.Lock()
+ self._max_slots = max_slots
+ self._disk_cache = disk_cache
+ self._disk_cache_dir = disk_cache_dir
+
+ if self._disk_cache and self._disk_cache_dir is None:
+ self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+ if self._max_slots == 0:
+ if self._disk_cache:
+ # By default, set max slots to half of the soft limit on open file handles
+ self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+ else:
+ self._max_slots = 1024
+
+ if self.cache_max == 0:
+ if self._disk_cache:
+ fs = os.statvfs(self._disk_cache_dir)
+ avail = (fs.f_bavail * fs.f_bsize) / 2
+ # Half the available space or max_slots * 64 MiB, whichever is smaller
+ self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+ else:
+ # 256 MiB in RAM
+ self.cache_max = (256 * 1024 * 1024)
+
+ self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
@@ -202,6 +229,9 @@ class KeepBlockCache(object):
else:
return len(self.content)
+ def evict(self):
+ pass
+
def cap_cache(self):
'''Cap the cache size to self.cache_max'''
with self._cache_lock:
@@ -209,9 +239,10 @@ class KeepBlockCache(object):
# None (that means there was an error reading the block).
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and sm > self.cache_max:
+ while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
for i in range(len(self._cache)-1, -1, -1):
if self._cache[i].ready.is_set():
+ self._cache[i].evict()
del self._cache[i]
break
sm = sum([slot.size() for slot in self._cache])
@@ -226,6 +257,12 @@ class KeepBlockCache(object):
del self._cache[i]
self._cache.insert(0, n)
return n
+ if self._disk_cache:
+ # see if it exists on disk
+ n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+ if n is not None:
+ self._cache.insert(0, n)
+ return n
return None
def get(self, locator):
@@ -241,7 +278,10 @@ class KeepBlockCache(object):
return n, False
else:
# Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
+ if self._disk_cache:
+ n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+ else:
+ n = KeepBlockCache.CacheSlot(locator)
self._cache.insert(0, n)
return n, True
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index d28df0998..af60b6d38 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -61,6 +61,6 @@ setup(name='arvados-python-client',
'Programming Language :: Python :: 3',
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML'],
+ tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML', 'parameterized'],
zip_safe=False
)
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index d9b3ca86c..a574508cb 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -280,3 +280,18 @@ if sys.version_info < (3, 0):
return self.assertNotRegexpMatches(*args, **kwargs)
unittest.TestCase.assertRegex = assertRegex
unittest.TestCase.assertNotRegex = assertNotRegex
+
+def binary_compare(a, b):
+ if len(a) != len(b):
+ return False
+ for i in range(0, len(a)):
+ if a[i] != b[i]:
+ return False
+ return True
+
+def make_block_cache(disk_cache):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+ if disk_cache:
+ disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ return block_cache
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index b4849c21f..8aded823b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -16,11 +16,13 @@ import datetime
import ciso8601
import time
import unittest
+import parameterized
from . import run_test_server
from arvados._ranges import Range, LocatorAndRange
from arvados.collection import Collection, CollectionReader
from . import arvados_testutil as tutil
+from .arvados_testutil import make_block_cache
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
@@ -28,9 +30,10 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
def current_state(self):
return self.dump_state(copy.deepcopy)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
tutil.ArvadosBaseTestCase):
+ disk_cache = False
MAIN_SERVER = {}
@classmethod
@@ -40,7 +43,8 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
run_test_server.authorize_with('admin')
cls.api_client = arvados.api('v1')
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
- local_store=cls.local_store)
+ local_store=cls.local_store,
+ block_cache=make_block_cache(cls.disk_cache))
def write_foo_bar_baz(self):
cw = arvados.CollectionWriter(self.api_client)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 87e4eefb2..126116393 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -15,12 +15,15 @@ import os
import pycurl
import random
import re
+import shutil
import socket
import sys
import time
import unittest
import urllib.parse
+import parameterized
+
import arvados
import arvados.retry
import arvados.util
@@ -28,7 +31,11 @@ from . import arvados_testutil as tutil
from . import keepstub
from . import run_test_server
+from .arvados_testutil import make_block_cache
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
@@ -38,7 +45,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
run_test_server.authorize_with("admin")
cls.api_client = arvados.api('v1')
cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=make_block_cache(cls.disk_cache))
def test_KeepBasicRWTest(self):
self.assertEqual(0, self.keep_client.upload_counter.get())
@@ -52,8 +60,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
self.assertEqual(6, self.keep_client.upload_counter.get())
self.assertEqual(0, self.keep_client.download_counter.get())
- self.assertEqual(self.keep_client.get(foo_locator),
- b'foo',
+ self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
+ b'foo'),
'wrong content from Keep.get(md5("foo"))')
self.assertEqual(3, self.keep_client.download_counter.get())
@@ -128,13 +136,15 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
b'test_head',
'wrong content from Keep.get for "test_head"')
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {'blob_signing': True}
def test_KeepBasicRWTest(self):
run_test_server.authorize_with('active')
- keep_client = arvados.KeepClient()
+ keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
foo_locator = keep_client.put('foo')
self.assertRegex(
foo_locator,
@@ -171,8 +181,9 @@ class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
keep_client.get,
unsigned_bar_locator)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+ disk_cache = False
MAIN_SERVER = {}
KEEP_SERVER = {}
KEEP_PROXY_SERVER = {}
@@ -190,7 +201,7 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
# Will use ARVADOS_KEEP_SERVICES environment variable that
# is set by setUpClass().
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='', block_cache=make_block_cache(self.disk_cache))
baz_locator = keep_client.put('baz')
self.assertRegex(
baz_locator,
@@ -206,7 +217,8 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
# existing proxy setting and setting multiple proxies
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
+ local_store='',
+ block_cache=make_block_cache(self.disk_cache))
uris = [x['_service_root'] for x in keep_client._keep_services]
self.assertEqual(uris, ['http://10.0.0.1/',
'https://foo.example.org:1234/'])
@@ -215,12 +227,15 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
with self.assertRaises(arvados.errors.ArgumentError):
keep_client = arvados.KeepClient(api_client=self.api_client,
- local_store='')
-
+ local_store='',
+ block_cache=make_block_cache(self.disk_cache))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def get_service_roots(self, api_client):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
return [urllib.parse.urlparse(url) for url in sorted(services)]
@@ -239,7 +254,8 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_recognize_proxy_services_in_controller_response(self):
keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
- service_type='proxy', service_host='localhost', service_port=9, count=1))
+ service_type='proxy', service_host='localhost', service_port=9, count=1),
+ block_cache=make_block_cache(self.disk_cache))
try:
# this will fail, but it ensures we get the service
# discovery response
@@ -254,7 +270,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client.insecure = True
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
self.assertEqual(
mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
@@ -265,7 +281,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client.insecure = False
with tutil.mock_keep_responses(b'foo', 200) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
# getopt()==None here means we didn't change the
# default. If we were using real pycurl instead of a mock,
@@ -286,7 +302,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
headers = {'X-Keep-Locator':local_loc}
with tutil.mock_keep_responses('', 200, **headers):
# Check that the translated locator gets returned
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
# Check that refresh_signature() uses the correct method and headers
keep_client._get_or_head = mock.MagicMock()
@@ -305,7 +321,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
@@ -322,7 +338,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepWriteError):
keep_client.put(b'foo')
self.assertEqual(
@@ -339,7 +355,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
@@ -356,7 +372,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get('ffffffffffffffffffffffffffffffff')
self.assertEqual(
@@ -373,7 +389,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(service_type='proxy', count=1)
force_timeout = socket.timeout("timed out")
with tutil.mock_keep_responses(force_timeout, 0) as mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.head('ffffffffffffffffffffffffffffffff')
self.assertEqual(
@@ -407,7 +423,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = mock.MagicMock(name='api_client')
api_client.keep_services().accessible().execute.side_effect = (
arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with self.assertRaises(exc_class) as err_check:
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
self.assertEqual(0, len(err_check.exception.request_errors()))
@@ -427,7 +443,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
"retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
with req_mock, tutil.skip_sleep, \
self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
num_retries=3)
self.assertEqual([502, 502], [
@@ -450,7 +466,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(2, len(exc_check.exception.request_errors()))
@@ -460,7 +476,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
keep_client.put(data)
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
@@ -469,7 +485,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
body = b'oddball service get'
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(body, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.get(tutil.str_keep_locator(body))
self.assertEqual(body, actual)
@@ -478,7 +494,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
pdh = tutil.str_keep_locator(body)
api_client = self.mock_keep_services(service_type='fancynewblobstore')
with tutil.mock_keep_responses(pdh, 200):
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
@@ -490,17 +506,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
headers = {'x-keep-replicas-stored': 3}
with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
**headers) as req_mock:
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
actual = keep_client.put(body, copies=2)
self.assertEqual(pdh, actual)
self.assertEqual(1, req_mock.call_count)
-
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
@@ -524,19 +542,22 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_head_and_then_get_return_different_responses(self, get_mock):
head_resp = None
get_resp = None
- get_mock.side_effect = ['first response', 'second response']
+ get_mock.side_effect = [b'first response', b'second response']
with tutil.mock_keep_responses(self.data, 200, 200):
head_resp = self.keep_client.head(self.locator)
get_resp = self.keep_client.get(self.locator)
- self.assertEqual('first response', head_resp)
+ self.assertEqual(b'first response', head_resp)
# First reponse was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
@@ -550,7 +571,7 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
}
}
api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
resp_hdr = {
'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
'x-keep-replicas-stored': 1
@@ -644,10 +665,13 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
self.data = b'xyzzy'
self.locator = '1271ed5ef305aadabc605b1609e24c52'
self.test_id = arvados.util.new_request_id()
@@ -732,7 +756,9 @@ class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
@tutil.skip_sleep
+#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
def setUp(self):
# expected_order[i] is the probe order for
@@ -755,7 +781,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
hashlib.md5(self.blocks[x]).hexdigest()
for x in range(len(self.expected_order))]
self.api_client = self.mock_keep_services(count=self.services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
def test_weighted_service_roots_against_reference_set(self):
# Confirm weighted_service_roots() returns the correct order
@@ -828,12 +854,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
initial_services = 12
self.api_client = self.mock_keep_services(count=initial_services)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
probes_before = [
self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
total_penalty = 0
for hash_index in range(len(hashes)):
probe_after = keep_client.weighted_service_roots(
@@ -869,7 +895,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
@@ -885,8 +911,10 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_put_error_shows_probe_order(self):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
+ disk_cache = False
+
# BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
# 1s worth of data and then trigger bandwidth errors before running
# out of data.
@@ -923,7 +951,7 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
- timeout=timeouts)
+ timeout=timeouts, block_cache=make_block_cache(self.disk_cache))
def test_timeout_slow_connect(self):
# Can't simulate TCP delays with our own socket. Leave our
@@ -1027,8 +1055,10 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+ disk_cache = False
+
def mock_disks_and_gateways(self, disks=3, gateways=1):
self.gateways = [{
'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
@@ -1043,7 +1073,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
for gw in self.gateways]
self.api_client = self.mock_keep_services(
count=disks, additional_services=self.gateways)
- self.keepClient = arvados.KeepClient(api_client=self.api_client)
+ self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
@mock.patch('pycurl.Curl')
def test_get_with_gateway_hint_first(self, MockCurl):
@@ -1124,8 +1154,9 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
MockCurl.return_value.getopt(pycurl.URL).decode())
-
class KeepClientRetryTestMixin(object):
+ disk_cache = False
+
# Testing with a local Keep store won't exercise the retry behavior.
# Instead, our strategy is:
# * Create a client with one proxy specified (pointed at a black
@@ -1150,6 +1181,7 @@ class KeepClientRetryTestMixin(object):
def new_client(self, **caller_kwargs):
kwargs = self.client_kwargs.copy()
kwargs.update(caller_kwargs)
+ kwargs['block_cache'] = make_block_cache(self.disk_cache)
return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
@@ -1199,6 +1231,7 @@ class KeepClientRetryTestMixin(object):
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1243,6 +1276,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = True
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1281,6 +1315,7 @@ class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
self.check_success(locator=self.HINTED_LOCATOR)
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@@ -1369,13 +1404,16 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+ block_cache = False
+
# Test put()s that need two distinct servers to succeed, possibly
# requiring multiple passes through the retry loop.
def setUp(self):
self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
def test_success_after_exception(self):
with tutil.mock_keep_responses(
@@ -1402,7 +1440,10 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
self.keep_client.put('foo', num_retries=1, copies=2)
self.assertEqual(2, req_mock.call_count)
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase):
+ disk_cache = False
+
def test_api_fail(self):
class ApiMock(object):
def __getattr__(self, r):
@@ -1415,7 +1456,8 @@ class KeepClientAPIErrorTest(unittest.TestCase):
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),
- proxy='', local_store='')
+ proxy='', local_store='',
+ block_cache=make_block_cache(self.disk_cache))
# The bug this is testing for is that if an API (not
# keepstore) exception is thrown as part of a get(), the next
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 5f0a1f80f..994c99882 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -83,13 +83,20 @@ class ArgumentParser(argparse.ArgumentParser):
type=str, metavar='PATH', action='append', default=[],
help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
+
self.add_argument('--debug', action='store_true', help="""Debug mode""")
self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
- self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
- self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+ self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
+ self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
+
+ cachetype = self.add_mutually_exclusive_group()
+ cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
+ cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
+
+ self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
@@ -213,8 +220,12 @@ class Mount(object):
try:
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
+ # default value of file_cache is 0, this tells KeepBlockCache to
+ # choose a default based on whether disk_cache is enabled or not.
keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+ disk_cache=self.args.disk_cache,
+ disk_cache_dir=self.args.disk_cache_dir),
'num_retries': self.args.retries,
})
except KeyError as e:
diff --git a/services/fuse/setup.py b/services/fuse/setup.py
index 545b4bfa0..d0c46f132 100644
--- a/services/fuse/setup.py
+++ b/services/fuse/setup.py
@@ -59,6 +59,6 @@ setup(name='arvados_fuse',
'Programming Language :: Python :: 3',
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
+ tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML', 'parameterized',],
zip_safe=False
)
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index 7cf8aa373..a24005064 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -4,6 +4,7 @@
from __future__ import absolute_import
import arvados
+import arvados.keep
import arvados_fuse as fuse
import arvados.safeapi
import llfuse
@@ -24,7 +25,16 @@ logger = logging.getLogger('arvados.arv-mount')
from .integration_test import workerPool
+def make_block_cache(disk_cache):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+ if disk_cache:
+ disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+ shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ return block_cache
+
class MountTestBase(unittest.TestCase):
+ block_cache = False
+
def setUp(self, api=None, local_store=True):
# The underlying C implementation of open() makes a fstat() syscall
# with the GIL still held. When the GETATTR message comes back to
@@ -43,7 +53,8 @@ class MountTestBase(unittest.TestCase):
self.mounttmp = tempfile.mkdtemp()
run_test_server.run()
run_test_server.authorize_with("admin")
- self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+ self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.block_cache)})
self.llfuse_thread = None
# This is a copy of Mount's method. TODO: Refactor MountTestBase
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 1601db594..df3d42634 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -16,6 +16,7 @@ import subprocess
import time
import unittest
import tempfile
+import parameterized
import arvados
import arvados_fuse as fuse
@@ -54,7 +55,7 @@ class AssertWithTimeout(object):
else:
self.done = True
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMountTest(MountTestBase):
def setUp(self):
super(FuseMountTest, self).setUp()
@@ -125,6 +126,7 @@ class FuseMountTest(MountTestBase):
self.assertEqual(v, f.read().decode())
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseMagicTest(MountTestBase):
def setUp(self, api=None):
super(FuseMagicTest, self).setUp(api=api)
@@ -283,6 +285,7 @@ def fuseSharedTestHelper(mounttmp):
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
@@ -343,6 +346,7 @@ def fuseModifyFileTestHelperReadEndContents(mounttmp):
self.assertEqual("plnp", f.read())
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseModifyFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
@@ -363,6 +367,7 @@ class FuseModifyFileTest(MountTestBase):
self.pool.apply(fuseModifyFileTestHelperReadEndContents, (self.mounttmp,))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseAddFileToCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
@@ -385,6 +390,7 @@ class FuseAddFileToCollectionTest(MountTestBase):
self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseRemoveFileFromCollectionTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
@@ -416,6 +422,7 @@ def fuseCreateFileTestHelper(mounttmp):
pass
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseCreateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
@@ -459,6 +466,7 @@ def fuseWriteFileTestHelperReadFile(mounttmp):
self.assertEqual(f.read(), "Hello world!")
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseWriteFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
@@ -507,6 +515,7 @@ def fuseUpdateFileTestHelper(mounttmp):
Test().runTest()
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class FuseUpdateFileTest(MountTestBase):
def runTest(self):
collection = arvados.collection.Collection(api_client=self.api)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list