[arvados] created: 2.1.0-2962-g67e56f190

git repository hosting git at public.arvados.org
Tue Oct 18 18:45:45 UTC 2022


        at  67e56f190b9a78e3c45cc7d90510fc631e0d04b6 (commit)


commit 67e56f190b9a78e3c45cc7d90510fc631e0d04b6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Oct 17 20:21:06 2022 -0400

    18842: Add disk cache, test python sdk/fuse disk_cache=true/false
    
    Off by default in the SDK but enabled by default with arv-mount
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 08a05d571..aa38d7383 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -319,7 +319,8 @@ def main(args=sys.argv[1:],
             # Make an API object now so errors are reported early.
             api_client.users().current().execute()
         if keep_client is None:
-            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+            block_cache = arvados.keep.KeepBlockCache(disk_cache=True)
+            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4, block_cache=block_cache)
         executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4, stdout=stdout)
     except WorkflowException as e:
         logger.error(e, exc_info=(sys.exc_info()[1] if arvargs.debug else False))
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
new file mode 100644
index 000000000..24f249f1d
--- /dev/null
+++ b/sdk/python/arvados/diskcache.py
@@ -0,0 +1,113 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import threading
+import mmap
+import os
+import traceback
+import stat
+import tempfile
+import hashlib
+
+class DiskCacheSlot(object):
+    __slots__ = ("locator", "ready", "content", "cachedir")
+
+    def __init__(self, locator, cachedir):
+        self.locator = locator
+        self.ready = threading.Event()
+        self.content = None
+        self.cachedir = cachedir
+
+    def get(self):
+        self.ready.wait()
+        return self.content
+
+    def set(self, value):
+        try:
+            if value is None:
+                self.content = None
+                return
+
+            if len(value) == 0:
+                # Can't mmap a 0 length file
+                self.content = b''
+                return
+
+            if self.content is not None:
+                # Has been set already
+                return
+
+            blockdir = os.path.join(self.cachedir, self.locator[0:3])
+            os.makedirs(blockdir, mode=0o700, exist_ok=True)
+
+            final = os.path.join(blockdir, self.locator)
+
+            f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
+            tmpfile = f.name
+            os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+            f.write(value)
+            f.flush()
+            os.rename(tmpfile, final)
+
+            self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
+        except Exception as e:
+            traceback.print_exc()
+        finally:
+            self.ready.set()
+
+    def size(self):
+        if self.content is None:
+            return 0
+        else:
+            return len(self.content)
+
+    def evict(self):
+        if self.content is not None and len(self.content) > 0:
+            # The mmap region might be in use when we decided to evict
+            # it.  This can happen if the cache is too small.
+            #
+            # If we call close() now, it'll throw an error if
+            # something tries to access it.
+            #
+            # However, we don't need to explicitly call mmap.close()
+            #
+            # I confirmed in mmapmodule.c that both close
+            # and deallocate do the same thing:
+            #
+            # a) close the file descriptor
+            # b) unmap the memory range
+            #
+            # So we can forget it in the cache and delete the file on
+            # disk, and it will tear it down after any other
+            # lingering Python references to the mapped memory are
+            # gone.
+
+            blockdir = os.path.join(self.cachedir, self.locator[0:3])
+            final = os.path.join(blockdir, self.locator)
+            try:
+                os.remove(final)
+            except OSError:
+                pass
+
+    @staticmethod
+    def get_from_disk(locator, cachedir):
+        # Get it, check it, return it
+        blockdir = os.path.join(cachedir, locator[0:3])
+        final = os.path.join(blockdir, locator)
+
+        try:
+            filehandle = open(final, "rb")
+            content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
+            disk_md5 = hashlib.md5(content).hexdigest()
+            if disk_md5 == locator:
+                dc = DiskCacheSlot(locator, cachedir)
+                dc.content = content
+                dc.ready.set()
+                return dc
+        except FileNotFoundError:
+            pass
+        except Exception as e:
+            traceback.print_exc()
+
+        return None
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 44e915776..7f316c153 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -26,6 +26,7 @@ import socket
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
 
@@ -39,6 +40,7 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -174,11 +176,36 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # By default, set max slots to half of the soft limit on open file handles
+                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+            else:
+                self._max_slots = 1024
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize) / 2
+                # Half the available space or max_slots * 64 MiB
+                self.cache_max = min(avail, (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -202,6 +229,9 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
+        def evict(self):
+            pass
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
         with self._cache_lock:
@@ -209,9 +239,10 @@ class KeepBlockCache(object):
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
+            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                 for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
+                        self._cache[i].evict()
                         del self._cache[i]
                         break
                 sm = sum([slot.size() for slot in self._cache])
@@ -226,6 +257,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
@@ -241,7 +278,10 @@ class KeepBlockCache(object):
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index d28df0998..af60b6d38 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -61,6 +61,6 @@ setup(name='arvados-python-client',
           'Programming Language :: Python :: 3',
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML'],
+      tests_require=['pbr<1.7.0', 'mock>=1.0,<4', 'PyYAML', 'parameterized'],
       zip_safe=False
       )
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index d9b3ca86c..a574508cb 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -280,3 +280,18 @@ if sys.version_info < (3, 0):
         return self.assertNotRegexpMatches(*args, **kwargs)
     unittest.TestCase.assertRegex = assertRegex
     unittest.TestCase.assertNotRegex = assertNotRegex
+
+def binary_compare(a, b):
+    if len(a) != len(b):
+        return False
+    for i in range(0, len(a)):
+        if a[i] != b[i]:
+            return False
+    return True
+
+def make_block_cache(disk_cache):
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+    if disk_cache:
+        disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+        shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    return block_cache
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index b4849c21f..8aded823b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -16,11 +16,13 @@ import datetime
 import ciso8601
 import time
 import unittest
+import parameterized
 
 from . import run_test_server
 from arvados._ranges import Range, LocatorAndRange
 from arvados.collection import Collection, CollectionReader
 from . import arvados_testutil as tutil
+from .arvados_testutil import make_block_cache
 
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@ -28,9 +30,10 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
     def current_state(self):
         return self.dump_state(copy.deepcopy)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                              tutil.ArvadosBaseTestCase):
+    disk_cache = False
     MAIN_SERVER = {}
 
     @classmethod
@@ -40,7 +43,8 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         run_test_server.authorize_with('admin')
         cls.api_client = arvados.api('v1')
         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
-                                             local_store=cls.local_store)
+                                             local_store=cls.local_store,
+                                             block_cache=make_block_cache(cls.disk_cache))
 
     def write_foo_bar_baz(self):
         cw = arvados.CollectionWriter(self.api_client)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 87e4eefb2..126116393 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -15,12 +15,15 @@ import os
 import pycurl
 import random
 import re
+import shutil
 import socket
 import sys
 import time
 import unittest
 import urllib.parse
 
+import parameterized
+
 import arvados
 import arvados.retry
 import arvados.util
@@ -28,7 +31,11 @@ from . import arvados_testutil as tutil
 from . import keepstub
 from . import run_test_server
 
+from .arvados_testutil import make_block_cache
+
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {}
 
@@ -38,7 +45,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with("admin")
         cls.api_client = arvados.api('v1')
         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
-                                             proxy='', local_store='')
+                                             proxy='', local_store='',
+                                             block_cache=make_block_cache(cls.disk_cache))
 
     def test_KeepBasicRWTest(self):
         self.assertEqual(0, self.keep_client.upload_counter.get())
@@ -52,8 +60,8 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
         self.assertEqual(6, self.keep_client.upload_counter.get())
 
         self.assertEqual(0, self.keep_client.download_counter.get())
-        self.assertEqual(self.keep_client.get(foo_locator),
-                         b'foo',
+        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
+                         b'foo'),
                          'wrong content from Keep.get(md5("foo"))')
         self.assertEqual(3, self.keep_client.download_counter.get())
 
@@ -128,13 +136,15 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
                          b'test_head',
                          'wrong content from Keep.get for "test_head"')
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {'blob_signing': True}
 
     def test_KeepBasicRWTest(self):
         run_test_server.authorize_with('active')
-        keep_client = arvados.KeepClient()
+        keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
         foo_locator = keep_client.put('foo')
         self.assertRegex(
             foo_locator,
@@ -171,8 +181,9 @@ class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
                           keep_client.get,
                           unsigned_bar_locator)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepProxyTestCase(run_test_server.TestCaseWithServers):
+    disk_cache = False
     MAIN_SERVER = {}
     KEEP_SERVER = {}
     KEEP_PROXY_SERVER = {}
@@ -190,7 +201,7 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         # Will use ARVADOS_KEEP_SERVICES environment variable that
         # is set by setUpClass().
         keep_client = arvados.KeepClient(api_client=self.api_client,
-                                         local_store='')
+                                         local_store='', block_cache=make_block_cache(self.disk_cache))
         baz_locator = keep_client.put('baz')
         self.assertRegex(
             baz_locator,
@@ -206,7 +217,8 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         # existing proxy setting and setting multiple proxies
         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
         keep_client = arvados.KeepClient(api_client=self.api_client,
-                                         local_store='')
+                                         local_store='',
+                                         block_cache=make_block_cache(self.disk_cache))
         uris = [x['_service_root'] for x in keep_client._keep_services]
         self.assertEqual(uris, ['http://10.0.0.1/',
                                 'https://foo.example.org:1234/'])
@@ -215,12 +227,15 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
         with self.assertRaises(arvados.errors.ArgumentError):
             keep_client = arvados.KeepClient(api_client=self.api_client,
-                                             local_store='')
-
+                                             local_store='',
+                                             block_cache=make_block_cache(self.disk_cache))
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def get_service_roots(self, api_client):
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
         return [urllib.parse.urlparse(url) for url in sorted(services)]
 
@@ -239,7 +254,8 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_recognize_proxy_services_in_controller_response(self):
         keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
-            service_type='proxy', service_host='localhost', service_port=9, count=1))
+            service_type='proxy', service_host='localhost', service_port=9, count=1),
+                                         block_cache=make_block_cache(self.disk_cache))
         try:
             # this will fail, but it ensures we get the service
             # discovery response
@@ -254,7 +270,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
         api_client.insecure = True
         with tutil.mock_keep_responses(b'foo', 200) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
@@ -265,7 +281,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
         api_client.insecure = False
         with tutil.mock_keep_responses(b'foo', 200) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
             # getopt()==None here means we didn't change the
             # default. If we were using real pycurl instead of a mock,
@@ -286,7 +302,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         headers = {'X-Keep-Locator':local_loc}
         with tutil.mock_keep_responses('', 200, **headers):
             # Check that the translated locator gets returned
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
             # Check that refresh_signature() uses the correct method and headers
             keep_client._get_or_head = mock.MagicMock()
@@ -305,7 +321,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -322,7 +338,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepWriteError):
                 keep_client.put(b'foo')
             self.assertEqual(
@@ -339,7 +355,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.head('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -356,7 +372,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.get('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -373,7 +389,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
         force_timeout = socket.timeout("timed out")
         with tutil.mock_keep_responses(force_timeout, 0) as mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             with self.assertRaises(arvados.errors.KeepReadError):
                 keep_client.head('ffffffffffffffffffffffffffffffff')
             self.assertEqual(
@@ -407,7 +423,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = mock.MagicMock(name='api_client')
         api_client.keep_services().accessible().execute.side_effect = (
             arvados.errors.ApiError)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         with self.assertRaises(exc_class) as err_check:
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
         self.assertEqual(0, len(err_check.exception.request_errors()))
@@ -427,7 +443,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
         with req_mock, tutil.skip_sleep, \
                 self.assertRaises(exc_class) as err_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                        num_retries=3)
         self.assertEqual([502, 502], [
@@ -450,7 +466,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             keep_client.put(data)
         self.assertEqual(2, len(exc_check.exception.request_errors()))
 
@@ -460,7 +476,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-          keep_client = arvados.KeepClient(api_client=api_client)
+          keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
           keep_client.put(data)
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
@@ -469,7 +485,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         body = b'oddball service get'
         api_client = self.mock_keep_services(service_type='fancynewblobstore')
         with tutil.mock_keep_responses(body, 200):
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.get(tutil.str_keep_locator(body))
         self.assertEqual(body, actual)
 
@@ -478,7 +494,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         pdh = tutil.str_keep_locator(body)
         api_client = self.mock_keep_services(service_type='fancynewblobstore')
         with tutil.mock_keep_responses(pdh, 200):
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.put(body, copies=1)
         self.assertEqual(pdh, actual)
 
@@ -490,17 +506,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         headers = {'x-keep-replicas-stored': 3}
         with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                        **headers) as req_mock:
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             actual = keep_client.put(body, copies=2)
         self.assertEqual(pdh, actual)
         self.assertEqual(1, req_mock.call_count)
 
-
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
 
@@ -524,19 +542,22 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_head_and_then_get_return_different_responses(self, get_mock):
         head_resp = None
         get_resp = None
-        get_mock.side_effect = ['first response', 'second response']
+        get_mock.side_effect = [b'first response', b'second response']
         with tutil.mock_keep_responses(self.data, 200, 200):
             head_resp = self.keep_client.head(self.locator)
             get_resp = self.keep_client.get(self.locator)
-        self.assertEqual('first response', head_resp)
+        self.assertEqual(b'first response', head_resp)
         # First reponse was not cached because it was from a HEAD request.
         self.assertNotEqual(head_resp, get_resp)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
 
@@ -550,7 +571,7 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
             }
         }
         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         resp_hdr = {
             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
             'x-keep-replicas-stored': 1
@@ -644,10 +665,13 @@ class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         self.data = b'xyzzy'
         self.locator = '1271ed5ef305aadabc605b1609e24c52'
         self.test_id = arvados.util.new_request_id()
@@ -732,7 +756,9 @@ class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 
 @tutil.skip_sleep
+#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
 
     def setUp(self):
         # expected_order[i] is the probe order for
@@ -755,7 +781,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
             hashlib.md5(self.blocks[x]).hexdigest()
             for x in range(len(self.expected_order))]
         self.api_client = self.mock_keep_services(count=self.services)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     def test_weighted_service_roots_against_reference_set(self):
         # Confirm weighted_service_roots() returns the correct order
@@ -828,12 +854,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
         initial_services = 12
         self.api_client = self.mock_keep_services(count=initial_services)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
         probes_before = [
             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
         for added_services in range(1, 12):
             api_client = self.mock_keep_services(count=initial_services+added_services)
-            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
             total_penalty = 0
             for hash_index in range(len(hashes)):
                 probe_after = keep_client.weighted_service_roots(
@@ -869,7 +895,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
         with mock.patch('pycurl.Curl') as curl_mock, \
              self.assertRaises(exc_class) as err_check:
             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
@@ -885,8 +911,10 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_put_error_shows_probe_order(self):
         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
+    disk_cache = False
+
     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
     # 1s worth of data and then trigger bandwidth errors before running
     # out of data.
@@ -923,7 +951,7 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,
-            timeout=timeouts)
+            timeout=timeouts, block_cache=make_block_cache(self.disk_cache))
 
     def test_timeout_slow_connect(self):
         # Can't simulate TCP delays with our own socket. Leave our
@@ -1027,8 +1055,10 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False
+
     def mock_disks_and_gateways(self, disks=3, gateways=1):
         self.gateways = [{
                 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
@@ -1043,7 +1073,7 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
             for gw in self.gateways]
         self.api_client = self.mock_keep_services(
             count=disks, additional_services=self.gateways)
-        self.keepClient = arvados.KeepClient(api_client=self.api_client)
+        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     @mock.patch('pycurl.Curl')
     def test_get_with_gateway_hint_first(self, MockCurl):
@@ -1124,8 +1154,9 @@ class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                          MockCurl.return_value.getopt(pycurl.URL).decode())
 
-
 class KeepClientRetryTestMixin(object):
+    disk_cache = False
+
     # Testing with a local Keep store won't exercise the retry behavior.
     # Instead, our strategy is:
     # * Create a client with one proxy specified (pointed at a black
@@ -1150,6 +1181,7 @@ class KeepClientRetryTestMixin(object):
     def new_client(self, **caller_kwargs):
         kwargs = self.client_kwargs.copy()
         kwargs.update(caller_kwargs)
+        kwargs['block_cache'] = make_block_cache(self.disk_cache)
         return arvados.KeepClient(**kwargs)
 
     def run_method(self, *args, **kwargs):
@@ -1199,6 +1231,7 @@ class KeepClientRetryTestMixin(object):
 
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1243,6 +1276,7 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = True
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
@@ -1281,6 +1315,7 @@ class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@@ -1369,13 +1404,16 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
 
 @tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+    disk_cache = False  # read by setUp(); parameterized_class supplies the per-variant value
+
     # Test put()s that need two distinct servers to succeed, possibly
     # requiring multiple passes through the retry loop.
 
     def setUp(self):
         self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
 
     def test_success_after_exception(self):
         with tutil.mock_keep_responses(
@@ -1402,7 +1440,10 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class KeepClientAPIErrorTest(unittest.TestCase):
+    disk_cache = False
+
     def test_api_fail(self):
         class ApiMock(object):
             def __getattr__(self, r):
@@ -1415,7 +1456,8 @@ class KeepClientAPIErrorTest(unittest.TestCase):
                 else:
                     raise arvados.errors.KeepReadError()
         keep_client = arvados.KeepClient(api_client=ApiMock(),
-                                             proxy='', local_store='')
+                                         proxy='', local_store='',
+                                         block_cache=make_block_cache(self.disk_cache))
 
         # The bug this is testing for is that if an API (not
         # keepstore) exception is thrown as part of a get(), the next
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 5f0a1f80f..994c99882 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -83,13 +83,20 @@ class ArgumentParser(argparse.ArgumentParser):
                             type=str, metavar='PATH', action='append', default=[],
                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
 
+
         self.add_argument('--debug', action='store_true', help="""Debug mode""")
         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
 
-        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
-        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)
+        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
+        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
+
+        cachetype = self.add_mutually_exclusive_group()
+        cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
+        cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
+
+        self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
 
         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
 
@@ -213,8 +220,12 @@ class Mount(object):
         try:
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),
+                # default value of file_cache is 0, this tells KeepBlockCache to
+                # choose a default based on whether disk_cache is enabled or not.
                 keep_params={
-                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
+                                                               disk_cache=self.args.disk_cache,
+                                                               disk_cache_dir=self.args.disk_cache_dir),
                     'num_retries': self.args.retries,
                 })
         except KeyError as e:
diff --git a/services/fuse/setup.py b/services/fuse/setup.py
index 545b4bfa0..d0c46f132 100644
--- a/services/fuse/setup.py
+++ b/services/fuse/setup.py
@@ -59,6 +59,6 @@ setup(name='arvados_fuse',
           'Programming Language :: Python :: 3',
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
+      tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML', 'parameterized',],
       zip_safe=False
       )
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index 7cf8aa373..a24005064 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 import arvados
+import arvados.keep
 import arvados_fuse as fuse
 import arvados.safeapi
 import llfuse
@@ -24,7 +25,16 @@ logger = logging.getLogger('arvados.arv-mount')
 
 from .integration_test import workerPool
 
+def make_block_cache(disk_cache):
+    if disk_cache:
+        disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")  # default disk cache location
+        shutil.rmtree(disk_cache_dir, ignore_errors=True)  # wipe leftovers BEFORE the cache object is built on this dir
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
+    return block_cache
+
 class MountTestBase(unittest.TestCase):
+    disk_cache = False  # must match the "disk_cache" key the test classes are parameterized with
+
     def setUp(self, api=None, local_store=True):
         # The underlying C implementation of open() makes a fstat() syscall
         # with the GIL still held.  When the GETATTR message comes back to
@@ -43,7 +53,8 @@ class MountTestBase(unittest.TestCase):
         self.mounttmp = tempfile.mkdtemp()
         run_test_server.run()
         run_test_server.authorize_with("admin")
-        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
+
+        self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings(), keep_params={"block_cache": make_block_cache(self.disk_cache)})
         self.llfuse_thread = None
 
     # This is a copy of Mount's method.  TODO: Refactor MountTestBase
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 1601db594..df3d42634 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -16,6 +16,7 @@ import subprocess
 import time
 import unittest
 import tempfile
+import parameterized
 
 import arvados
 import arvados_fuse as fuse
@@ -54,7 +55,7 @@ class AssertWithTimeout(object):
         else:
             self.done = True
 
-
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseMountTest(MountTestBase):
     def setUp(self):
         super(FuseMountTest, self).setUp()
@@ -125,6 +126,7 @@ class FuseMountTest(MountTestBase):
                 self.assertEqual(v, f.read().decode())
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseMagicTest(MountTestBase):
     def setUp(self, api=None):
         super(FuseMagicTest, self).setUp(api=api)
@@ -283,6 +285,7 @@ def fuseSharedTestHelper(mounttmp):
 
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
@@ -343,6 +346,7 @@ def fuseModifyFileTestHelperReadEndContents(mounttmp):
                 self.assertEqual("plnp", f.read())
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseModifyFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -363,6 +367,7 @@ class FuseModifyFileTest(MountTestBase):
         self.pool.apply(fuseModifyFileTestHelperReadEndContents, (self.mounttmp,))
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseAddFileToCollectionTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -385,6 +390,7 @@ class FuseAddFileToCollectionTest(MountTestBase):
         self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
 
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseRemoveFileFromCollectionTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -416,6 +422,7 @@ def fuseCreateFileTestHelper(mounttmp):
                 pass
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseCreateFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -459,6 +466,7 @@ def fuseWriteFileTestHelperReadFile(mounttmp):
                 self.assertEqual(f.read(), "Hello world!")
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseWriteFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)
@@ -507,6 +515,7 @@ def fuseUpdateFileTestHelper(mounttmp):
 
     Test().runTest()
 
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
 class FuseUpdateFileTest(MountTestBase):
     def runTest(self):
         collection = arvados.collection.Collection(api_client=self.api)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list