[arvados] updated: 2.1.0-2964-g2a16e128a

git repository hosting git at public.arvados.org
Thu Oct 20 20:23:19 UTC 2022


Summary of changes:
 sdk/python/arvados/diskcache.py        | 49 ++++++++++++++++++++++++++++------
 sdk/python/arvados/keep.py             |  5 ++++
 sdk/python/tests/arvados_testutil.py   |  2 +-
 services/fuse/tests/mount_test_base.py |  2 +-
 4 files changed, 48 insertions(+), 10 deletions(-)

       via  2a16e128ab63f87aa4656bf860ae3f6b8633c4ca (commit)
       via  563325b7d7336e3bfe9248b40c766d83e8b19bf3 (commit)
      from  67e56f190b9a78e3c45cc7d90510fc631e0d04b6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 2a16e128ab63f87aa4656bf860ae3f6b8633c4ca
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Oct 20 16:18:00 2022 -0400

    18842: When starting up the disk cache, map in everything
    
    When a process starts, the first thing it will do is map in the
    existing blocks and prune any excess from the cache.
    
    If there are multiple processes using the same cache dir, at worst
    you'd end up with N*M usage where N is the number of processes and M
    is the cache limit.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 9f218e56c..c2afd3bfc 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -8,8 +8,6 @@ import os
 import traceback
 import stat
 import tempfile
-import hashlib
-import fcntl
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -48,10 +46,6 @@ class DiskCacheSlot(object):
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
 
-            # aquire a shared lock, this tells other processes that
-            # we're using this block and to please not delete it.
-            fcntl.flock(f, fcntl.LOCK_SH)
-
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
@@ -92,35 +86,23 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             final = os.path.join(blockdir, self.locator)
             try:
-                # If we can't upgrade our shared lock to an exclusive
-                # lock, it'll throw an error, that's fine and
-                # desirable, it means another process has a lock and
-                # we shouldn't delete the block.
-                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                 os.remove(final)
             except OSError:
                 pass
 
     @staticmethod
     def get_from_disk(locator, cachedir):
-        # Get it, check it, return it
         blockdir = os.path.join(cachedir, locator[0:3])
         final = os.path.join(blockdir, locator)
 
         try:
             filehandle = open(final, "rb")
 
-            # aquire a shared lock, this tells other processes that
-            # we're using this block and to please not delete it.
-            fcntl.flock(f, fcntl.LOCK_SH)
-
             content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
-            disk_md5 = hashlib.md5(content).hexdigest()
-            if disk_md5 == locator:
-                dc = DiskCacheSlot(locator, cachedir)
-                dc.content = content
-                dc.ready.set()
-                return dc
+            dc = DiskCacheSlot(locator, cachedir)
+            dc.content = content
+            dc.ready.set()
+            return dc
         except FileNotFoundError:
             pass
         except Exception as e:
@@ -129,36 +111,36 @@ class DiskCacheSlot(object):
         return None
 
     @staticmethod
-    def cleanup_cachedir(cachedir, maxsize):
+    def init_cache(cachedir, maxslots):
+        # map in all the files in the cache directory, up to max slots.
+        # after max slots, try to delete the excess blocks.
+        #
+        # this gives the calling process ownership of all the blocks
+
         blocks = []
-        totalsize = 0
         for root, dirs, files in os.walk(cachedir):
             for name in files:
                 blockpath = os.path.join(root, name)
                 res = os.stat(blockpath)
-                blocks.append((blockpath, res.st_size, res.st_atime))
-                totalsize += res.st_size
-
-        if totalsize <= maxsize:
-            return
-
-        # sort by atime, so the blocks accessed the longest time in
-        # the past get deleted first.
-        blocks.sort(key=lambda x: x[2])
-
-        # go through the list and try deleting blocks until we're
-        # below the target size and/or we run out of blocks
-        i = 0
-        while i < len(blocks) and totalsize > maxsize:
-            try:
-                with open(blocks[i][0], "rb") as f:
-                    # If we can't get an exclusive lock, it'll
-                    # throw an error, that's fine and desirable,
-                    # it means another process has a lock and we
-                    # shouldn't delete the block.
-                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
-                    os.remove(block)
-                    totalsize -= blocks[i][1]
-            except OSError:
-                pass
-            i += 1
+                blocks.append((name, res.st_atime))
+
+        # sort by access time (atime), going from most recently
+        # accessed (highest timestamp) to least recently accessed
+        # (lowest timestamp).
+        blocks.sort(key=lambda x: x[1], reverse=True)
+
+        # Map in all the files we found, up to maxslots, if we exceed
+        # maxslots, start throwing things out.
+        cachelist = []
+        for b in blocks:
+            got = DiskCacheSlot.get_from_disk(b[0], cachedir)
+            if got is None:
+                continue
+            if len(cachelist) < maxslots:
+                cachelist.append(got)
+            else:
+                # we found more blocks than maxslots, try to
+                # throw it out of the cache.
+                got.evict()
+
+        return cachelist
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 7f316c153..8d95b2dc7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -207,6 +207,11 @@ class KeepBlockCache(object):
 
         self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
+
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index a574508cb..3772761b8 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -290,8 +290,8 @@ def binary_compare(a, b):
     return True
 
 def make_block_cache(disk_cache):
-    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     if disk_cache:
         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
         shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     return block_cache
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index a24005064..b1383d36b 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -26,10 +26,10 @@ logger = logging.getLogger('arvados.arv-mount')
 from .integration_test import workerPool
 
 def make_block_cache(disk_cache):
-    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     if disk_cache:
         disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
         shutil.rmtree(disk_cache_dir, ignore_errors=True)
+    block_cache = arvados.keep.KeepBlockCache(disk_cache=disk_cache)
     return block_cache
 
 class MountTestBase(unittest.TestCase):

commit 563325b7d7336e3bfe9248b40c766d83e8b19bf3
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Wed Oct 19 17:18:20 2022 -0400

    18842: Add locking and cachedir cleanup, needs testing
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/python/arvados/diskcache.py b/sdk/python/arvados/diskcache.py
index 24f249f1d..9f218e56c 100644
--- a/sdk/python/arvados/diskcache.py
+++ b/sdk/python/arvados/diskcache.py
@@ -9,6 +9,7 @@ import traceback
 import stat
 import tempfile
 import hashlib
+import fcntl
 
 class DiskCacheSlot(object):
     __slots__ = ("locator", "ready", "content", "cachedir")
@@ -46,6 +47,11 @@ class DiskCacheSlot(object):
             f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
             tmpfile = f.name
             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
+
+            # aquire a shared lock, this tells other processes that
+            # we're using this block and to please not delete it.
+            fcntl.flock(f, fcntl.LOCK_SH)
+
             f.write(value)
             f.flush()
             os.rename(tmpfile, final)
@@ -86,6 +92,11 @@ class DiskCacheSlot(object):
             blockdir = os.path.join(self.cachedir, self.locator[0:3])
             final = os.path.join(blockdir, self.locator)
             try:
+                # If we can't upgrade our shared lock to an exclusive
+                # lock, it'll throw an error, that's fine and
+                # desirable, it means another process has a lock and
+                # we shouldn't delete the block.
+                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                 os.remove(final)
             except OSError:
                 pass
@@ -98,6 +109,11 @@ class DiskCacheSlot(object):
 
         try:
             filehandle = open(final, "rb")
+
+            # aquire a shared lock, this tells other processes that
+            # we're using this block and to please not delete it.
+            fcntl.flock(f, fcntl.LOCK_SH)
+
             content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
             disk_md5 = hashlib.md5(content).hexdigest()
             if disk_md5 == locator:
@@ -111,3 +127,38 @@ class DiskCacheSlot(object):
             traceback.print_exc()
 
         return None
+
+    @staticmethod
+    def cleanup_cachedir(cachedir, maxsize):
+        blocks = []
+        totalsize = 0
+        for root, dirs, files in os.walk(cachedir):
+            for name in files:
+                blockpath = os.path.join(root, name)
+                res = os.stat(blockpath)
+                blocks.append((blockpath, res.st_size, res.st_atime))
+                totalsize += res.st_size
+
+        if totalsize <= maxsize:
+            return
+
+        # sort by atime, so the blocks accessed the longest time in
+        # the past get deleted first.
+        blocks.sort(key=lambda x: x[2])
+
+        # go through the list and try deleting blocks until we're
+        # below the target size and/or we run out of blocks
+        i = 0
+        while i < len(blocks) and totalsize > maxsize:
+            try:
+                with open(blocks[i][0], "rb") as f:
+                    # If we can't get an exclusive lock, it'll
+                    # throw an error, that's fine and desirable,
+                    # it means another process has a lock and we
+                    # shouldn't delete the block.
+                    fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    os.remove(block)
+                    totalsize -= blocks[i][1]
+            except OSError:
+                pass
+            i += 1

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list