[arvados] updated: 2.7.0-6109-geb986fe200

git repository hosting git at public.arvados.org
Fri Mar 8 22:33:23 UTC 2024


Summary of changes:
 services/fuse/arvados_fuse/__init__.py | 275 +++++++++++++++++++++------------
 services/fuse/arvados_fuse/command.py  |   4 +-
 services/fuse/arvados_fuse/fresh.py    |  12 +-
 services/fuse/arvados_fuse/fusedir.py  |  45 +++---
 4 files changed, 204 insertions(+), 132 deletions(-)

       via  eb986fe200a4998052fd08323ff0a67eb05490fd (commit)
       via  310bf276572164d7d47e250f45ed1b5cf7902a2d (commit)
      from  32e56d60fd965260662b5c8d8aaafc0d793e33bd (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit eb986fe200a4998052fd08323ff0a67eb05490fd
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Fri Mar 8 17:32:44 2024 -0500

    21541: Making progress on memory management
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a966964248..0370237c1b 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -145,175 +145,99 @@ class InodeCache(object):
     """
 
     def __init__(self, cap, min_entries=4):
-        self._entries = collections.OrderedDict()
+        self._cache_entries = collections.OrderedDict()
         self._by_uuid = {}
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
-
-        self._cache_remove_queue = queue.Queue()
-        self._cache_remove_thread = threading.Thread(None, self._cache_remove)
-        self._cache_remove_thread.daemon = True
-        self._cache_remove_thread.start()
-
-        self._cache_cap_event = threading.Event()
-        self._cache_cap_thread = threading.Thread(None, self._cache_cap)
-        self._cache_cap_thread.daemon = True
-        self._cache_cap_thread.start()
-
+        self._total_lock = threading.Lock()
 
     def total(self):
         return self._total
 
-    def _cache_cap(self):
-        while True:
-            self._cache_cap_event.wait()
-            self._cache_cap_event.clear()
-
-            if self._total > self.cap:
-                with llfuse.lock:
-                    _logger.debug("InodeCache cap_cache %i, %i", self._total, self.cap)
-                    for ent in listvalues(self._entries):
-                        if self._total < self.cap or len(self._entries) < self.min_entries:
-                            break
-                        self._remove(ent)
-
-
-    def _cache_remove(self):
-        while True:
-            entry = self._cache_remove_queue.get()
-            _logger.debug("InodeCache got %s", entry)
-            if entry is None:
-                return
-
-            if entry.inode not in self._entries:
-                # removed already
-                continue
-
-            _logger.debug("InodeCache will remove inode %i", entry.inode)
-            with llfuse.lock:
-                _logger.debug("InodeCache removing inode %i", entry.inode)
-                try:
-                    parent = self._entries.get(entry.parent_inode)
-                    if parent is not None and parent.has_ref(False):
-                        continue
-
-                    if parent is not None and parent.in_use():
-                        #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
-                        return
-
-                    # Invalidate the entry for self on the parent
-                    entry.kernel_invalidate()
-
-                    # For directories, clear the contents
-                    entry.clear()
-
-                    # manage cache size running sum
-                    self._total -= entry.cache_size
+    def evict_candidates(self):
+        with self._total_lock:
+            total = self._total
+        if total <= self.cap:
+            return
 
-                    # manage the mapping of uuid to object
-                    if entry.cache_uuid:
-                        self._by_uuid[entry.cache_uuid].remove(entry)
-                        if not self._by_uuid[entry.cache_uuid]:
-                            del self._by_uuid[entry.cache_uuid]
-                        entry.cache_uuid = None
+        _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
+        for ent in listvalues(self._cache_entries):
+            with self._total_lock:
+                total = self._total
+            if total < self.cap or len(self._cache_entries) < self.min_entries:
+                break
+            yield ent
 
-                    # Now fully forget about it
-                    del self._entries[entry.inode]
+    def manage(self, obj):
+        if obj.inode in self._cache_entries:
+            return
 
-                    _logger.debug("InodeCache removed inode %i, total %i", entry.inode, self._total)
-                    entry.inode = None
+        obj.cache_size = obj.objsize()
 
-                    # stop anything else
-                    with llfuse.lock_released:
-                        entry.finalize()
-                except Exception as e:
-                    _logger.exception("failed remove")
+        with self._total_lock:
+            _logger.debug("InodeCache b4 cache_size %i total %i", obj.cache_size, self._total)
+            self._total += obj.cache_size
+            _logger.debug("InodeCache after cache_size %i total %i", obj.cache_size, self._total)
+            total = self._total
 
-    def _remove(self, entry):
-        if entry.inode not in self._entries:
-            return
+        self._cache_entries[obj.inode] = obj
 
-        # Kernel behavior seems to be that if a file is
-        # referenced, its parents remain referenced too. This
-        # means has_ref() exits early when a collection is not
-        # candidate for eviction.
-        #
-        # By contrast, in_use() doesn't increment references on
-        # parents, so it requires a full tree walk to determine if
-        # a collection is a candidate for eviction.  This takes
-        # .07s for 240000 files, which becomes a major drag when
-        # cap_cache is being called several times a second and
-        # there are multiple non-evictable collections in the
-        # cache.
-        #
-        # So it is important for performance that we do the
-        # has_ref() check first.
-
-        # if entry.has_ref(True):
-        #     #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
-        #     return
-
-        parent = self._entries.get(entry.parent_inode)
-        if parent is not None and parent.has_ref(False):
-            #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
-            return
+        obj.cache_uuid = obj.uuid()
+        if obj.cache_uuid:
+            if obj.cache_uuid not in self._by_uuid:
+                self._by_uuid[obj.cache_uuid] = [obj]
+            else:
+                if obj not in self._by_uuid[obj.cache_uuid]:
+                    self._by_uuid[obj.cache_uuid].append(obj)
 
-        # if entry.has_ref(True):
-        #     #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
-        #     return
+        _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                      obj.inode, obj.cache_size, obj.cache_uuid, total, len(self._cache_entries))
 
-        if parent is not None and parent.in_use():
-            #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+    def unmanage(self, entry):
+        if entry.inode not in self._cache_entries:
             return
 
-        _logger.debug("InodeCache queuing %i", entry.inode)
-        self._cache_remove_queue.put(entry)
+        # manage cache size running sum
+        # with self._total_lock:
+        #     self._total -= entry.cache_size
+        # entry.cache_size = 0
 
-    def cap_cache(self):
-        self._cache_cap_event.set()
+        # manage the mapping of uuid to object
+        if entry.cache_uuid:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+            if not self._by_uuid[entry.cache_uuid]:
+                del self._by_uuid[entry.cache_uuid]
+            entry.cache_uuid = None
 
-    def manage(self, obj):
-        if obj.persisted():
-            obj.cache_size = obj.objsize()
-            self._entries[obj.inode] = obj
-            obj.cache_uuid = obj.uuid()
-            if obj.cache_uuid:
-                if obj.cache_uuid not in self._by_uuid:
-                    self._by_uuid[obj.cache_uuid] = [obj]
-                else:
-                    if obj not in self._by_uuid[obj.cache_uuid]:
-                        self._by_uuid[obj.cache_uuid].append(obj)
-            self._total += obj.cache_size
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
-            self.cap_cache()
+        # Now forget about it
+        del self._cache_entries[entry.inode]
 
     def update_cache_size(self, obj):
-        if obj.inode in self._entries:
-            self._total -= obj.cache_size
-            obj.cache_size = obj.objsize()
-            self._total += obj.cache_size
+        pass
+        # if obj.inode in self._cache_entries:
+        #     with self._total_lock:
+        #         _logger.debug("update_cache_size b4 cache_size %i total %i", obj.cache_size, self._total)
+        #         self._total -= obj.cache_size
+        #         obj.cache_size = obj.objsize()
+        #         self._total += obj.cache_size
+        #         _logger.debug("update_cache_size after cache_size %i total %i", obj.cache_size, self._total)
 
     def touch(self, obj):
-        if obj.persisted():
-            if obj.inode in self._entries:
-                self._entries.move_to_end(obj.inode)
-            else:
-                self.manage(obj)
-
-    def unmanage(self, obj):
-        if obj.persisted() and obj.inode in self._entries:
-            self._remove(obj)
+        if obj.inode in self._cache_entries:
+            self._cache_entries.move_to_end(obj.inode)
+        else:
+            self.manage(obj)
 
     def find_by_uuid(self, uuid):
         return self._by_uuid.get(uuid, [])
 
     def clear(self):
-        self._entries.clear()
+        self._cache_entries.clear()
         self._by_uuid.clear()
         self._total = 0
 
+
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
@@ -324,6 +248,11 @@ class Inodes(object):
         self.inode_cache = inode_cache
         self.encoding = encoding
 
+        self._inode_remove_queue = queue.Queue()
+        self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+        self._inode_remove_thread.daemon = True
+        self._inode_remove_thread.start()
+
     def __getitem__(self, item):
         return self._entries[item]
 
@@ -343,36 +272,124 @@ class Inodes(object):
         entry._atime = time.time()
         self.inode_cache.touch(entry)
 
+    def cap_cache(self):
+        self._inode_remove_queue.put(("evict_candidates",))
+
     def add_entry(self, entry):
+        # Assign a inode to a new entry
         entry.inode = next(self._counter)
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        self.inode_cache.manage(entry)
+        if entry.persisted():
+            # only "persisted" items can be reloaded from the server
+            # making them safe to evict automatically.
+            self.inode_cache.manage(entry)
+        self.cap_cache()
         return entry
 
     def del_entry(self, entry):
+        # Remove entry from the inode table
         if entry.ref_count == 0:
-            self.inode_cache.unmanage(entry)
+            self._inode_remove_queue.put(("remove", entry))
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
+    def _inode_remove(self):
+        while True:
+            try:
+                entry = self._inode_remove_queue.get(True)
+                with llfuse.lock:
+                    # Process this entry
+                    _logger.debug("_inode_remove %s", entry)
+                    self._inode_op(entry)
+
+                    while True:
+                        try:
+                            # Drain the queue of any other entries
+                            entry = self._inode_remove_queue.get(False)
+                            _logger.debug("_inode_remove %s", entry)
+                            self._inode_op(entry)
+                        except queue.Empty:
+                            break
+
+                    for entry in self.inode_cache.evict_candidates():
+                        self._remove(entry)
+            except Exception as e:
+                _logger.exception("_inode_remove")
+
+    def _inode_op(self, op):
+        if op[0] == "remove":
+            self._remove(op[1])
+        if op[0] == "invalidate_inode":
+            with llfuse.lock_released:
+                _logger.debug("sending invalidate inode %i", op[1])
+                llfuse.invalidate_inode(op[1])
+        if op[0] == "invalidate_entry":
+            with llfuse.lock_released:
+                _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
+                llfuse.invalidate_entry(op[1], op[2])
+        if op[0] == "evict_candidates":
+            pass
+
+
+    def _remove(self, entry):
+        try:
+            if entry.inode is None:
+                # Removed already
+                return
+
+            if entry.has_ref():
+                # has kernel reference, can't be removed.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+                return
+
+            if entry.in_use():
+                # referenced internally, stay pinned
+                #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+                return
+
+            forget_inode = True
+            parent = self._entries.get(entry.parent_inode)
+            if parent is not None and parent.has_ref():
+                # the parent is still referenced, so we'll keep the
+                # entry but wipe out the stuff under it
+                forget_inode = False
+
+            if forget_inode:
+                self.inode_cache.unmanage(entry)
+
+            _logger.debug("InodeCache removing inode %i", entry.inode)
+
+            # Invalidate the entry for self on the parent
+            entry.kernel_invalidate()
+
+            # For directories, clear the contents
+            entry.clear()
+
+            _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s",
+                          entry.inode, self.inode_cache.total(), forget_inode)
+            if forget_inode:
+                entry.inode = None
+
+            # stop anything else
+            with llfuse.lock_released:
+                entry.finalize()
+        except Exception as e:
+            _logger.exception("failed remove")
+
     def invalidate_inode(self, entry):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            with llfuse.lock_released:
-                _logger.debug("sending invalidate %i", entry.inode)
-                llfuse.invalidate_inode(entry.inode)
+            self._inode_remove_queue.put(("invalidate_inode", entry.inode))
 
     def invalidate_entry(self, entry, name):
-        if entry.has_ref(False):
+        if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            with llfuse.lock_released:
-                _logger.debug("sending invalidate to inode %i entry %s", entry.inode, name)
-                llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+            self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
 
     def clear(self):
         self.inode_cache.clear()
@@ -557,6 +574,7 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def getattr(self, inode, ctx=None):
         if inode not in self.inodes:
+            _logger.debug("arv-mount getattr: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
@@ -667,6 +685,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.debug("arv-mount open: inode %i missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if isinstance(p, Directory):
@@ -748,7 +767,7 @@ class Operations(llfuse.Operations):
             finally:
                 self._filehandles[fh].release()
                 del self._filehandles[fh]
-        self.inodes.inode_cache.cap_cache()
+        self.inodes.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
@@ -761,7 +780,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
-            _logger.warning("arv-mount opendir: called with inode %i but it is missing", inode)
+            _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
@@ -778,7 +797,9 @@ class Operations(llfuse.Operations):
 
         # update atime
         self.inodes.touch(p)
+        p.inc_use()
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+        p.dec_use()
         return fh
 
     @readdir_time.time()
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 53214ee94d..366b5945bc 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -125,17 +125,11 @@ class FreshBase(object):
         self.ref_count -= n
         return self.ref_count
 
-    def has_ref(self, only_children):
+    def has_ref(self):
         """Determine if there are any kernel references to this
-        object or its children.
-
-        If only_children is True, ignore refcount of self and only consider
-        children.
+        object.
         """
-        if only_children:
-            return False
-        else:
-            return self.ref_count > 0
+        return self.ref_count > 0
 
     def objsize(self):
         return 0
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 899e3abb34..25e611e67d 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -228,21 +228,14 @@ class Directory(FreshBase):
                 return True
         return False
 
-    def has_ref(self, only_children):
-        if super(Directory, self).has_ref(only_children):
-            return True
-        for v in self._entries.values():
-            if v.has_ref(False):
-                return True
-        return False
-
     def clear(self):
         """Delete all entries"""
         oldentries = self._entries
         self._entries = {}
+        self.invalidate()
         for n in oldentries:
             self.inodes.del_entry(oldentries[n])
-        self.invalidate()
+        self.inodes.inode_cache.update_cache_size(self)
 
     def kernel_invalidate(self):
         # Invalidating the dentry on the parent implies invalidating all paths
@@ -1138,7 +1131,9 @@ class ProjectDirectory(Directory):
 
     def _add_entry(self, i, name):
         ent = self.createDirectory(i)
+        ent.inc_use()
         self._entries[name] = self.inodes.add_entry(ent)
+        ent.dec_use()
         return self._entries[name]
 
     @use_counter

commit 310bf276572164d7d47e250f45ed1b5cf7902a2d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Mar 7 14:37:25 2024 -0500

    21541: WIP, need to not throw things out too quickly
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a70c99fde9..a966964248 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -77,16 +77,6 @@ import arvados.keep
 from prometheus_client import Summary
 import queue
 
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
-    # llfuse < 0.42
-    llfuse.capi._notify_queue = queue.Queue()
-else:
-    # llfuse >= 0.42
-    llfuse._notify_queue = queue.Queue()
 
 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
 
@@ -161,66 +151,127 @@ class InodeCache(object):
         self._total = 0
         self.min_entries = min_entries
 
+        self._cache_remove_queue = queue.Queue()
+        self._cache_remove_thread = threading.Thread(None, self._cache_remove)
+        self._cache_remove_thread.daemon = True
+        self._cache_remove_thread.start()
+
+        self._cache_cap_event = threading.Event()
+        self._cache_cap_thread = threading.Thread(None, self._cache_cap)
+        self._cache_cap_thread.daemon = True
+        self._cache_cap_thread.start()
+
+
     def total(self):
         return self._total
 
-    def _remove(self, obj, clear):
-        if obj.inode is None:
-            return
-        if clear:
-            # Kernel behavior seems to be that if a file is
-            # referenced, its parents remain referenced too. This
-            # means has_ref() exits early when a collection is not
-            # candidate for eviction.
-            #
-            # By contrast, in_use() doesn't increment references on
-            # parents, so it requires a full tree walk to determine if
-            # a collection is a candidate for eviction.  This takes
-            # .07s for 240000 files, which becomes a major drag when
-            # cap_cache is being called several times a second and
-            # there are multiple non-evictable collections in the
-            # cache.
-            #
-            # So it is important for performance that we do the
-            # has_ref() check first.
-
-            if obj.has_ref(True):
-                #_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
+    def _cache_cap(self):
+        while True:
+            self._cache_cap_event.wait()
+            self._cache_cap_event.clear()
+
+            if self._total > self.cap:
+                with llfuse.lock:
+                    _logger.debug("InodeCache cap_cache %i, %i", self._total, self.cap)
+                    for ent in listvalues(self._entries):
+                        if self._total < self.cap or len(self._entries) < self.min_entries:
+                            break
+                        self._remove(ent)
+
+
+    def _cache_remove(self):
+        while True:
+            entry = self._cache_remove_queue.get()
+            _logger.debug("InodeCache got %s", entry)
+            if entry is None:
                 return
 
-            if obj.in_use():
-                #_logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+            if entry.inode not in self._entries:
+                # removed already
+                continue
+
+            _logger.debug("InodeCache will remove inode %i", entry.inode)
+            with llfuse.lock:
+                _logger.debug("InodeCache removing inode %i", entry.inode)
+                try:
+                    parent = self._entries.get(entry.parent_inode)
+                    if parent is not None and parent.has_ref(False):
+                        continue
+
+                    if parent is not None and parent.in_use():
+                        #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+                        return
+
+                    # Invalidate the entry for self on the parent
+                    entry.kernel_invalidate()
+
+                    # For directories, clear the contents
+                    entry.clear()
+
+                    # manage cache size running sum
+                    self._total -= entry.cache_size
 
-            obj.kernel_invalidate()
-            _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
-            obj.clear()
+                    # manage the mapping of uuid to object
+                    if entry.cache_uuid:
+                        self._by_uuid[entry.cache_uuid].remove(entry)
+                        if not self._by_uuid[entry.cache_uuid]:
+                            del self._by_uuid[entry.cache_uuid]
+                        entry.cache_uuid = None
 
-        # The llfuse lock is released in del_entry(), which is called by
-        # Directory.clear().  While the llfuse lock is released, it can happen
-        # that a reentrant call removes this entry before this call gets to it.
-        # Ensure that the entry is still valid before trying to remove it.
-        if obj.inode not in self._entries:
+                    # Now fully forget about it
+                    del self._entries[entry.inode]
+
+                    _logger.debug("InodeCache removed inode %i, total %i", entry.inode, self._total)
+                    entry.inode = None
+
+                    # stop anything else
+                    with llfuse.lock_released:
+                        entry.finalize()
+                except Exception as e:
+                    _logger.exception("failed remove")
+
+    def _remove(self, entry):
+        if entry.inode not in self._entries:
             return
 
-        self._total -= obj.cache_size
-        del self._entries[obj.inode]
-        if obj.cache_uuid:
-            self._by_uuid[obj.cache_uuid].remove(obj)
-            if not self._by_uuid[obj.cache_uuid]:
-                del self._by_uuid[obj.cache_uuid]
-            obj.cache_uuid = None
-        if clear:
-            _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+        # Kernel behavior seems to be that if a file is
+        # referenced, its parents remain referenced too. This
+        # means has_ref() exits early when a collection is not
+        # candidate for eviction.
+        #
+        # By contrast, in_use() doesn't increment references on
+        # parents, so it requires a full tree walk to determine if
+        # a collection is a candidate for eviction.  This takes
+        # .07s for 240000 files, which becomes a major drag when
+        # cap_cache is being called several times a second and
+        # there are multiple non-evictable collections in the
+        # cache.
+        #
+        # So it is important for performance that we do the
+        # has_ref() check first.
+
+        # if entry.has_ref(True):
+        #     #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+        #     return
+
+        parent = self._entries.get(entry.parent_inode)
+        if parent is not None and parent.has_ref(False):
+            #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+            return
+
+        # if entry.has_ref(True):
+        #     #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+        #     return
+
+        if parent is not None and parent.in_use():
+            #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+            return
+
+        _logger.debug("InodeCache queuing %i", entry.inode)
+        self._cache_remove_queue.put(entry)
 
     def cap_cache(self):
-        _logger.debug("in cap_cache %i, %i", self._total, self.cap)
-        if self._total > self.cap:
-            for ent in listvalues(self._entries):
-                if self._total < self.cap or len(self._entries) < self.min_entries:
-                    break
-                self._remove(ent, True)
-            _logger.debug("end cap_cache %i, %i", self._total, self.cap)
+        self._cache_cap_event.set()
 
     def manage(self, obj):
         if obj.persisted():
@@ -236,6 +287,7 @@ class InodeCache(object):
             self._total += obj.cache_size
             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
                           obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
+            self.cap_cache()
 
     def update_cache_size(self, obj):
         if obj.inode in self._entries:
@@ -252,7 +304,7 @@ class InodeCache(object):
 
     def unmanage(self, obj):
         if obj.persisted() and obj.inode in self._entries:
-            self._remove(obj, True)
+            self._remove(obj)
 
     def find_by_uuid(self, uuid):
         return self._by_uuid.get(uuid, [])
@@ -271,7 +323,6 @@ class Inodes(object):
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
-        self.deferred_invalidations = []
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -303,10 +354,6 @@ class Inodes(object):
     def del_entry(self, entry):
         if entry.ref_count == 0:
             self.inode_cache.unmanage(entry)
-            del self._entries[entry.inode]
-            with llfuse.lock_released:
-                entry.finalize()
-            entry.inode = None
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
@@ -315,13 +362,17 @@ class Inodes(object):
         if entry.has_ref(False):
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_inode(entry.inode)
+            with llfuse.lock_released:
+                _logger.debug("sending invalidate %i", entry.inode)
+                llfuse.invalidate_inode(entry.inode)
 
     def invalidate_entry(self, entry, name):
         if entry.has_ref(False):
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+            with llfuse.lock_released:
+                _logger.debug("sending invalidate to inode %i entry %s", entry.inode, name)
+                llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
 
     def clear(self):
         self.inode_cache.clear()
@@ -710,6 +761,7 @@ class Operations(llfuse.Operations):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
+            _logger.warning("arv-mount opendir: called with inode %i but it is missing", inode)
             raise llfuse.FUSEError(errno.ENOENT)
 
         if not isinstance(p, Directory):
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 8c24467aeb..7c193bbb9f 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -387,7 +387,7 @@ class Mount(object):
         e = self.operations.inodes.add_entry(Directory(
             llfuse.ROOT_INODE,
             self.operations.inodes,
-            lambda: self.api.config(),
+            self.api.config(),
             self.args.enable_write,
             self.args.filters,
         ))
@@ -472,7 +472,7 @@ From here, the following directories are available:
 
     def _llfuse_main(self):
         try:
-            llfuse.main(workers=1)
+            llfuse.main(workers=8)
         except:
             llfuse.close(unmount=False)
             raise
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 0671670c9d..899e3abb34 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -67,7 +67,7 @@ class Directory(FreshBase):
     def forward_slash_subst(self):
         if not hasattr(self, '_fsns'):
             self._fsns = None
-            config = self.apiconfig()
+            config = self.apiconfig
             try:
                 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
             except KeyError:
@@ -194,13 +194,13 @@ class Directory(FreshBase):
             if not name:
                 continue
             if name not in self._entries:
-                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                 # create new directory entry
                 ent = new_entry(i)
                 if ent is not None:
                     ent.inc_use()
                     self._entries[name] = self.inodes.add_entry(ent)
                     changed = True
+                _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
@@ -214,7 +214,6 @@ class Directory(FreshBase):
             self.inodes.invalidate_inode(self)
             self._mtime = time.time()
             self.inodes.inode_cache.update_cache_size(self)
-            self.inodes.inode_cache.cap_cache()
 
         for ent in self._entries.values():
            ent.dec_use()
@@ -248,7 +247,11 @@ class Directory(FreshBase):
     def kernel_invalidate(self):
         # Invalidating the dentry on the parent implies invalidating all paths
         # below it as well.
-        parent = self.inodes[self.parent_inode]
+        if self.parent_inode in self.inodes:
+            parent = self.inodes[self.parent_inode]
+        else:
+            # parent was removed already.
+            return
 
         # Find self on the parent in order to invalidate this path.
         # Calling the public items() method might trigger a refresh,
@@ -469,12 +472,16 @@ class CollectionDirectoryBase(Directory):
         super(CollectionDirectoryBase, self).clear()
         self.collection = None
 
+    def objsize(self):
+        # objsize for the whole thing is represented at the root,
+        # don't double-count it
+        return 0
 
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters, None, self)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -652,7 +659,10 @@ class CollectionDirectory(CollectionDirectoryBase):
     def finalize(self):
         if self.collection is not None:
             if self.writable():
-                self.collection.save()
+                try:
+                    self.collection.save()
+                except Exception as e:
+                    _logger.exception("Failed to save collection %s", self.collection_locator)
             self.collection.stop_threads()
 
     def clear(self):
@@ -784,7 +794,7 @@ and the directory will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
-        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.pdh_only = pdh_only
@@ -883,7 +893,7 @@ class TagsDirectory(Directory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -963,7 +973,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(TagDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -1006,7 +1016,7 @@ class ProjectDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  project_object, poll=True, poll_time=3, storage_classes=None):
-        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -1317,7 +1327,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  exclude, poll=False, poll_time=60, storage_classes=None):
-        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list