[arvados] updated: 2.7.1-51-gc06d01222a

git repository hosting git at public.arvados.org
Fri Apr 5 13:48:08 UTC 2024


Summary of changes:
 lib/crunchrun/integration_test.go        |   2 -
 sdk/python/arvados/arvfile.py            |  19 ++--
 sdk/python/arvados/diskcache.py          | 129 ++++++++++++------------
 sdk/python/arvados/keep.py               |  88 ++++++++--------
 sdk/python/tests/test_keep_client.py     | 166 +++++++------------------------
 sdk/python/tests/test_storage_classes.py | 128 ++++++++++++++++++++++++
 services/fuse/arvados_fuse/command.py    |  12 +--
 7 files changed, 292 insertions(+), 252 deletions(-)
 create mode 100644 sdk/python/tests/test_storage_classes.py

       via  c06d01222a65a21674b31d76f0bfbcc4facb253a (commit)
       via  70e89bdbe674c53f39625523341c92dd6ecd5109 (commit)
      from  0c5bbc10c79f5ed4a67c475762fd2e1c574a9b71 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c06d01222a65a21674b31d76f0bfbcc4facb253a
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Apr 4 12:36:15 2024 -0400

    Merge branch '21639-keep-cache-dict' refs #21639
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4b95835aac..0cc7d25a33 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1060,7 +1060,8 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+            if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+                prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []
@@ -1068,17 +1069,21 @@ class ArvadosFile(object):
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
-                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                 locs.add(lr.locator)
             else:
                 break
 
-        for lr in prefetch:
-            if lr.locator not in locs:
-                self.parent._my_block_manager().block_prefetch(lr.locator)
-                locs.add(lr.locator)
+        if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+            for lr in prefetch:
+                if lr.locator not in locs:
+                    self.parent._my_block_manager().block_prefetch(lr.locator)
+                    locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized
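
The single-segment fast path above now returns the memoryview slice
directly instead of joining.  A standalone sketch (not part of the
commit) of why that matters: slicing a memoryview is O(1) and copies
nothing, while b''.join() always materializes a new bytes object:

    import time

    block = bytes(64 * 1024 * 1024)    # stand-in for a cached 64 MiB block
    view = memoryview(block)

    t0 = time.perf_counter()
    chunk = view[:32 * 1024 * 1024]    # O(1): no bytes are copied
    t1 = time.perf_counter()
    joined = b''.join([chunk])         # forces a 32 MiB copy
    t2 = time.perf_counter()

    print("slice: %.6fs  join: %.6fs" % (t1 - t0, t2 - t1))
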
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index f8fca57803..528a7d28b5 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -13,6 +13,7 @@ import time
 import errno
 import logging
 import weakref
+import collections
 
 _logger = logging.getLogger('arvados.keep')
 
@@ -31,6 +32,15 @@ class DiskCacheSlot(object):
 
     def get(self):
         self.ready.wait()
+        # 'content' can be None, an empty byte string, or a nonempty mmap
+        # region.  If it is an mmap region, we want to advise the
+        # kernel we're going to use it.  This nudges the kernel to
+        # re-read most or all of the block if necessary (instead of
+        # just a few pages at a time), reducing the number of page
+        # faults and improving performance by 4x compared to not
+        # calling madvise.
+        if self.content:
+            self.content.madvise(mmap.MADV_WILLNEED)
         return self.content
 
     def set(self, value):
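
A minimal standalone sketch of the madvise() pattern used above,
assuming Linux and Python 3.8+ (where mmap.madvise is available); the
4x figure in the comment is the committer's measurement, not something
this sketch reproduces:

    import mmap
    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    os.write(fd, b'x' * mmap.PAGESIZE * 16)
    m = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)

    # Hint that the whole mapping will be needed soon, so the kernel
    # can read it back in bulk instead of faulting a few pages at a time.
    m.madvise(mmap.MADV_WILLNEED)
    data = m[:]          # touching the pages now incurs fewer faults

    m.close()
    os.close(fd)
    os.remove(path)
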
@@ -39,18 +49,18 @@ class DiskCacheSlot(object):
             if value is None:
                 self.content = None
                 self.ready.set()
-                return
+                return False
 
             if len(value) == 0:
                 # Can't mmap a 0 length file
                 self.content = b''
                 self.ready.set()
-                return
+                return True
 
             if self.content is not None:
                 # Has been set already
                 self.ready.set()
-                return
+                return False
 
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             os.makedirs(blockdir, mode=0o700, exist_ok=True)
@@ -73,6 +83,7 @@ class DiskCacheSlot(object):
             self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
             # only set the event when mmap is successful
             self.ready.set()
+            return True
         finally:
             if tmpfile is not None:
                 # If the tempfile hasn't been renamed on disk yet, try to delete it.
@@ -95,65 +106,61 @@ class DiskCacheSlot(object):
             return len(self.content)
 
     def evict(self):
-        if self.content is not None and len(self.content) > 0:
-            # The mmap region might be in use when we decided to evict
-            # it.  This can happen if the cache is too small.
-            #
-            # If we call close() now, it'll throw an error if
-            # something tries to access it.
-            #
-            # However, we don't need to explicitly call mmap.close()
-            #
-            # I confirmed in mmapmodule.c that that both close
-            # and deallocate do the same thing:
+        if not self.content:
+            return
+
+        # The mmap region might be in use when we decided to evict
+        # it.  This can happen if the cache is too small.
+        #
+        # If we call close() now, it'll throw an error if
+        # something tries to access it.
+        #
+        # However, we don't need to explicitly call mmap.close()
+        #
+        # I confirmed in mmapmodule.c that both close
+        # and deallocate do the same thing:
+        #
+        # a) close the file descriptor
+        # b) unmap the memory range
+        #
+        # So we can forget it in the cache and delete the file on
+        # disk, and the mapping will be torn down after any other
+        # lingering Python references to the mapped memory are
+        # gone.
+
+        blockdir = os.path.join(self.cachedir, self.locator[0:3])
+        final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+        try:
+            fcntl.flock(self.filehandle, fcntl.LOCK_UN)
+
+            # try to get an exclusive lock; this ensures other
+            # processes are not using the block.  It is
+            # nonblocking and will throw an exception if we
+            # can't get it, which is fine because that means
+            # we just won't try to delete it.
             #
-            # a) close the file descriptor
-            # b) unmap the memory range
+            # I should note here, the file locking is not
+            # strictly necessary: we could just remove it and
+            # the kernel would ensure that the underlying
+            # inode remains available as long as other
+            # processes still have the file open.  However, if
+            # you have multiple processes sharing the cache
+            # and deleting each other's files, you'll end up
+            # with a bunch of ghost files that don't show up
+            # in the file system but are still taking up
+            # space, which isn't particularly user friendly.
+            # The locking strategy ensures that cache blocks
+            # in use remain visible.
             #
-            # So we can forget it in the cache and delete the file on
-            # disk, and it will tear it down after any other
-            # lingering Python references to the mapped memory are
-            # gone.
-
-            blockdir = os.path.join(self.cachedir, self.locator[0:3])
-            final = os.path.join(blockdir, self.locator) + cacheblock_suffix
-            try:
-                fcntl.flock(self.filehandle, fcntl.LOCK_UN)
-
-                # try to get an exclusive lock, this ensures other
-                # processes are not using the block.  It is
-                # nonblocking and will throw an exception if we
-                # can't get it, which is fine because that means
-                # we just won't try to delete it.
-                #
-                # I should note here, the file locking is not
-                # strictly necessary, we could just remove it and
-                # the kernel would ensure that the underlying
-                # inode remains available as long as other
-                # processes still have the file open.  However, if
-                # you have multiple processes sharing the cache
-                # and deleting each other's files, you'll end up
-                # with a bunch of ghost files that don't show up
-                # in the file system but are still taking up
-                # space, which isn't particularly user friendly.
-                # The locking strategy ensures that cache blocks
-                # in use remain visible.
-                #
-                fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
-                os.remove(final)
-                return True
-            except OSError:
-                pass
-            finally:
-                self.filehandle = None
-                self.linger = weakref.ref(self.content)
-                self.content = None
-        return False
+            fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
 
-    def gone(self):
-        # Test if an evicted object is lingering
-        return self.content is None and (self.linger is None or self.linger() is None)
+            os.remove(final)
+            return True
+        except OSError:
+            pass
+        finally:
+            self.filehandle = None
+            self.content = None
 
     @staticmethod
     def get_from_disk(locator, cachedir):
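
The locking strategy described above can be illustrated with a
standalone sketch (a hypothetical helper, not Arvados code): take a
non-blocking exclusive flock before deleting, so a block file still
locked by another process stays visible on disk:

    import errno
    import fcntl
    import os

    def try_remove_cache_block(path):
        try:
            fh = open(path, 'rb')
        except FileNotFoundError:
            return False
        try:
            # Raises OSError(EAGAIN) if another process still holds a
            # (shared) lock on the file, i.e. the block is in use.
            fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
            os.remove(path)
            return True
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EACCES):
                raise
            return False
        finally:
            fh.close()
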
@@ -237,13 +244,13 @@ class DiskCacheSlot(object):
 
         # Map in all the files we found, up to maxslots, if we exceed
         # maxslots, start throwing things out.
-        cachelist = []
+        cachelist: collections.OrderedDict = collections.OrderedDict()
         for b in blocks:
             got = DiskCacheSlot.get_from_disk(b[0], cachedir)
             if got is None:
                 continue
             if len(cachelist) < maxslots:
-                cachelist.append(got)
+                cachelist[got.locator] = got
             else:
                 # we found more blocks than maxslots, try to
                 # throw it out of the cache.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4b00f7df8b..a824621079 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -182,7 +182,7 @@ class Keep(object):
 class KeepBlockCache(object):
     def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
-        self._cache = []
+        self._cache = collections.OrderedDict()
         self._cache_lock = threading.Lock()
         self._max_slots = max_slots
         self._disk_cache = disk_cache
@@ -233,11 +233,13 @@ class KeepBlockCache(object):
 
         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 
+        self.cache_total = 0
         if self._disk_cache:
             self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            for slot in self._cache.values():
+                self.cache_total += slot.size()
             self.cap_cache()
 
-
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
@@ -251,8 +253,11 @@ class KeepBlockCache(object):
             return self.content
 
         def set(self, value):
+            if self.content is not None:
+                return False
             self.content = value
             self.ready.set()
+            return True
 
         def size(self):
             if self.content is None:
@@ -262,42 +267,25 @@ class KeepBlockCache(object):
 
         def evict(self):
             self.content = None
-            return self.gone()
 
-        def gone(self):
-            return (self.content is None)
 
     def _resize_cache(self, cache_max, max_slots):
         # Try and make sure the contents of the cache do not exceed
         # the supplied maximums.
 
-        # Select all slots except those where ready.is_set() and content is
-        # None (that means there was an error reading the block).
-        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-        sm = sum([slot.size() for slot in self._cache])
-        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
-            for i in range(len(self._cache)-1, -1, -1):
-                # start from the back, find a slot that is a candidate to evict
-                if self._cache[i].ready.is_set():
-                    sz = self._cache[i].size()
-
-                    # If evict returns false it means the
-                    # underlying disk cache couldn't lock the file
-                    # for deletion because another process was using
-                    # it. Don't count it as reducing the amount
-                    # of data in the cache, find something else to
-                    # throw out.
-                    if self._cache[i].evict():
-                        sm -= sz
-
-                    # check to make sure the underlying data is gone
-                    if self._cache[i].gone():
-                        # either way we forget about it.  either the
-                        # other process will delete it, or if we need
-                        # it again and it is still there, we'll find
-                        # it on disk.
-                        del self._cache[i]
-                    break
+        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
+            return
+
+        _evict_candidates = collections.deque(self._cache.values())
+        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
+            slot = _evict_candidates.popleft()
+            if not slot.ready.is_set():
+                continue
+
+            sz = slot.size()
+            slot.evict()
+            self.cache_total -= sz
+            del self._cache[slot.locator]
 
 
     def cap_cache(self):
@@ -308,19 +296,19 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in range(0, len(self._cache)):
-            if self._cache[i].locator == locator:
-                n = self._cache[i]
-                if i != 0:
-                    # move it to the front
-                    del self._cache[i]
-                    self._cache.insert(0, n)
-                return n
+        if locator in self._cache:
+            n = self._cache[locator]
+            if n.ready.is_set() and n.content is None:
+                del self._cache[n.locator]
+                return None
+            self._cache.move_to_end(locator)
+            return n
         if self._disk_cache:
             # see if it exists on disk
             n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
             if n is not None:
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
+                self.cache_total += n.size()
                 return n
         return None
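
The list-scan-and-reinsert code removed above gives way to the
standard OrderedDict-as-LRU pattern.  A minimal standalone sketch of
that pattern (not the actual KeepBlockCache logic):

    import collections

    class TinyLRU:
        def __init__(self, max_slots):
            self.max_slots = max_slots
            self._cache = collections.OrderedDict()

        def get(self, key):
            if key not in self._cache:
                return None
            self._cache.move_to_end(key)          # mark most recently used
            return self._cache[key]

        def put(self, key, value):
            self._cache[key] = value
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_slots:
                self._cache.popitem(last=False)   # evict least recently used

Both lookup and eviction are O(1), versus the O(n) scan and
insert(0, n) of the old list-based cache.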
 
@@ -350,12 +338,13 @@ class KeepBlockCache(object):
                     n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                 else:
                     n = KeepBlockCache.CacheSlot(locator)
-                self._cache.insert(0, n)
+                self._cache[n.locator] = n
                 return n, True
 
     def set(self, slot, blob):
         try:
-            slot.set(blob)
+            if slot.set(blob):
+                self.cache_total += slot.size()
             return
         except OSError as e:
             if e.errno == errno.ENOMEM:
@@ -365,7 +354,7 @@ class KeepBlockCache(object):
             elif e.errno == errno.ENOSPC:
                 # Reduce disk max space to current - 256 MiB, cap cache and retry
                 with self._cache_lock:
-                    sm = sum([st.size() for st in self._cache])
+                    sm = sum(st.size() for st in self._cache.values())
                     self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
             elif e.errno == errno.ENODEV:
                 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
@@ -383,7 +372,8 @@ class KeepBlockCache(object):
             # exception handler adjusts limits downward in some cases
             # to free up resources, which would make the operation
             # succeed.
-            slot.set(blob)
+            if slot.set(blob):
+                self.cache_total += slot.size()
         except Exception as e:
             # It failed again.  Give up.
             slot.set(None)
@@ -924,7 +914,10 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
-        self.num_prefetch_threads = num_prefetch_threads or 2
+        if num_prefetch_threads is not None:
+            self.num_prefetch_threads = num_prefetch_threads
+        else:
+            self.num_prefetch_threads = 2
         self._prefetch_queue = None
         self._prefetch_threads = None
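
The explicit None check above matters because 0 is now a meaningful
value ("x or 2" cannot tell 0 from None, and the FUSE change below
sets prefetch_threads = 0 to disable prefetching).  A quick standalone
illustration:

    def threads_with_or(n):
        return n or 2                       # 0 -> 2 (loses "disabled")

    def threads_with_none_check(n):
        return n if n is not None else 2    # 0 -> 0, None -> 2

    assert threads_with_or(0) == 2
    assert threads_with_none_check(0) == 0
    assert threads_with_none_check(None) == 2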
 
@@ -1426,6 +1419,9 @@ class KeepClient(object):
         does not block.
         """
 
+        if self.block_cache.get(locator) is not None:
+            return
+
         self._start_prefetch_threads()
         self._prefetch_queue.put(locator)
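
block_prefetch() now returns early when the block is already cached,
and otherwise hands the locator to a queue consumed by prefetch worker
threads.  A generic standalone sketch of that queue-and-workers shape
(names are illustrative, not the KeepClient internals):

    import queue
    import threading

    def start_prefetch_threads(num_threads, fetch):
        q = queue.Queue()

        def worker():
            while True:
                locator = q.get()
                if locator is None:    # sentinel: shut down
                    return
                fetch(locator)         # populate the cache

        threads = [threading.Thread(target=worker, daemon=True)
                   for _ in range(num_threads)]
        for t in threads:
            t.start()
        return q, threads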
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index f472c0830e..ebc8431dc2 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -11,6 +11,7 @@ from builtins import range
 from builtins import object
 import hashlib
 import mock
+from mock import patch
 import os
 import errno
 import pycurl
@@ -24,6 +25,7 @@ import tempfile
 import time
 import unittest
 import urllib.parse
+import mmap
 
 import parameterized
 
@@ -625,122 +627,6 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheB
 
 
 
-@tutil.skip_sleep
-@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
-    disk_cache = False
-
-    def setUp(self):
-        self.api_client = self.mock_keep_services(count=2)
-        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
-        self.data = b'xyzzy'
-        self.locator = '1271ed5ef305aadabc605b1609e24c52'
-
-    def tearDown(self):
-        DiskCacheBase.tearDown(self)
-
-    def test_multiple_default_storage_classes_req_header(self):
-        api_mock = self.api_client_mock()
-        api_mock.config.return_value = {
-            'StorageClasses': {
-                'foo': { 'Default': True },
-                'bar': { 'Default': True },
-                'baz': { 'Default': False }
-            }
-        }
-        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
-        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
-        resp_hdr = {
-            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
-            'x-keep-replicas-stored': 1
-        }
-        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
-            keep_client.put(self.data, copies=1)
-            req_hdr = mock.responses[0]
-            self.assertIn(
-                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
-
-    def test_storage_classes_req_header(self):
-        self.assertEqual(
-            self.api_client.config()['StorageClasses'],
-            {'default': {'Default': True}})
-        cases = [
-            # requested, expected
-            [['foo'], 'X-Keep-Storage-Classes: foo'],
-            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
-            [[], 'X-Keep-Storage-Classes: default'],
-            [None, 'X-Keep-Storage-Classes: default'],
-        ]
-        for req_classes, expected_header in cases:
-            headers = {'x-keep-replicas-stored': 1}
-            if req_classes is None or len(req_classes) == 0:
-                confirmed_hdr = 'default=1'
-            elif len(req_classes) > 0:
-                confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
-            headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
-            with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
-                self.keep_client.put(self.data, copies=1, classes=req_classes)
-                req_hdr = mock.responses[0]
-                self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
-
-    def test_partial_storage_classes_put(self):
-        headers = {
-            'x-keep-replicas-stored': 1,
-            'x-keep-storage-classes-confirmed': 'foo=1'}
-        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
-            with self.assertRaises(arvados.errors.KeepWriteError):
-                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
-            # 1st request, both classes pending
-            req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
-            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
-            # 2nd try, 'foo' class already satisfied
-            req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
-            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
-
-    def test_successful_storage_classes_put_requests(self):
-        cases = [
-            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
-            [ 1, ['foo'], 1, 'foo=1', 1],
-            [ 1, ['foo'], 2, 'foo=2', 1],
-            [ 2, ['foo'], 2, 'foo=2', 1],
-            [ 2, ['foo'], 1, 'foo=1', 2],
-            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
-            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
-            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
-            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
-            [ 1, ['foo', 'bar'], 1, None, 1],
-            [ 1, ['foo'], 1, None, 1],
-            [ 2, ['foo'], 2, None, 1],
-            [ 2, ['foo'], 1, None, 2],
-        ]
-        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
-            headers = {'x-keep-replicas-stored': c_copies}
-            if c_classes is not None:
-                headers.update({'x-keep-storage-classes-confirmed': c_classes})
-            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
-                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
-                self.assertEqual(self.locator,
-                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
-                    case_desc)
-                self.assertEqual(e_reqs, mock.call_count, case_desc)
-
-    def test_failed_storage_classes_put_requests(self):
-        cases = [
-            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
-            [ 1, ['foo'], 1, 'bar=1', 200],
-            [ 1, ['foo'], 1, None, 503],
-            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
-            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
-            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
-        ]
-        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
-            headers = {'x-keep-replicas-stored': c_copies}
-            if c_classes is not None:
-                headers.update({'x-keep-storage-classes-confirmed': c_classes})
-            with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
-                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
-                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
-                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
 
 @tutil.skip_sleep
 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
@@ -1757,21 +1643,31 @@ class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
                 keep_client.get(self.locator)
 
 
-    @mock.patch('mmap.mmap')
-    def test_disk_cache_retry_write_error(self, mockmmap):
+    def test_disk_cache_retry_write_error(self):
         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)
 
         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
 
-        mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+        called = False
+        realmmap = mmap.mmap
+        def sideeffect_mmap(*args, **kwargs):
+            nonlocal called
+            if not called:
+                called = True
+                raise OSError(errno.ENOSPC, "no space")
+            else:
+                return realmmap(*args, **kwargs)
 
-        cache_max_before = block_cache.cache_max
+        with patch('mmap.mmap') as mockmmap:
+            mockmmap.side_effect = sideeffect_mmap
 
-        with tutil.mock_keep_responses(self.data, 200) as mock:
-            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+            cache_max_before = block_cache.cache_max
 
-        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+            with tutil.mock_keep_responses(self.data, 200) as mock:
+                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+            self.assertIsNotNone(keep_client.get_from_cache(self.locator))
 
         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
             self.assertTrue(tutil.binary_compare(f.read(), self.data))
@@ -1780,21 +1676,31 @@ class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertTrue(cache_max_before > block_cache.cache_max)
 
 
-    @mock.patch('mmap.mmap')
-    def test_disk_cache_retry_write_error2(self, mockmmap):
+    def test_disk_cache_retry_write_error2(self):
         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)
 
         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
 
-        mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+        called = False
+        realmmap = mmap.mmap
+        def sideeffect_mmap(*args, **kwargs):
+            nonlocal called
+            if not called:
+                called = True
+                raise OSError(errno.ENOMEM, "no memory")
+            else:
+                return realmmap(*args, **kwargs)
 
-        slots_before = block_cache._max_slots
+        with patch('mmap.mmap') as mockmmap:
+            mockmmap.side_effect = sideeffect_mmap
 
-        with tutil.mock_keep_responses(self.data, 200) as mock:
-            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+            slots_before = block_cache._max_slots
 
-        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+            with tutil.mock_keep_responses(self.data, 200) as mock:
+                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+            self.assertIsNotNone(keep_client.get_from_cache(self.locator))
 
         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
             self.assertTrue(tutil.binary_compare(f.read(), self.data))
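
Both rewritten tests use a "fail once, then delegate to the real
function" patching pattern; a standalone sketch of its general form
(using unittest.mock rather than the mock package the tests import):

    import errno
    import mmap
    from unittest import mock

    real_mmap = mmap.mmap
    calls = {'n': 0}

    def flaky_mmap(*args, **kwargs):
        calls['n'] += 1
        if calls['n'] == 1:
            raise OSError(errno.ENOSPC, "no space")  # fail the first call
        return real_mmap(*args, **kwargs)            # then behave normally

    with mock.patch('mmap.mmap', side_effect=flaky_mmap):
        pass  # code under test runs here: first mmap fails, retry succeeds
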
diff --git a/sdk/python/tests/test_storage_classes.py b/sdk/python/tests/test_storage_classes.py
new file mode 100644
index 0000000000..21bacc310a
--- /dev/null
+++ b/sdk/python/tests/test_storage_classes.py
@@ -0,0 +1,128 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import pycurl
+
+import unittest
+import parameterized
+from . import arvados_testutil as tutil
+from .arvados_testutil import DiskCacheBase
+
+@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+    disk_cache = False
+
+    def setUp(self):
+        self.api_client = self.mock_keep_services(count=2)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+        self.data = b'xyzzy'
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+    def tearDown(self):
+        DiskCacheBase.tearDown(self)
+
+    def test_multiple_default_storage_classes_req_header(self):
+        api_mock = self.api_client_mock()
+        api_mock.config.return_value = {
+            'StorageClasses': {
+                'foo': { 'Default': True },
+                'bar': { 'Default': True },
+                'baz': { 'Default': False }
+            }
+        }
+        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
+        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+        resp_hdr = {
+            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
+            'x-keep-replicas-stored': 1
+        }
+        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
+            keep_client.put(self.data, copies=1)
+            req_hdr = mock.responses[0]
+            self.assertIn(
+                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
+
+    def test_storage_classes_req_header(self):
+        self.assertEqual(
+            self.api_client.config()['StorageClasses'],
+            {'default': {'Default': True}})
+        cases = [
+            # requested, expected
+            [['foo'], 'X-Keep-Storage-Classes: foo'],
+            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+            [[], 'X-Keep-Storage-Classes: default'],
+            [None, 'X-Keep-Storage-Classes: default'],
+        ]
+        for req_classes, expected_header in cases:
+            headers = {'x-keep-replicas-stored': 1}
+            if req_classes is None or len(req_classes) == 0:
+                confirmed_hdr = 'default=1'
+            elif len(req_classes) > 0:
+                confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+            headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+            with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+                self.keep_client.put(self.data, copies=1, classes=req_classes)
+                req_hdr = mock.responses[0]
+                self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
+
+    def test_partial_storage_classes_put(self):
+        headers = {
+            'x-keep-replicas-stored': 1,
+            'x-keep-storage-classes-confirmed': 'foo=1'}
+        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
+            # 1st request, both classes pending
+            req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+            # 2nd try, 'foo' class already satisfied
+            req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+    def test_successful_storage_classes_put_requests(self):
+        cases = [
+            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+            [ 1, ['foo'], 1, 'foo=1', 1],
+            [ 1, ['foo'], 2, 'foo=2', 1],
+            [ 2, ['foo'], 2, 'foo=2', 1],
+            [ 2, ['foo'], 1, 'foo=1', 2],
+            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+            [ 1, ['foo', 'bar'], 1, None, 1],
+            [ 1, ['foo'], 1, None, 1],
+            [ 2, ['foo'], 2, None, 1],
+            [ 2, ['foo'], 1, None, 2],
+        ]
+        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+            headers = {'x-keep-replicas-stored': c_copies}
+            if c_classes is not None:
+                headers.update({'x-keep-storage-classes-confirmed': c_classes})
+            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+                self.assertEqual(self.locator,
+                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+                    case_desc)
+                self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+    def test_failed_storage_classes_put_requests(self):
+        cases = [
+            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+            [ 1, ['foo'], 1, 'bar=1', 200],
+            [ 1, ['foo'], 1, None, 503],
+            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
+            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
+            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
+        ]
+        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+            headers = {'x-keep-replicas-stored': c_copies}
+            if c_classes is not None:
+                headers.update({'x-keep-storage-classes-confirmed': c_classes})
+            with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
+                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
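
The tests above exercise the 'x-keep-storage-classes-confirmed'
response header, whose value is a comma-separated list such as
'foo=1, bar=1'.  A standalone sketch (a hypothetical helper, not SDK
code) of parsing that format into class -> replica counts:

    def parse_confirmed(value):
        result = {}
        for item in value.split(','):
            name, _, count = item.strip().partition('=')
            result[name] = int(count)
        return result

    assert parse_confirmed('foo=1, bar=1') == {'foo': 1, 'bar': 1}
    assert parse_confirmed('bar=1, foo=0') == {'bar': 1, 'foo': 0}
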
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 45847fde81..1398b92e87 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -490,12 +490,12 @@ class Mount(object):
                                                       disk_cache=self.args.disk_cache,
                                                       disk_cache_dir=self.args.disk_cache_dir)
 
-            # If there's too many prefetch threads and you
-            # max out the CPU, delivering data to the FUSE
-            # layer actually ends up being slower.
-            # Experimentally, capping 7 threads seems to
-            # be a sweet spot.
-            prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+            # Profiling indicates that prefetching hurts the read()
+            # fast path (by requiring it to do more work and take
+            # additional locks) more than it helps.
+            # Also, the kernel does some readahead itself, which has a
+            # similar effect.
+            prefetch_threads = 0
 
             self.api = arvados.safeapi.ThreadSafeApiCache(
                 apiconfig=arvados.config.settings(),

commit 70e89bdbe674c53f39625523341c92dd6ecd5109
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Fri Apr 5 09:41:34 2024 -0400

    2.7 doesn't log "not running worker" so don't test for it, refs #21598
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index e24a7b1317..d569020824 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -220,8 +220,6 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
 		if trial.logConfig == "none" {
 			c.Check(logExists, Equals, false)
 		} else {
-			c.Check(log, Matches, `(?ms).*not running trash worker.*`)
-			c.Check(log, Matches, `(?ms).*not running trash emptier.*`)
 			c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
 			c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
 		}

-----------------------------------------------------------------------


hooks/post-receive
-- 