[arvados] updated: 2.7.0-6217-gbf4193eeaa

git repository hosting git at public.arvados.org
Wed Mar 27 21:12:13 UTC 2024


Summary of changes:
 services/fuse/arvados_fuse/__init__.py | 334 ++++++++++++++++++---------------
 services/fuse/arvados_fuse/fresh.py    |   3 +-
 services/fuse/arvados_fuse/fusedir.py  |  94 ++++++----
 services/fuse/arvados_fuse/fusefile.py |  13 ++
 services/fuse/tests/mount_test_base.py |   2 +-
 services/fuse/tests/test_inodes.py     |   6 +-
 6 files changed, 263 insertions(+), 189 deletions(-)

       via  bf4193eeaa390cec08bbb8333a53fbc89edfd7f3 (commit)
      from  6dfef004d33b147cbe80bbb5ecc6922ac25f156d (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit bf4193eeaa390cec08bbb8333a53fbc89edfd7f3
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Mon Mar 25 14:19:32 2024 -0400

    21541: Code cleanup and additional memory usage improvements
    
    * Add slots to major Directory classes
    
    * Disconnect FuseArvadosFile from ArvadosFile to reduce cyclic
    references.
    
    * Clean up _remove_inode loop and use dataclasses for the inode
    operations.
    
    * Now calls del_entry on collection_record_file and project_object_file.
    
    It looks like collection_record_file was holding a reference to the
    Collection object (and was remaining in the inodes table) even when
    CollectionDirectory was cleared.  I believe this is the memory leak I
    have been looking for.
    
    * Remove the "dead" flag and set parent_inode to None instead.  This
    clarifies the behavior that directory entries keep their (numeric)
    inodes until they are detached from the directory which may have
    contributed to infrequent "file not found" errors.
    
    * Adjust cache behavior to only hold objects that are cache-eligible
    and have non-zero cache_size.  This avoids filling the cache with
    entries that are just going to be skipped over.
    
    Overall: Memory usage is mostly stable but does tend to creep up over
    time.  My best guess is that this is forced because we need to keep
    inodes in RAM as long as the kernel maintains a reference to them, so
    with multiple processes accessing different filesystem locations, this
    is simply RAM required for the working set.
    
    I'm also cautiously optimistic that issues I observed with performance
    slowing down with long-lived processes are improved (e.g. fixing
    memory leaks means no more unbounded growth of cache_entries, which
    means no more time wasted iterating over huge lists).
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 08a44c9533..2dfba46473 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -56,12 +56,6 @@ inode assigned to it and appears in the Inodes._entries dictionary.
 
 from __future__ import absolute_import
 from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future.utils import listitems
-from future import standard_library
-standard_library.install_aliases()
 from builtins import next
 from builtins import str
 from builtins import object
@@ -81,9 +75,12 @@ import functools
 import arvados.keep
 from prometheus_client import Summary
 import queue
+from dataclasses import dataclass
+import typing
+import gc
 
 from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusefile import File, StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -158,7 +155,6 @@ class InodeCache(object):
 
     def __init__(self, cap, min_entries=4):
         self._cache_entries = collections.OrderedDict()
-        self._by_uuid = {}
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
@@ -181,39 +177,21 @@ class InodeCache(object):
             return
 
         _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
-        for ent in listvalues(self._cache_entries):
+
+        # Copy this into a deque for two reasons:
+        #
+        # 1. _cache_entries is modified by unmanage() which is called
+        # by _remove
+        #
+        # 2. popping off the front means the reference goes away
+        # immediately intead of sticking around for the lifetime of
+        # "values"
+        values = collections.deque(self._cache_entries.values())
+
+        while len(values) > 0:
             if self._total < self.cap or len(self._cache_entries) < self.min_entries:
                 break
-            if ent.cache_size > 0 or ent.dead:
-                # if cache_size is zero it's been cleared already
-                yield ent
-
-    def manage(self, obj):
-        """Add a new object to be cache managed.
-
-        This means evict_candidates will suggest clearing and removing
-        the inode when there is memory pressure.
-
-        """
-
-        if obj.inode in self._cache_entries:
-            return
-
-        obj.cache_size = obj.objsize()
-        self._total += obj.cache_size
-
-        self._cache_entries[obj.inode] = obj
-
-        obj.cache_uuid = obj.uuid()
-        if obj.cache_uuid:
-            if obj.cache_uuid not in self._by_uuid:
-                self._by_uuid[obj.cache_uuid] = [obj]
-            else:
-                if obj not in self._by_uuid[obj.cache_uuid]:
-                    self._by_uuid[obj.cache_uuid].append(obj)
-
-        _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
-                      obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
+            yield values.popleft()
 
     def unmanage(self, entry):
         """Stop managing an object in the cache.
@@ -230,13 +208,6 @@ class InodeCache(object):
         self._total -= entry.cache_size
         entry.cache_size = 0
 
-        # manage the mapping of uuid to object
-        if entry.cache_uuid:
-            self._by_uuid[entry.cache_uuid].remove(entry)
-            if not self._by_uuid[entry.cache_uuid]:
-                del self._by_uuid[entry.cache_uuid]
-            entry.cache_uuid = None
-
         # Now forget about it
         del self._cache_entries[entry.inode]
 
@@ -245,11 +216,24 @@ class InodeCache(object):
         object changing (usually because it has been loaded or
         cleared).
 
+        Adds or removes entries to the cache list based on the object
+        cache size.
+
         """
+
+        if not obj.persisted():
+            return
+
         if obj.inode in self._cache_entries:
             self._total -= obj.cache_size
-            obj.cache_size = obj.objsize()
+
+        obj.cache_size = obj.objsize()
+
+        if obj.cache_size > 0 or obj.parent_inode is None:
             self._total += obj.cache_size
+            self._cache_entries[obj.inode] = obj
+        elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+            del self._cache_entries[obj.inode]
 
     def touch(self, obj):
         """Indicate an object was used recently, making it low
@@ -258,17 +242,44 @@ class InodeCache(object):
         """
         if obj.inode in self._cache_entries:
             self._cache_entries.move_to_end(obj.inode)
-        else:
-            self.manage(obj)
-
-    def find_by_uuid(self, uuid):
-        return self._by_uuid.get(uuid, [])
+            return True
+        return False
 
     def clear(self):
         self._cache_entries.clear()
-        self._by_uuid.clear()
         self._total = 0
 
+ at dataclass
+class RemoveInode:
+    entry: typing.Union[Directory, File]
+    def inode_op(self, inodes, locked_ops):
+        if locked_ops is None:
+            inodes._remove(self.entry)
+            return True
+        else:
+            locked_ops.append(self)
+            return False
+
+ at dataclass
+class InvalidateInode:
+    inode: int
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_inode(self.inode)
+        return True
+
+ at dataclass
+class InvalidateEntry:
+    inode: int
+    name: str
+    def inode_op(self, inodes, locked_ops):
+        llfuse.invalidate_entry(self.inode, self.name)
+        return True
+
+ at dataclass
+class EvictCandidates:
+    def inode_op(self, inodes, locked_ops):
+        return True
+
 
 class Inodes(object):
     """Manage the set of inodes.
@@ -291,6 +302,9 @@ class Inodes(object):
         self._inode_remove_thread.daemon = True
         self._inode_remove_thread.start()
 
+        self.cap_cache_event = threading.Event()
+        self._by_uuid = collections.defaultdict(list)
+
     def __getitem__(self, item):
         return self._entries[item]
 
@@ -301,7 +315,7 @@ class Inodes(object):
         return iter(self._entries.keys())
 
     def items(self):
-        return viewitems(self._entries.items())
+        return self._entries.items()
 
     def __contains__(self, k):
         return k in self._entries
@@ -313,12 +327,32 @@ class Inodes(object):
         """
 
         entry._atime = time.time()
-        self.inode_cache.touch(entry)
-        self.cap_cache()
+        if self.inode_cache.touch(entry):
+            self.cap_cache()
 
     def cap_cache(self):
         """Notify the _inode_remove thread to recheck the cache."""
-        self._inode_remove_queue.put(("evict_candidates",))
+        if not self.cap_cache_event.is_set():
+            self.cap_cache_event.set()
+            self._inode_remove_queue.put(EvictCandidates())
+
+    def update_uuid(self, entry):
+        """Update the Arvados uuid associated with an inode entry.
+
+        This is used to look up inodes that need to be invalidated
+        when a websocket event indicates the object has changed on the
+        API server.
+
+        """
+        if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].remove(entry)
+
+        entry.cache_uuid = entry.uuid()
+        if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+            self._by_uuid[entry.cache_uuid].append(entry)
+
+        if not self._by_uuid[entry.cache_uuid]:
+            del self._by_uuid[entry.cache_uuid]
 
     def add_entry(self, entry):
         """Assign a numeric inode to a new entry."""
@@ -327,23 +361,23 @@ class Inodes(object):
         if entry.inode == llfuse.ROOT_INODE:
             entry.inc_ref()
         self._entries[entry.inode] = entry
-        if entry.persisted():
-            # only "persisted" items can be reloaded from the server
-            # making them safe to evict automatically.
-            self.inode_cache.manage(entry)
+
+        self.update_uuid(entry)
+        self.inode_cache.update_cache_size(entry)
         self.cap_cache()
         return entry
 
     def del_entry(self, entry):
         """Remove entry from the inode table.
 
-        Put a tombstone marker on it and notify the _inode_remove
-        thread to try and remove it.
+        Indicate this inode entry is pending deletion by setting
+        parent_inode to None.  Notify the _inode_remove thread to try
+        and remove it.
 
         """
 
-        entry.dead = True
-        self._inode_remove_queue.put(("remove", entry))
+        entry.parent_inode = None
+        self._inode_remove_queue.put(RemoveInode(entry))
         _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
     def _inode_remove(self):
@@ -355,61 +389,33 @@ class Inodes(object):
 
         locked_ops = collections.deque()
         while True:
-            try:
-                entry = self._inode_remove_queue.get(True)
-                if entry is None:
+            blocking_get = True
+            while True:
+                try:
+                    qentry = self._inode_remove_queue.get(blocking_get)
+                except queue.Empty:
+                    break
+                blocking_get = False
+                if qentry is None:
                     return
+
+                if self._shutdown_started.is_set():
+                    continue
+
                 # Process this entry
-                _logger.debug("_inode_remove %s", entry)
-                if self._inode_op(entry, locked_ops):
+                if qentry.inode_op(self, locked_ops):
                     self._inode_remove_queue.task_done()
 
-                # Drain the queue of any other entries
-                while True:
-                    try:
-                        entry = self._inode_remove_queue.get(False)
-                        if entry is None:
-                            return
-                        _logger.debug("_inode_remove %s", entry)
-                        if self._inode_op(entry, locked_ops):
-                            self._inode_remove_queue.task_done()
-                    except queue.Empty:
-                        break
-
-                with llfuse.lock:
-                    while len(locked_ops) > 0:
-                        if self._inode_op(locked_ops.popleft(), None):
-                            self._inode_remove_queue.task_done()
-                    for entry in self.inode_cache.evict_candidates():
-                        self._remove(entry)
-            except Exception as e:
-                _logger.exception("_inode_remove")
+                # Give up the reference
+                qentry = None
 
-    def _inode_op(self, op, locked_ops):
-        """Process an inode operation: attempt to remove an inode
-        entry, tell the kernel to invalidate a inode metadata or
-        directory entry, or trigger a cache check.
-
-        """
-        if self._shutdown_started.is_set():
-            return True
-        if op[0] == "remove":
-            if locked_ops is None:
-                self._remove(op[1])
-                return True
-            else:
-                locked_ops.append(op)
-                return False
-        if op[0] == "invalidate_inode":
-            _logger.debug("sending invalidate inode %i", op[1])
-            llfuse.invalidate_inode(op[1])
-            return True
-        if op[0] == "invalidate_entry":
-            _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
-            llfuse.invalidate_entry(op[1], op[2])
-            return True
-        if op[0] == "evict_candidates":
-            return True
+            with llfuse.lock:
+                while len(locked_ops) > 0:
+                    if locked_ops.popleft().inode_op(self, None):
+                        self._inode_remove_queue.task_done()
+                self.cap_cache_event.clear()
+                for entry in self.inode_cache.evict_candidates():
+                    self._remove(entry)
 
     def wait_remove_queue_empty(self):
         # used by tests
@@ -430,13 +436,7 @@ class Inodes(object):
                 # Removed already
                 return
 
-            # Tell the kernel it should forget about it.
-            entry.kernel_invalidate()
-
-            if entry.has_ref():
-                # has kernel reference, could still be accessed.
-                # when the kernel forgets about it, we can delete it.
-                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+            if entry.inode == llfuse.ROOT_INODE:
                 return
 
             if entry.in_use():
@@ -444,35 +444,38 @@ class Inodes(object):
                 #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
                 return
 
-            forget_inode = True
-            parent = self._entries.get(entry.parent_inode)
-            if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
-                # the parent is still referenced, so we'll keep the
-                # entry but wipe out the stuff under it
-                forget_inode = False
+            # Tell the kernel it should forget about it
+            entry.kernel_invalidate()
 
-            if entry.cache_size == 0 and not forget_inode:
-                # Was cleared already
+            if entry.has_ref():
+                # has kernel reference, could still be accessed.
+                # when the kernel forgets about it, we can delete it.
+                #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
                 return
 
-            if forget_inode:
-                self.inode_cache.unmanage(entry)
-
-            _logger.debug("InodeCache removing inode %i", entry.inode)
+            # commit any pending changes
+            with llfuse.lock_released:
+                entry.finalize()
 
-            # For directories, clear the contents
+            # Clear the contents
             entry.clear()
 
-            _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
-                          entry.inode, self.inode_cache.total(), forget_inode,
-                          len(self._entries), type(entry))
-            if forget_inode:
+            if entry.parent_inode is None:
+                _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+                              entry.inode, entry.cache_size, self.inode_cache.total(),
+                              len(self._entries), type(entry))
+
+                if entry.cache_uuid:
+                    self._by_uuid[entry.cache_uuid].remove(entry)
+                    if not self._by_uuid[entry.cache_uuid]:
+                        del self._by_uuid[entry.cache_uuid]
+                    entry.cache_uuid = None
+
+                self.inode_cache.unmanage(entry)
+
                 del self._entries[entry.inode]
                 entry.inode = None
 
-            # stop anything else
-            with llfuse.lock_released:
-                entry.finalize()
         except Exception as e:
             _logger.exception("failed remove")
 
@@ -480,13 +483,13 @@ class Inodes(object):
         if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            self._inode_remove_queue.put(("invalidate_inode", entry.inode))
+            self._inode_remove_queue.put(InvalidateInode(entry.inode))
 
     def invalidate_entry(self, entry, name):
         if entry.has_ref():
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+            self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
 
     def begin_shutdown(self):
         self._inode_remove_queue.put(None)
@@ -499,8 +502,9 @@ class Inodes(object):
             self.begin_shutdown()
 
         self.inode_cache.clear()
+        self._by_uuid.clear()
 
-        for k,v in viewitems(self._entries):
+        for k,v in self._entries.items():
             try:
                 v.finalize()
             except Exception as e:
@@ -511,6 +515,11 @@ class Inodes(object):
     def forward_slash_subst(self):
         return self._fsns
 
+    def find_by_uuid(self, uuid):
+        """Return a list of zero or more inode entries corresponding
+        to this Arvados UUID."""
+        return self._by_uuid.get(uuid, [])
+
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -617,6 +626,13 @@ class Operations(llfuse.Operations):
         self.write_ops_counter = arvados.keep.Counter()
 
         self.events = None
 
+        # We rely on the cyclic garbage collector to deallocate
+        # Collection objects from the Python SDK.  A lower GC
+        # threshold encourages Python to be more aggressive in
+        # reclaiming these and seems to slow down the growth in memory
+        # usage over time.
+        gc.set_threshold(200)
 
     def init(self):
         # Allow threads that are waiting for the driver to be finished
@@ -653,7 +669,8 @@ class Operations(llfuse.Operations):
     def destroy(self):
         _logger.debug("arv-mount destroy: start")
 
-        self.begin_shutdown()
+        with llfuse.lock_released:
+            self.begin_shutdown()
 
         if self.events:
             self.events.close()
@@ -683,14 +700,14 @@ class Operations(llfuse.Operations):
             old_attrs = properties.get("old_attributes") or {}
             new_attrs = properties.get("new_attributes") or {}
 
-            for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+            for item in self.inodes.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
-                    self.inodes.inode_cache.find_by_uuid(oldowner) +
-                    self.inodes.inode_cache.find_by_uuid(newowner)):
+                    self.inodes.find_by_uuid(oldowner) +
+                    self.inodes.find_by_uuid(newowner)):
                 parent.invalidate()
 
     @getattr_time.time()
@@ -701,11 +718,16 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.ENOENT)
 
         e = self.inodes[inode]
+        self.inodes.touch(e)
+        parent = None
+        if e.parent_inode:
+            parent = self.inodes[e.parent_inode]
+            self.inodes.touch(parent)
 
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 0
+        entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
         entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
@@ -779,11 +801,17 @@ class Operations(llfuse.Operations):
             if name == '..':
                 inode = p.parent_inode
             elif isinstance(p, Directory) and name in p:
+                if p[name].inode is None:
+                    _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+                                  parent_inode, name)
+                    raise llfuse.FUSEError(errno.ENOENT)
+
                 inode = p[name].inode
 
         if inode != None:
             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                       parent_inode, name, inode)
+            self.inodes.touch(self.inodes[inode])
             self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
@@ -799,7 +827,7 @@ class Operations(llfuse.Operations):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
-            if ent.dec_ref(nlookup) == 0 and ent.dead:
+            if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
                 self.inodes.del_entry(ent)
 
     @open_time.time()
@@ -919,10 +947,10 @@ class Operations(llfuse.Operations):
         _logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
 
         # update atime
-        self.inodes.touch(p)
         p.inc_use()
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
         p.dec_use()
+        self.inodes.touch(p)
         return fh
 
     @readdir_time.time()
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 366b5945bc..508ee7fb73 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -62,7 +62,7 @@ class FreshBase(object):
     """
 
     __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
-                 "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+                 "ref_count", "cache_size", "cache_uuid", "allow_attr_cache")
 
     def __init__(self):
         self._stale = True
@@ -72,7 +72,6 @@ class FreshBase(object):
         self._poll_time = 60
         self.use_count = 0
         self.ref_count = 0
-        self.dead = False
         self.cache_size = 0
         self.cache_uuid = None
 
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 05b657b036..dbb7bb83f7 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -36,6 +36,8 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
+    __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
     def __init__(self, parent_inode, inodes, enable_write, filters):
         """parent_inode is the integer inode number"""
 
@@ -136,9 +138,8 @@ class Directory(FreshBase):
         super(Directory, self).fresh()
 
     def objsize(self):
-        # This is a very rough guess of the amount of overhead involved for
-        # each directory entry (128 bytes is 16 * 8-byte pointers).
-        return len(self._entries) * 128
+        # Rough estimate of memory footprint based on using pympler
+        return len(self._entries) * 1024
 
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
@@ -171,7 +172,7 @@ class Directory(FreshBase):
                 continue
             if name in oldentries:
                 ent = oldentries[name]
-                if same(ent, i):
+                if same(ent, i) and ent.parent_inode == self.inode:
                     # move existing directory entry over
                     self._entries[name] = ent
                     del oldentries[name]
@@ -186,18 +187,22 @@ class Directory(FreshBase):
                 ent = new_entry(i)
                 if ent is not None:
                     self._entries[name] = self.inodes.add_entry(ent)
+                    # need to invalidate this just in case there was a
+                    # previous entry that couldn't be moved over or a
+                    # lookup that returned file not found and cached
+                    # a negative result
+                    self.inodes.invalidate_entry(self, name)
                     changed = True
                 _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
 
         # delete any other directory entries that were not in found in 'items'
-        for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            self.inodes.invalidate_entry(self, i)
-            self.inodes.del_entry(oldentries[i])
+        for name, ent in oldentries.items():
+            _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
             changed = True
 
         if changed:
-            self.inodes.invalidate_inode(self)
             self._mtime = time.time()
             self.inodes.inode_cache.update_cache_size(self)
 
@@ -213,11 +218,15 @@ class Directory(FreshBase):
 
     def clear(self):
         """Delete all entries"""
+        if len(self._entries) == 0:
+            return
         oldentries = self._entries
         self._entries = {}
         self.invalidate()
-        for n in oldentries:
-            self.inodes.del_entry(oldentries[n])
+        for name, ent in oldentries.items():
+            ent.clear()
+            self.inodes.invalidate_entry(self, name)
+            self.inodes.del_entry(ent)
         self.inodes.inode_cache.update_cache_size(self)
 
     def kernel_invalidate(self):
@@ -237,8 +246,6 @@ class Directory(FreshBase):
                 self.inodes.invalidate_entry(parent, k)
                 break
 
-        self.inodes.invalidate_inode(self)
-
     def mtime(self):
         return self._mtime
 
@@ -282,6 +289,8 @@ class CollectionDirectoryBase(Directory):
 
     """
 
+    __slots__ = ("collection", "collection_root", "collection_record_file")
+
     def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
         self.collection = collection
@@ -291,11 +300,11 @@ class CollectionDirectoryBase(Directory):
     def new_entry(self, name, item, mtime):
         name = self.sanitize_filename(name)
         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-            if item.fuse_entry.dead is not True:
-                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.parent_inode is not None:
+                raise Exception("Can only reparent unparented inode entry")
             if item.fuse_entry.inode is None:
                 raise Exception("Reparented entry must still have valid inode")
-            item.fuse_entry.dead = False
+            item.fuse_entry.parent_inode = self.inode
             self._entries[name] = item.fuse_entry
         elif isinstance(item, arvados.collection.RichCollectionBase):
             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
@@ -446,6 +455,8 @@ class CollectionDirectoryBase(Directory):
 
     def clear(self):
         super(CollectionDirectoryBase, self).clear()
+        if self.collection is not None:
+            self.collection.unsubscribe()
         self.collection = None
 
     def objsize(self):
@@ -456,6 +467,9 @@ class CollectionDirectoryBase(Directory):
 class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree representing a collection."""
 
+    __slots__ = ("api", "num_retries", "collection_locator",
+                 "_manifest_size", "_writable", "_updating_lock")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
         super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
         self.api = api
@@ -515,7 +529,9 @@ class CollectionDirectory(CollectionDirectoryBase):
         if self.collection_record_file is not None:
             self.collection_record_file.invalidate()
             self.inodes.invalidate_inode(self.collection_record_file)
-            _logger.debug("%s invalidated collection record file", self)
+            _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+                          self.collection_record_file.inode)
+        self.inodes.update_uuid(self)
         self.inodes.inode_cache.update_cache_size(self)
         self.fresh()
 
@@ -594,6 +610,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return False
 
     @use_counter
+    @check_update
     def collection_record(self):
         self.flush()
         return self.collection.api_response()
@@ -627,28 +644,31 @@ class CollectionDirectory(CollectionDirectoryBase):
         return (self.collection_locator is not None)
 
     def objsize(self):
-        # This is a very rough guess of the amount of overhead
-        # involved for a collection; you've got the manifest text
-        # itself which is not discarded by the Collection class, then
-        # the block identifiers that get copied into their own
-        # strings, then the rest of the overhead of the Python
-        # objects.
-        return self._manifest_size * 4
+        # This is a rough guess of the amount of overhead involved for
+        # a collection; the calculation is each file averages 128
+        # bytes in the manifest, but consume 1024 bytes when blown up
+        # into Python data structures.
+        return self._manifest_size * 8
 
     def finalize(self):
-        if self.collection is not None:
-            if self.writable():
-                try:
-                    self.collection.save()
-                except Exception as e:
-                    _logger.exception("Failed to save collection %s", self.collection_locator)
-            self.collection.stop_threads()
+        if self.collection is None:
+            return
+
+        if self.writable():
+            try:
+                self.collection.save()
+            except Exception as e:
+                _logger.exception("Failed to save collection %s", self.collection_locator)
+        self.collection.stop_threads()
 
     def clear(self):
         if self.collection is not None:
             self.collection.stop_threads()
         self._manifest_size = 0
         super(CollectionDirectory, self).clear()
+        if self.collection_record_file is not None:
+            self.inodes.del_entry(self.collection_record_file)
+        self.collection_record_file = None
 
 
 class TmpCollectionDirectory(CollectionDirectoryBase):
@@ -697,7 +717,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
                 with self.collection.lock:
                     self.collection_record_file.invalidate()
                     self.inodes.invalidate_inode(self.collection_record_file)
-                    _logger.debug("%s invalidated collection record", self)
+                    _logger.debug("%s invalidated collection record", self.inode)
         finally:
             while lockcount > 0:
                 self.collection.lock.acquire()
@@ -992,6 +1012,10 @@ class TagDirectory(Directory):
 class ProjectDirectory(Directory):
     """A special directory that contains the contents of a project."""
 
+    __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+                 "project_uuid", "_updating_lock",
+                 "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
                  project_object, poll=True, poll_time=3, storage_classes=None):
         super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
@@ -1195,6 +1219,12 @@ class ProjectDirectory(Directory):
     def persisted(self):
         return True
 
+    def clear(self):
+        super(ProjectDirectory, self).clear()
+        if self.project_object_file is not None:
+            self.inodes.del_entry(self.project_object_file)
+        self.project_object_file = None
+
     @use_counter
     @check_update
     def mkdir(self, name):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 45d3db16fe..9279f7d99d 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -80,9 +80,17 @@ class FuseArvadosFile(File):
             if self.writable():
                 self.arvfile.parent.root_collection().save()
 
+    def clear(self):
+        if self.parent_inode is None:
+            self.arvfile.fuse_entry = None
+            self.arvfile = None
+
 
 class StringFile(File):
     """Wrap a simple string as a file"""
+
+    __slots__ = ("contents",)
+
     def __init__(self, parent_inode, contents, _mtime):
         super(StringFile, self).__init__(parent_inode, _mtime)
         self.contents = contents
@@ -97,6 +105,8 @@ class StringFile(File):
 class ObjectFile(StringFile):
     """Wrap a dict as a serialized json object."""
 
+    __slots__ = ("object_uuid",)
+
     def __init__(self, parent_inode, obj):
         super(ObjectFile, self).__init__(parent_inode, "", 0)
         self.object_uuid = obj['uuid']
@@ -125,6 +135,9 @@ class FuncToJSONFile(StringFile):
     The function is called at the time the file is read. The result is
     cached until invalidate() is called.
     """
+
+    __slots__ = ("func",)
+
     def __init__(self, parent_inode, func):
         super(FuncToJSONFile, self).__init__(parent_inode, "", 0)
         self.func = func
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index e0479d3668..02f4009724 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -105,7 +105,7 @@ class MountTestBase(unittest.TestCase):
             self.llfuse_thread.join(timeout=60)
             if self.llfuse_thread.is_alive():
                 logger.warning("MountTestBase.tearDown():"
-                               " llfuse thread still alive 20s after umount"
+                               " llfuse thread still alive 60s after umount"
                                " -- exiting with SIGKILL")
                 os.kill(os.getpid(), signal.SIGKILL)
             waited = time.time() - t0
diff --git a/services/fuse/tests/test_inodes.py b/services/fuse/tests/test_inodes.py
index aaef8e4b57..c5c92a9b3f 100644
--- a/services/fuse/tests/test_inodes.py
+++ b/services/fuse/tests/test_inodes.py
@@ -81,13 +81,16 @@ class InodeTests(unittest.TestCase):
 
         # Change min_entries
         cache.min_entries = 1
+        ent1.parent_inode = None
         inodes.cap_cache()
         inodes.wait_remove_queue_empty()
         self.assertEqual(600, cache.total())
         self.assertTrue(ent1.clear.called)
 
         # Touching ent1 should cause ent3 to get cleared
+        ent3.parent_inode = None
         self.assertFalse(ent3.clear.called)
+        inodes.inode_cache.update_cache_size(ent1)
         inodes.touch(ent1)
         inodes.wait_remove_queue_empty()
         self.assertTrue(ent3.clear.called)
@@ -132,6 +135,7 @@ class InodeTests(unittest.TestCase):
         ent3.has_ref.return_value = False
         ent1.clear.called = False
         ent3.clear.called = False
+        ent3.parent_inode = None
         inodes.touch(ent3)
         inodes.wait_remove_queue_empty()
         self.assertFalse(ent1.clear.called)
@@ -164,6 +168,6 @@ class InodeTests(unittest.TestCase):
         inodes.wait_remove_queue_empty()
         self.assertEqual(0, cache.total())
 
-        inodes.touch(ent3)
+        inodes.add_entry(ent3)
         inodes.wait_remove_queue_empty()
         self.assertEqual(600, cache.total())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list