[arvados] updated: 2.1.0-2964-g2a16e128a
git repository hosting
git at public.arvados.org
Thu Oct 20 20:23:19 UTC 2022
Summary of changes:
 sdk/python/arvados/diskcache.py        | 49 ++++++++++++++++++++++++++++------
 sdk/python/arvados/keep.py             |  5 ++++
 sdk/python/tests/arvados_testutil.py   |  2 +-
 services/fuse/tests/mount_test_base.py |  2 +-
 4 files changed, 48 insertions(+), 10 deletions(-)
       via  2a16e128ab63f87aa4656bf860ae3f6b8633c4ca (commit)
       via  563325b7d7336e3bfe9248b40c766d83e8b19bf3 (commit)
      from  67e56f190b9a78e3c45cc7d90510fc631e0d04b6 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 2a16e128ab63f87aa4656bf860ae3f6b8633c4ca
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Oct 20 16:18:00 2022 -0400
18842: When starting up the disk cache, map in everything
When a process starts, the first thing it will do is map in the
existing blocks and prune any excess from the cache.
If multiple processes use the same cache dir, at worst you'd end up
with N*M disk usage, where N is the number of processes and M is the
per-process cache limit; for example, four processes each capped at
1 GiB could together pin up to 4 GiB.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 9f218e56c..c2afd3bfc 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -8,8 +8,6 @@ import os
import traceback
import stat
import tempfile
-import hashlib
-import fcntl
class DiskCacheSlot(object):
__slots__ = ("locator", "ready", "content", "cachedir")
@@ -48,10 +46,6 @@ class DiskCacheSlot(object):
tmpfile = f.name
os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
- # acquire a shared lock; this tells other processes that
- # we're using this block and to please not delete it.
- fcntl.flock(f, fcntl.LOCK_SH)
-
f.write(value)
f.flush()
os.rename(tmpfile, final)
@@ -92,35 +86,23 @@ class DiskCacheSlot(object):
blockdir = os.path.join(self.cachedir, self.locator[0:3])
final = os.path.join(blockdir, self.locator)
try:
- # If we can't upgrade our shared lock to an exclusive
- # lock, flock() raises an error; that's fine and
- # desirable, since it means another process has a lock
- # and we shouldn't delete the block.
- fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.remove(final)
except OSError:
pass
@staticmethod
def get_from_disk(locator, cachedir):
- # Get it, check it, return it
blockdir = os.path.join(cachedir, locator[0:3])
final = os.path.join(blockdir, locator)
try:
filehandle = open(final, "rb")
- # acquire a shared lock; this tells other processes that
- # we're using this block and to please not delete it.
- fcntl.flock(filehandle, fcntl.LOCK_SH)
-
content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
- disk_md5 = hashlib.md5(content).hexdigest()
- if disk_md5 == locator:
- dc = DiskCacheSlot(locator, cachedir)
- dc.content = content
- dc.ready.set()
- return dc
+ dc = DiskCacheSlot(locator, cachedir)
+ dc.content = content
+ dc.ready.set()
+ return dc
except FileNotFoundError:
pass
except Exception as e:
@@ -129,36 +111,36 @@ class DiskCacheSlot(object):
return None
@staticmethod
- def cleanup_cachedir(cachedir, maxsize):
+ def init_cache(cachedir, maxslots):
+ # Map in all the files in the cache directory, up to maxslots;
+ # past maxslots, try to delete the excess blocks.
+ #
+ # This gives the calling process ownership of all the blocks.
+
blocks = []
- totalsize = 0
for root, dirs, files in os.walk(cachedir):
for name in files:
blockpath = os.path.join(root, name)
res = os.stat(blockpath)
- blocks.append((blockpath, res.st_size, res.st_atime))
- totalsize += res.st_size
-
- if totalsize <= maxsize:
- return
-
- # sort by atime, so the blocks accessed the longest time in
- # the past get deleted first.
- blocks.sort(key=lambda x: x[2])
-
- # go through the list and try deleting blocks until we're
- # below the target size and/or we run out of blocks
- i = 0
- while i < len(blocks) and totalsize > maxsize:
- try:
- with open(blocks[i][0], "rb") as f:
- # If we can't get an exclusive lock, flock()
- # raises an error; that's fine and desirable,
- # since it means another process has a lock and
- # we shouldn't delete the block.
- fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
- os.remove(blocks[i][0])
- totalsize -= blocks[i][1]
- except OSError:
- pass
- i += 1
+ blocks.append((name, res.st_atime))
+
+ # sort by access time (atime), going from most recently
+ # accessed (highest timestamp) to least recently accessed
+ # (lowest timestamp).
+ blocks.sort(key=lambda x: x[1], reverse=True)
+
+ # Map in all the files we found, up to maxslots; if we
+ # exceed maxslots, start throwing blocks out.
+ cachelist = []
+ for b in blocks:
+ got = DiskCacheSlot.get_from_disk(b[0], cachedir)
+ if got is None:
+ continue
+ if len(cachelist) < maxslots:
+ cachelist.append(got)
+ else:
+ # we found more blocks than maxslots, so try to
+ # evict this one from the cache.
+ got.evict()
+
+ return cachelist
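A minimal sketch of how the new startup path is driven; the cache
directory and slot count here are illustrative values, not the
library's defaults:

    import arvados.diskcache

    cachedir = "/tmp/arvados-keep-cache"   # hypothetical location
    maxslots = 512                         # hypothetical slot limit

    # Map in every block left behind by a previous run, most recently
    # accessed first; anything past maxslots is evicted immediately.
    slots = arvados.diskcache.DiskCacheSlot.init_cache(cachedir, maxslots)
    print("recovered %d cached blocks" % len(slots))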
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 7f316c153..8d95b2dc7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -207,6 +207,11 @@ class KeepBlockCache(object):
self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+ if self._disk_cache:
+ self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+ self.cap_cache()
+
+
class CacheSlot(object):
__slots__ = ("locator", "ready", "content")
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index a574508cb..3772761b8 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -290,8 +290,8 @@ def binary_compare(a, b):
return True
def make_block_cache(disk_cache):
- block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
if disk_cache:
disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
return block_cache
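(The reordering matters with this change: since the KeepBlockCache
constructor now maps in whatever is already in the cache directory,
the test has to clear that directory before building the cache
object, not after. The same fix is applied to the FUSE test helper
below.)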
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index a24005064..b1383d36b 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -26,10 +26,10 @@ logger = logging.getLogger('arvados.arv-mount')
from .integration_test import workerPool
def make_block_cache(disk_cache):
- block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
if disk_cache:
disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
shutil.rmtree(disk_cache_dir, ignore_errors=True)
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
return block_cache
class MountTestBase(unittest.TestCase):
commit 563325b7d7336e3bfe9248b40c766d83e8b19bf3
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed Oct 19 17:18:20 2022 -0400
18842: Add locking and cachedir cleanup, needs testing
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 24f249f1d..9f218e56c 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -9,6 +9,7 @@ import traceback
import stat
import tempfile
import hashlib
+import fcntl
class DiskCacheSlot(object):
__slots__ = ("locator", "ready", "content", "cachedir")
@@ -46,6 +47,11 @@ class DiskCacheSlot(object):
f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
tmpfile = f.name
os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+
+ # acquire a shared lock; this tells other processes that
+ # we're using this block and to please not delete it.
+ fcntl.flock(f, fcntl.LOCK_SH)
+
f.write(value)
f.flush()
os.rename(tmpfile, final)
@@ -86,6 +92,11 @@ class DiskCacheSlot(object):
blockdir = os.path.join(self.cachedir, self.locator[0:3])
final = os.path.join(blockdir, self.locator)
try:
+ # If we can't upgrade our shared lock to an exclusive
+ # lock, flock() raises an error; that's fine and
+ # desirable, since it means another process has a lock
+ # and we shouldn't delete the block.
+ fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.remove(final)
except OSError:
pass
@@ -98,6 +109,11 @@ class DiskCacheSlot(object):
try:
filehandle = open(final, "rb")
+
+ # acquire a shared lock; this tells other processes that
+ # we're using this block and to please not delete it.
+ fcntl.flock(filehandle, fcntl.LOCK_SH)
+
content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
disk_md5 = hashlib.md5(content).hexdigest()
if disk_md5 == locator:
@@ -111,3 +127,38 @@ class DiskCacheSlot(object):
traceback.print_exc()
return None
+
+ @staticmethod
+ def cleanup_cachedir(cachedir, maxsize):
+ blocks = []
+ totalsize = 0
+ for root, dirs, files in os.walk(cachedir):
+ for name in files:
+ blockpath = os.path.join(root, name)
+ res = os.stat(blockpath)
+ blocks.append((blockpath, res.st_size, res.st_atime))
+ totalsize += res.st_size
+
+ if totalsize <= maxsize:
+ return
+
+ # sort by atime, so the blocks accessed the longest time in
+ # the past get deleted first.
+ blocks.sort(key=lambda x: x[2])
+
+ # go through the list and try deleting blocks until we're
+ # below the target size and/or we run out of blocks
+ i = 0
+ while i < len(blocks) and totalsize > maxsize:
+ try:
+ with open(blocks[i][0], "rb") as f:
+ # If we can't get an exclusive lock, flock()
+ # raises an error; that's fine and desirable,
+ # since it means another process has a lock and
+ # we shouldn't delete the block.
+ fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ os.remove(blocks[i][0])
+ totalsize -= blocks[i][1]
+ except OSError:
+ pass
+ i += 1
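The locking protocol added here is a standard flock() idiom: readers
hold shared locks while a block is in use, and a would-be deleter
must first win a non-blocking exclusive lock. A self-contained sketch
of the idea (illustrative code, not part of the commit):

    import fcntl
    import os

    def try_evict(path):
        # Readers hold LOCK_SH on the open block file.  Deleting is
        # only safe if nobody holds a lock, so ask for a non-blocking
        # exclusive lock; if any shared lock is live, flock() raises
        # OSError and the block survives, which is the desired outcome.
        try:
            with open(path, "rb") as f:
                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                os.remove(path)
                return True
        except OSError:
            return False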
-----------------------------------------------------------------------
hooks/post-receive
--