[arvados] updated: 2.7.1-51-gc06d01222a
git repository hosting
git at public.arvados.org
Fri Apr 5 13:48:08 UTC 2024
Summary of changes:
lib/crunchrun/integration_test.go | 2 -
sdk/python/arvados/arvfile.py | 19 ++--
sdk/python/arvados/diskcache.py | 129 ++++++++++++------------
sdk/python/arvados/keep.py | 88 ++++++++--------
sdk/python/tests/test_keep_client.py | 166 +++++++------------------------
sdk/python/tests/test_storage_classes.py | 128 ++++++++++++++++++++++++
services/fuse/arvados_fuse/command.py | 12 +--
7 files changed, 292 insertions(+), 252 deletions(-)
create mode 100644 sdk/python/tests/test_storage_classes.py
via c06d01222a65a21674b31d76f0bfbcc4facb253a (commit)
via 70e89bdbe674c53f39625523341c92dd6ecd5109 (commit)
from 0c5bbc10c79f5ed4a67c475762fd2e1c574a9b71 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c06d01222a65a21674b31d76f0bfbcc4facb253a
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Apr 4 12:36:15 2024 -0400
Merge branch '21639-keep-cache-dict' refs #21639
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4b95835aac..0cc7d25a33 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1060,7 +1060,8 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []
@@ -1068,17 +1069,21 @@ class ArvadosFile(object):
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
- data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
locs.add(lr.locator)
else:
break
- for lr in prefetch:
- if lr.locator not in locs:
- self.parent._my_block_manager().block_prefetch(lr.locator)
- locs.add(lr.locator)
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0:
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized
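
The arvfile.py hunk above stops copying each block slice with .tobytes() and, when a read is satisfied by a single block, returns the memoryview slice itself. A minimal illustration of why that avoids a copy (the 'blocks' lookup and segment tuples here are hypothetical stand-ins, not the ArvadosFile API):

    # Sketch only: 'blocks' maps locator -> bytes/mmap region; 'segments' is a
    # list of (locator, segment_offset, segment_size) tuples, similar in spirit
    # to what locators_and_ranges() returns.
    def read_segments(blocks, segments):
        data = []
        for locator, offset, size in segments:
            block = blocks.get(locator)
            if block is None:
                break
            view = memoryview(block)
            # Slicing a memoryview references the same buffer; nothing is copied.
            data.append(view[offset:offset + size])
        if len(data) == 1:
            # Common case: the whole read fits in one block, so hand the view
            # back directly instead of paying for b''.join().
            return data[0]
        # Multi-block reads still need one copy to produce contiguous bytes.
        return b''.join(data)
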
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index f8fca57803..528a7d28b5 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -13,6 +13,7 @@ import time
import errno
import logging
import weakref
+import collections
_logger = logging.getLogger('arvados.keep')
@@ -31,6 +32,15 @@ class DiskCacheSlot(object):
def get(self):
self.ready.wait()
+ # 'content' can be None, an empty byte string, or a nonempty mmap
+ # region. If it is an mmap region, we want to advise the
+ # kernel we're going to use it. This nudges the kernel to
+ # re-read most or all of the block if necessary (instead of
+ # just a few pages at a time), reducing the number of page
+ # faults and improving performance by 4x compared to not
+ # calling madvise.
+ if self.content:
+ self.content.madvise(mmap.MADV_WILLNEED)
return self.content
def set(self, value):
@@ -39,18 +49,18 @@ class DiskCacheSlot(object):
if value is None:
self.content = None
self.ready.set()
- return
+ return False
if len(value) == 0:
# Can't mmap a 0 length file
self.content = b''
self.ready.set()
- return
+ return True
if self.content is not None:
# Has been set already
self.ready.set()
- return
+ return False
blockdir = os.path.join(self.cachedir, self.locator[0:3])
os.makedirs(blockdir, mode=0o700, exist_ok=True)
@@ -73,6 +83,7 @@ class DiskCacheSlot(object):
self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
# only set the event when mmap is successful
self.ready.set()
+ return True
finally:
if tmpfile is not None:
# If the tempfile hasn't been renamed on disk yet, try to delete it.
@@ -95,65 +106,61 @@ class DiskCacheSlot(object):
return len(self.content)
def evict(self):
- if self.content is not None and len(self.content) > 0:
- # The mmap region might be in use when we decided to evict
- # it. This can happen if the cache is too small.
- #
- # If we call close() now, it'll throw an error if
- # something tries to access it.
- #
- # However, we don't need to explicitly call mmap.close()
- #
- # I confirmed in mmapmodule.c that both close
- # and deallocate do the same thing:
+ if not self.content:
+ return
+
+ # The mmap region might be in use when we decided to evict
+ # it. This can happen if the cache is too small.
+ #
+ # If we call close() now, it'll throw an error if
+ # something tries to access it.
+ #
+ # However, we don't need to explicitly call mmap.close()
+ #
+ # I confirmed in mmapmodule.c that both close
+ # and deallocate do the same thing:
+ #
+ # a) close the file descriptor
+ # b) unmap the memory range
+ #
+ # So we can forget it in the cache and delete the file on
+ # disk, and it will tear it down after any other
+ # lingering Python references to the mapped memory are
+ # gone.
+
+ blockdir = os.path.join(self.cachedir, self.locator[0:3])
+ final = os.path.join(blockdir, self.locator) + cacheblock_suffix
+ try:
+ fcntl.flock(self.filehandle, fcntl.LOCK_UN)
+
+ # try to get an exclusive lock, this ensures other
+ # processes are not using the block. It is
+ # nonblocking and will throw an exception if we
+ # can't get it, which is fine because that means
+ # we just won't try to delete it.
#
- # a) close the file descriptor
- # b) unmap the memory range
+ # I should note here, the file locking is not
+ # strictly necessary, we could just remove it and
+ # the kernel would ensure that the underlying
+ # inode remains available as long as other
+ # processes still have the file open. However, if
+ # you have multiple processes sharing the cache
+ # and deleting each other's files, you'll end up
+ # with a bunch of ghost files that don't show up
+ # in the file system but are still taking up
+ # space, which isn't particularly user friendly.
+ # The locking strategy ensures that cache blocks
+ # in use remain visible.
#
- # So we can forget it in the cache and delete the file on
- # disk, and it will tear it down after any other
- # lingering Python references to the mapped memory are
- # gone.
-
- blockdir = os.path.join(self.cachedir, self.locator[0:3])
- final = os.path.join(blockdir, self.locator) + cacheblock_suffix
- try:
- fcntl.flock(self.filehandle, fcntl.LOCK_UN)
-
- # try to get an exclusive lock, this ensures other
- # processes are not using the block. It is
- # nonblocking and will throw an exception if we
- # can't get it, which is fine because that means
- # we just won't try to delete it.
- #
- # I should note here, the file locking is not
- # strictly necessary, we could just remove it and
- # the kernel would ensure that the underlying
- # inode remains available as long as other
- # processes still have the file open. However, if
- # you have multiple processes sharing the cache
- # and deleting each other's files, you'll end up
- # with a bunch of ghost files that don't show up
- # in the file system but are still taking up
- # space, which isn't particularly user friendly.
- # The locking strategy ensures that cache blocks
- # in use remain visible.
- #
- fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
- os.remove(final)
- return True
- except OSError:
- pass
- finally:
- self.filehandle = None
- self.linger = weakref.ref(self.content)
- self.content = None
- return False
+ fcntl.flock(self.filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
- def gone(self):
- # Test if an evicted object is lingering
- return self.content is None and (self.linger is None or self.linger() is None)
+ os.remove(final)
+ return True
+ except OSError:
+ pass
+ finally:
+ self.filehandle = None
+ self.content = None
@staticmethod
def get_from_disk(locator, cachedir):
@@ -237,13 +244,13 @@ class DiskCacheSlot(object):
# Map in all the files we found, up to maxslots, if we exceed
# maxslots, start throwing things out.
- cachelist = []
+ cachelist: collections.OrderedDict = collections.OrderedDict()
for b in blocks:
got = DiskCacheSlot.get_from_disk(b[0], cachedir)
if got is None:
continue
if len(cachelist) < maxslots:
- cachelist.append(got)
+ cachelist[got.locator] = got
else:
# we found more blocks than maxslots, try to
# throw it out of the cache.
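
The rewritten evict() above keeps the strategy its comments describe: drop the shared lock, attempt a nonblocking exclusive lock, and only unlink the cache file if that succeeds. The same idea in isolation (the file handle and path names here are placeholders, not the DiskCacheSlot attributes):

    import fcntl
    import os

    def try_remove_cache_file(filehandle, path):
        # Sketch of the nonblocking-flock eviction check described above.
        try:
            fcntl.flock(filehandle, fcntl.LOCK_UN)
            # LOCK_NB makes this raise immediately if another process still
            # holds a lock on the block; in that case, leave the file alone.
            fcntl.flock(filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
            os.remove(path)
            return True
        except OSError:
            return False
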
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4b00f7df8b..a824621079 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -182,7 +182,7 @@ class Keep(object):
class KeepBlockCache(object):
def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
self.cache_max = cache_max
- self._cache = []
+ self._cache = collections.OrderedDict()
self._cache_lock = threading.Lock()
self._max_slots = max_slots
self._disk_cache = disk_cache
@@ -233,11 +233,13 @@ class KeepBlockCache(object):
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+ self.cache_total = 0
if self._disk_cache:
self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ for slot in self._cache.values():
+ self.cache_total += slot.size()
self.cap_cache()
-
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
@@ -251,8 +253,11 @@ class KeepBlockCache(object):
return self.content
def set(self, value):
+ if self.content is not None:
+ return False
self.content = value
self.ready.set()
+ return True
def size(self):
if self.content is None:
@@ -262,42 +267,25 @@ class KeepBlockCache(object):
def evict(self):
self.content = None
- return self.gone()
- def gone(self):
- return (self.content is None)
def _resize_cache(self, cache_max, max_slots):
# Try and make sure the contents of the cache do not exceed
# the supplied maximums.
- # Select all slots except those where ready.is_set() and content is
- # None (that means there was an error reading the block).
- self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
- sm = sum([slot.size() for slot in self._cache])
- while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
- for i in range(len(self._cache)-1, -1, -1):
- # start from the back, find a slot that is a candidate to evict
- if self._cache[i].ready.is_set():
- sz = self._cache[i].size()
-
- # If evict returns false it means the
- # underlying disk cache couldn't lock the file
- # for deletion because another process was using
- # it. Don't count it as reducing the amount
- # of data in the cache, find something else to
- # throw out.
- if self._cache[i].evict():
- sm -= sz
-
- # check to make sure the underlying data is gone
- if self._cache[i].gone():
- # either way we forget about it. either the
- # other process will delete it, or if we need
- # it again and it is still there, we'll find
- # it on disk.
- del self._cache[i]
- break
+ if self.cache_total <= cache_max and len(self._cache) <= max_slots:
+ return
+
+ _evict_candidates = collections.deque(self._cache.values())
+ while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
+ slot = _evict_candidates.popleft()
+ if not slot.ready.is_set():
+ continue
+
+ sz = slot.size()
+ slot.evict()
+ self.cache_total -= sz
+ del self._cache[slot.locator]
def cap_cache(self):
@@ -308,19 +296,19 @@ class KeepBlockCache(object):
def _get(self, locator):
# Test if the locator is already in the cache
- for i in range(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n
+ if locator in self._cache:
+ n = self._cache[locator]
+ if n.ready.is_set() and n.content is None:
+ del self._cache[n.locator]
+ return None
+ self._cache.move_to_end(locator)
+ return n
if self._disk_cache:
# see if it exists on disk
n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
if n is not None:
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
+ self.cache_total += n.size()
return n
return None
@@ -350,12 +338,13 @@ class KeepBlockCache(object):
n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
else:
n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
+ self._cache[n.locator] = n
return n, True
def set(self, slot, blob):
try:
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
return
except OSError as e:
if e.errno == errno.ENOMEM:
@@ -365,7 +354,7 @@ class KeepBlockCache(object):
elif e.errno == errno.ENOSPC:
# Reduce disk max space to current - 256 MiB, cap cache and retry
with self._cache_lock:
- sm = sum([st.size() for st in self._cache])
+ sm = sum(st.size() for st in self._cache.values())
self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
elif e.errno == errno.ENODEV:
_logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
@@ -383,7 +372,8 @@ class KeepBlockCache(object):
# exception handler adjusts limits downward in some cases
# to free up resources, which would make the operation
# succeed.
- slot.set(blob)
+ if slot.set(blob):
+ self.cache_total += slot.size()
except Exception as e:
# It failed again. Give up.
slot.set(None)
@@ -924,7 +914,10 @@ class KeepClient(object):
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
- self.num_prefetch_threads = num_prefetch_threads or 2
+ if num_prefetch_threads is not None:
+ self.num_prefetch_threads = num_prefetch_threads
+ else:
+ self.num_prefetch_threads = 2
self._prefetch_queue = None
self._prefetch_threads = None
@@ -1426,6 +1419,9 @@ class KeepClient(object):
does not block.
"""
+ if self.block_cache.get(locator) is not None:
+ return
+
self._start_prefetch_threads()
self._prefetch_queue.put(locator)
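
The keep.py changes above turn the in-memory cache from a list into a collections.OrderedDict used as an LRU, with a running cache_total byte count so _resize_cache no longer re-sums every slot. A self-contained sketch of that bookkeeping (not the real KeepBlockCache interface):

    import collections

    class TinyLRU:
        # Sketch: OrderedDict-based LRU bounded by total bytes and slot count.
        def __init__(self, cache_max, max_slots):
            self._cache = collections.OrderedDict()
            self.cache_max = cache_max
            self.max_slots = max_slots
            self.cache_total = 0

        def get(self, locator):
            if locator not in self._cache:
                return None
            # Recently used entries move to the end; eviction pops the front.
            self._cache.move_to_end(locator)
            return self._cache[locator]

        def put(self, locator, blob):
            self._cache[locator] = blob
            self.cache_total += len(blob)
            while self._cache and (self.cache_total > self.cache_max
                                   or len(self._cache) > self.max_slots):
                _, evicted = self._cache.popitem(last=False)
                self.cache_total -= len(evicted)
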
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index f472c0830e..ebc8431dc2 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -11,6 +11,7 @@ from builtins import range
from builtins import object
import hashlib
import mock
+from mock import patch
import os
import errno
import pycurl
@@ -24,6 +25,7 @@ import tempfile
import time
import unittest
import urllib.parse
+import mmap
import parameterized
@@ -625,122 +627,6 @@ class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheB
-@tutil.skip_sleep
-@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
-class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
- disk_cache = False
-
- def setUp(self):
- self.api_client = self.mock_keep_services(count=2)
- self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
- self.data = b'xyzzy'
- self.locator = '1271ed5ef305aadabc605b1609e24c52'
-
- def tearDown(self):
- DiskCacheBase.tearDown(self)
-
- def test_multiple_default_storage_classes_req_header(self):
- api_mock = self.api_client_mock()
- api_mock.config.return_value = {
- 'StorageClasses': {
- 'foo': { 'Default': True },
- 'bar': { 'Default': True },
- 'baz': { 'Default': False }
- }
- }
- api_client = self.mock_keep_services(api_mock=api_mock, count=2)
- keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
- resp_hdr = {
- 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
- 'x-keep-replicas-stored': 1
- }
- with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
- keep_client.put(self.data, copies=1)
- req_hdr = mock.responses[0]
- self.assertIn(
- 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
-
- def test_storage_classes_req_header(self):
- self.assertEqual(
- self.api_client.config()['StorageClasses'],
- {'default': {'Default': True}})
- cases = [
- # requested, expected
- [['foo'], 'X-Keep-Storage-Classes: foo'],
- [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
- [[], 'X-Keep-Storage-Classes: default'],
- [None, 'X-Keep-Storage-Classes: default'],
- ]
- for req_classes, expected_header in cases:
- headers = {'x-keep-replicas-stored': 1}
- if req_classes is None or len(req_classes) == 0:
- confirmed_hdr = 'default=1'
- elif len(req_classes) > 0:
- confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
- headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
- with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
- self.keep_client.put(self.data, copies=1, classes=req_classes)
- req_hdr = mock.responses[0]
- self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
-
- def test_partial_storage_classes_put(self):
- headers = {
- 'x-keep-replicas-stored': 1,
- 'x-keep-storage-classes-confirmed': 'foo=1'}
- with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
- with self.assertRaises(arvados.errors.KeepWriteError):
- self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
- # 1st request, both classes pending
- req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
- self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
- # 2nd try, 'foo' class already satisfied
- req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
- self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
-
- def test_successful_storage_classes_put_requests(self):
- cases = [
- # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
- [ 1, ['foo'], 1, 'foo=1', 1],
- [ 1, ['foo'], 2, 'foo=2', 1],
- [ 2, ['foo'], 2, 'foo=2', 1],
- [ 2, ['foo'], 1, 'foo=1', 2],
- [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
- [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
- [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
- [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
- [ 1, ['foo', 'bar'], 1, None, 1],
- [ 1, ['foo'], 1, None, 1],
- [ 2, ['foo'], 2, None, 1],
- [ 2, ['foo'], 1, None, 2],
- ]
- for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
- headers = {'x-keep-replicas-stored': c_copies}
- if c_classes is not None:
- headers.update({'x-keep-storage-classes-confirmed': c_classes})
- with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
- case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
- self.assertEqual(self.locator,
- self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
- case_desc)
- self.assertEqual(e_reqs, mock.call_count, case_desc)
-
- def test_failed_storage_classes_put_requests(self):
- cases = [
- # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
- [ 1, ['foo'], 1, 'bar=1', 200],
- [ 1, ['foo'], 1, None, 503],
- [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
- [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
- [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
- ]
- for w_copies, w_classes, c_copies, c_classes, return_code in cases:
- headers = {'x-keep-replicas-stored': c_copies}
- if c_classes is not None:
- headers.update({'x-keep-storage-classes-confirmed': c_classes})
- with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
- case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
- with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
- self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
@@ -1757,21 +1643,31 @@ class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
keep_client.get(self.locator)
- @mock.patch('mmap.mmap')
- def test_disk_cache_retry_write_error(self, mockmmap):
+ def test_disk_cache_retry_write_error(self):
block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
disk_cache_dir=self.disk_cache_dir)
keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
- mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
+ called = False
+ realmmap = mmap.mmap
+ def sideeffect_mmap(*args, **kwargs):
+ nonlocal called
+ if not called:
+ called = True
+ raise OSError(errno.ENOSPC, "no space")
+ else:
+ return realmmap(*args, **kwargs)
- cache_max_before = block_cache.cache_max
+ with patch('mmap.mmap') as mockmmap:
+ mockmmap.side_effect = sideeffect_mmap
- with tutil.mock_keep_responses(self.data, 200) as mock:
- self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+ cache_max_before = block_cache.cache_max
- self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
self.assertTrue(tutil.binary_compare(f.read(), self.data))
@@ -1780,21 +1676,31 @@ class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertTrue(cache_max_before > block_cache.cache_max)
- @mock.patch('mmap.mmap')
- def test_disk_cache_retry_write_error2(self, mockmmap):
+ def test_disk_cache_retry_write_error2(self):
block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
disk_cache_dir=self.disk_cache_dir)
keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
- mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
+ called = False
+ realmmap = mmap.mmap
+ def sideeffect_mmap(*args, **kwargs):
+ nonlocal called
+ if not called:
+ called = True
+ raise OSError(errno.ENOMEM, "no memory")
+ else:
+ return realmmap(*args, **kwargs)
- slots_before = block_cache._max_slots
+ with patch('mmap.mmap') as mockmmap:
+ mockmmap.side_effect = sideeffect_mmap
- with tutil.mock_keep_responses(self.data, 200) as mock:
- self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+ slots_before = block_cache._max_slots
- self.assertIsNotNone(keep_client.get_from_cache(self.locator))
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
+
+ self.assertIsNotNone(keep_client.get_from_cache(self.locator))
with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
self.assertTrue(tutil.binary_compare(f.read(), self.data))
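
The rewritten disk-cache retry tests above swap a tuple-valued side_effect for a callable that fails only on the first call and then delegates to the real mmap.mmap, so the retry path ends up with a genuine memory mapping. The same fail-once pattern in isolation, using the stdlib unittest.mock (the patched target is just an example):

    import errno
    import mmap
    from unittest import mock

    def fail_once(real_fn, exc):
        # Return a side_effect that raises 'exc' on the first call only,
        # then forwards all later calls to the real function.
        called = False
        def side_effect(*args, **kwargs):
            nonlocal called
            if not called:
                called = True
                raise exc
            return real_fn(*args, **kwargs)
        return side_effect

    real_mmap = mmap.mmap
    with mock.patch('mmap.mmap') as mocked:
        mocked.side_effect = fail_once(real_mmap, OSError(errno.ENOSPC, "no space"))
        # ... exercise the code under test; its second mmap attempt succeeds ...
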
diff --git a/sdk/python/tests/test_storage_classes.py b/sdk/python/tests/test_storage_classes.py
new file mode 100644
index 0000000000..21bacc310a
--- /dev/null
+++ b/sdk/python/tests/test_storage_classes.py
@@ -0,0 +1,128 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import pycurl
+
+import unittest
+import parameterized
+from . import arvados_testutil as tutil
+from .arvados_testutil import DiskCacheBase
+
+@tutil.skip_sleep
+@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
+ disk_cache = False
+
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ def tearDown(self):
+ DiskCacheBase.tearDown(self)
+
+ def test_multiple_default_storage_classes_req_header(self):
+ api_mock = self.api_client_mock()
+ api_mock.config.return_value = {
+ 'StorageClasses': {
+ 'foo': { 'Default': True },
+ 'bar': { 'Default': True },
+ 'baz': { 'Default': False }
+ }
+ }
+ api_client = self.mock_keep_services(api_mock=api_mock, count=2)
+ keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
+ resp_hdr = {
+ 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
+ 'x-keep-replicas-stored': 1
+ }
+ with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
+ keep_client.put(self.data, copies=1)
+ req_hdr = mock.responses[0]
+ self.assertIn(
+ 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
+
+ def test_storage_classes_req_header(self):
+ self.assertEqual(
+ self.api_client.config()['StorageClasses'],
+ {'default': {'Default': True}})
+ cases = [
+ # requested, expected
+ [['foo'], 'X-Keep-Storage-Classes: foo'],
+ [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+ [[], 'X-Keep-Storage-Classes: default'],
+ [None, 'X-Keep-Storage-Classes: default'],
+ ]
+ for req_classes, expected_header in cases:
+ headers = {'x-keep-replicas-stored': 1}
+ if req_classes is None or len(req_classes) == 0:
+ confirmed_hdr = 'default=1'
+ elif len(req_classes) > 0:
+ confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+ headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+ with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+ self.keep_client.put(self.data, copies=1, classes=req_classes)
+ req_hdr = mock.responses[0]
+ self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
+
+ def test_partial_storage_classes_put(self):
+ headers = {
+ 'x-keep-replicas-stored': 1,
+ 'x-keep-storage-classes-confirmed': 'foo=1'}
+ with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
+ # 1st request, both classes pending
+ req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+ # 2nd try, 'foo' class already satisfied
+ req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+ def test_successful_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+ [ 1, ['foo'], 1, 'foo=1', 1],
+ [ 1, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 1, 'foo=1', 2],
+ [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+ [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+ [ 1, ['foo', 'bar'], 1, None, 1],
+ [ 1, ['foo'], 1, None, 1],
+ [ 2, ['foo'], 2, None, 1],
+ [ 2, ['foo'], 1, None, 2],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+ self.assertEqual(self.locator,
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+ case_desc)
+ self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+ def test_failed_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+ [ 1, ['foo'], 1, 'bar=1', 200],
+ [ 1, ['foo'], 1, None, 503],
+ [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
+ [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
+ [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+ with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 45847fde81..1398b92e87 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -490,12 +490,12 @@ class Mount(object):
disk_cache=self.args.disk_cache,
disk_cache_dir=self.args.disk_cache_dir)
- # If there's too many prefetch threads and you
- # max out the CPU, delivering data to the FUSE
- # layer actually ends up being slower.
- # Experimentally, capping 7 threads seems to
- # be a sweet spot.
- prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+ # Profiling indicates that prefetching has more of a
+ # negative impact on the read() fast path (by requiring it
+ # to do more work and take additional locks) than it
+ # provides in benefit. Also, the kernel does some readahead
+ # itself, which has a similar effect.
+ prefetch_threads = 0
self.api = arvados.safeapi.ThreadSafeApiCache(
apiconfig=arvados.config.settings(),
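
Since the FUSE command above now hard-codes prefetch_threads = 0, the num_prefetch_threads guards added in arvfile.py are what keep the read path from queuing any prefetches. Assuming num_prefetch_threads is accepted by the KeepClient constructor, as the keep.py hunk suggests, a client would disable prefetching roughly like this (hypothetical usage sketch, not taken from this commit):

    import arvados

    api = arvados.api('v1')
    # 0 disables the prefetch threads entirely; leaving it unset (None)
    # falls back to the default of 2 per the keep.py hunk above.
    keep_client = arvados.KeepClient(api_client=api, num_prefetch_threads=0)
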
commit 70e89bdbe674c53f39625523341c92dd6ecd5109
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Apr 5 09:41:34 2024 -0400
2.7 doesn't log "not running worker" so don't test for it, refs #21598
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index e24a7b1317..d569020824 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -220,8 +220,6 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
if trial.logConfig == "none" {
c.Check(logExists, Equals, false)
} else {
- c.Check(log, Matches, `(?ms).*not running trash worker.*`)
- c.Check(log, Matches, `(?ms).*not running trash emptier.*`)
c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
}
-----------------------------------------------------------------------
hooks/post-receive