[arvados] updated: 2.7.0-6217-gbf4193eeaa
git repository hosting
git at public.arvados.org
Wed Mar 27 21:12:13 UTC 2024
Summary of changes:
services/fuse/arvados_fuse/__init__.py | 334 ++++++++++++++++++---------------
services/fuse/arvados_fuse/fresh.py | 3 +-
services/fuse/arvados_fuse/fusedir.py | 94 ++++++----
services/fuse/arvados_fuse/fusefile.py | 13 ++
services/fuse/tests/mount_test_base.py | 2 +-
services/fuse/tests/test_inodes.py | 6 +-
6 files changed, 263 insertions(+), 189 deletions(-)
via bf4193eeaa390cec08bbb8333a53fbc89edfd7f3 (commit)
from 6dfef004d33b147cbe80bbb5ecc6922ac25f156d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit bf4193eeaa390cec08bbb8333a53fbc89edfd7f3
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Mon Mar 25 14:19:32 2024 -0400
21541: Code cleanup and additional memory usage improvements
* Add slots to major Directory classes
* Disconnect FuseArvadosFile from ArvadosFile to reduce cyclic
references.
* Clean up _remove_inode loop and use dataclasses for the inode
operations.
* Now calls del_entry on collection_record_file and project_object_file.
It looks like collection_record_file was holding a reference to the
Collection object (and was remaining in the inodes table) even when
CollectionDirectory was cleared. I believe this is the memory leak I
have been looking for.
* Remove the "dead" flag and set parent_inode to None instead. This
clarifies the behavior that directory entries keep their (numeric)
inodes until they are detached from the directory which may have
contributed to infrequent "file not found" errors.
* Adjust cache behavior to only hold objects that are cache-eligible
and have non-zero cache_size. This avoids filling the cache with
entries that are just going to be skipped over.
Overall: Memory usage is mostly stable but does tend to creep up over
time. My best guess is that this is forced because we need to keep
inodes in RAM as long as the kernel maintains a reference to them, so
with multiple processes accessing different filesystem locations, this
is simply RAM required for the working set.
I'm also cautiously optimistic that issues I observed with performance
slowing down with long-lived processes are improved (e.g. fixing
memory leaks means no more unbounded growth of cache_entries, which
means no more time wasted iterating over huge lists).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 08a44c9533..2dfba46473 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -56,12 +56,6 @@ inode assigned to it and appears in the Inodes._entries dictionary.
from __future__ import absolute_import
from __future__ import division
-from future.utils import viewitems
-from future.utils import native
-from future.utils import listvalues
-from future.utils import listitems
-from future import standard_library
-standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import object
@@ -81,9 +75,12 @@ import functools
import arvados.keep
from prometheus_client import Summary
import queue
+from dataclasses import dataclass
+import typing
+import gc
from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from .fusefile import StringFile, FuseArvadosFile
+from .fusefile import File, StringFile, FuseArvadosFile
_logger = logging.getLogger('arvados.arvados_fuse')
@@ -158,7 +155,6 @@ class InodeCache(object):
def __init__(self, cap, min_entries=4):
self._cache_entries = collections.OrderedDict()
- self._by_uuid = {}
self.cap = cap
self._total = 0
self.min_entries = min_entries
@@ -181,39 +177,21 @@ class InodeCache(object):
return
_logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
- for ent in listvalues(self._cache_entries):
+
+ # Copy this into a deque for two reasons:
+ #
+ # 1. _cache_entries is modified by unmanage() which is called
+ # by _remove
+ #
+ # 2. popping off the front means the reference goes away
+    #    immediately instead of sticking around for the lifetime of
+ # "values"
+ values = collections.deque(self._cache_entries.values())
+
+ while len(values) > 0:
if self._total < self.cap or len(self._cache_entries) < self.min_entries:
break
- if ent.cache_size > 0 or ent.dead:
- # if cache_size is zero it's been cleared already
- yield ent
-
- def manage(self, obj):
- """Add a new object to be cache managed.
-
- This means evict_candidates will suggest clearing and removing
- the inode when there is memory pressure.
-
- """
-
- if obj.inode in self._cache_entries:
- return
-
- obj.cache_size = obj.objsize()
- self._total += obj.cache_size
-
- self._cache_entries[obj.inode] = obj
-
- obj.cache_uuid = obj.uuid()
- if obj.cache_uuid:
- if obj.cache_uuid not in self._by_uuid:
- self._by_uuid[obj.cache_uuid] = [obj]
- else:
- if obj not in self._by_uuid[obj.cache_uuid]:
- self._by_uuid[obj.cache_uuid].append(obj)
-
- _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
- obj.inode, obj.cache_size, obj.cache_uuid, self._total, len(self._cache_entries))
+ yield values.popleft()
def unmanage(self, entry):
"""Stop managing an object in the cache.
@@ -230,13 +208,6 @@ class InodeCache(object):
self._total -= entry.cache_size
entry.cache_size = 0
- # manage the mapping of uuid to object
- if entry.cache_uuid:
- self._by_uuid[entry.cache_uuid].remove(entry)
- if not self._by_uuid[entry.cache_uuid]:
- del self._by_uuid[entry.cache_uuid]
- entry.cache_uuid = None
-
# Now forget about it
del self._cache_entries[entry.inode]
@@ -245,11 +216,24 @@ class InodeCache(object):
object changing (usually because it has been loaded or
cleared).
+ Adds or removes entries to the cache list based on the object
+ cache size.
+
"""
+
+ if not obj.persisted():
+ return
+
if obj.inode in self._cache_entries:
self._total -= obj.cache_size
- obj.cache_size = obj.objsize()
+
+ obj.cache_size = obj.objsize()
+
+ if obj.cache_size > 0 or obj.parent_inode is None:
self._total += obj.cache_size
+ self._cache_entries[obj.inode] = obj
+ elif obj.cache_size == 0 and obj.inode in self._cache_entries:
+ del self._cache_entries[obj.inode]
def touch(self, obj):
"""Indicate an object was used recently, making it low
@@ -258,17 +242,44 @@ class InodeCache(object):
"""
if obj.inode in self._cache_entries:
self._cache_entries.move_to_end(obj.inode)
- else:
- self.manage(obj)
-
- def find_by_uuid(self, uuid):
- return self._by_uuid.get(uuid, [])
+ return True
+ return False
def clear(self):
self._cache_entries.clear()
- self._by_uuid.clear()
self._total = 0
+@dataclass
+class RemoveInode:
+ entry: typing.Union[Directory, File]
+ def inode_op(self, inodes, locked_ops):
+ if locked_ops is None:
+ inodes._remove(self.entry)
+ return True
+ else:
+ locked_ops.append(self)
+ return False
+
+@dataclass
+class InvalidateInode:
+ inode: int
+ def inode_op(self, inodes, locked_ops):
+ llfuse.invalidate_inode(self.inode)
+ return True
+
+@dataclass
+class InvalidateEntry:
+ inode: int
+ name: str
+ def inode_op(self, inodes, locked_ops):
+ llfuse.invalidate_entry(self.inode, self.name)
+ return True
+
+@dataclass
+class EvictCandidates:
+ def inode_op(self, inodes, locked_ops):
+ return True
+
class Inodes(object):
"""Manage the set of inodes.
@@ -291,6 +302,9 @@ class Inodes(object):
self._inode_remove_thread.daemon = True
self._inode_remove_thread.start()
+ self.cap_cache_event = threading.Event()
+ self._by_uuid = collections.defaultdict(list)
+
def __getitem__(self, item):
return self._entries[item]
@@ -301,7 +315,7 @@ class Inodes(object):
return iter(self._entries.keys())
def items(self):
- return viewitems(self._entries.items())
+ return self._entries.items()
def __contains__(self, k):
return k in self._entries
@@ -313,12 +327,32 @@ class Inodes(object):
"""
entry._atime = time.time()
- self.inode_cache.touch(entry)
- self.cap_cache()
+ if self.inode_cache.touch(entry):
+ self.cap_cache()
def cap_cache(self):
"""Notify the _inode_remove thread to recheck the cache."""
- self._inode_remove_queue.put(("evict_candidates",))
+ if not self.cap_cache_event.is_set():
+ self.cap_cache_event.set()
+ self._inode_remove_queue.put(EvictCandidates())
+
+ def update_uuid(self, entry):
+ """Update the Arvados uuid associated with an inode entry.
+
+ This is used to look up inodes that need to be invalidated
+ when a websocket event indicates the object has changed on the
+ API server.
+
+ """
+ if entry.cache_uuid and entry in self._by_uuid[entry.cache_uuid]:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+
+ entry.cache_uuid = entry.uuid()
+ if entry.cache_uuid and entry not in self._by_uuid[entry.cache_uuid]:
+ self._by_uuid[entry.cache_uuid].append(entry)
+
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
def add_entry(self, entry):
"""Assign a numeric inode to a new entry."""
@@ -327,23 +361,23 @@ class Inodes(object):
if entry.inode == llfuse.ROOT_INODE:
entry.inc_ref()
self._entries[entry.inode] = entry
- if entry.persisted():
- # only "persisted" items can be reloaded from the server
- # making them safe to evict automatically.
- self.inode_cache.manage(entry)
+
+ self.update_uuid(entry)
+ self.inode_cache.update_cache_size(entry)
self.cap_cache()
return entry
def del_entry(self, entry):
"""Remove entry from the inode table.
- Put a tombstone marker on it and notify the _inode_remove
- thread to try and remove it.
+ Indicate this inode entry is pending deletion by setting
+ parent_inode to None. Notify the _inode_remove thread to try
+ and remove it.
"""
- entry.dead = True
- self._inode_remove_queue.put(("remove", entry))
+ entry.parent_inode = None
+ self._inode_remove_queue.put(RemoveInode(entry))
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def _inode_remove(self):
@@ -355,61 +389,33 @@ class Inodes(object):
locked_ops = collections.deque()
while True:
- try:
- entry = self._inode_remove_queue.get(True)
- if entry is None:
+ blocking_get = True
+ while True:
+ try:
+ qentry = self._inode_remove_queue.get(blocking_get)
+ except queue.Empty:
+ break
+ blocking_get = False
+ if qentry is None:
return
+
+ if self._shutdown_started.is_set():
+ continue
+
# Process this entry
- _logger.debug("_inode_remove %s", entry)
- if self._inode_op(entry, locked_ops):
+ if qentry.inode_op(self, locked_ops):
self._inode_remove_queue.task_done()
- # Drain the queue of any other entries
- while True:
- try:
- entry = self._inode_remove_queue.get(False)
- if entry is None:
- return
- _logger.debug("_inode_remove %s", entry)
- if self._inode_op(entry, locked_ops):
- self._inode_remove_queue.task_done()
- except queue.Empty:
- break
-
- with llfuse.lock:
- while len(locked_ops) > 0:
- if self._inode_op(locked_ops.popleft(), None):
- self._inode_remove_queue.task_done()
- for entry in self.inode_cache.evict_candidates():
- self._remove(entry)
- except Exception as e:
- _logger.exception("_inode_remove")
+ # Give up the reference
+ qentry = None
- def _inode_op(self, op, locked_ops):
- """Process an inode operation: attempt to remove an inode
- entry, tell the kernel to invalidate a inode metadata or
- directory entry, or trigger a cache check.
-
- """
- if self._shutdown_started.is_set():
- return True
- if op[0] == "remove":
- if locked_ops is None:
- self._remove(op[1])
- return True
- else:
- locked_ops.append(op)
- return False
- if op[0] == "invalidate_inode":
- _logger.debug("sending invalidate inode %i", op[1])
- llfuse.invalidate_inode(op[1])
- return True
- if op[0] == "invalidate_entry":
- _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
- llfuse.invalidate_entry(op[1], op[2])
- return True
- if op[0] == "evict_candidates":
- return True
+ with llfuse.lock:
+ while len(locked_ops) > 0:
+ if locked_ops.popleft().inode_op(self, None):
+ self._inode_remove_queue.task_done()
+ self.cap_cache_event.clear()
+ for entry in self.inode_cache.evict_candidates():
+ self._remove(entry)
def wait_remove_queue_empty(self):
# used by tests
@@ -430,13 +436,7 @@ class Inodes(object):
# Removed already
return
- # Tell the kernel it should forget about it.
- entry.kernel_invalidate()
-
- if entry.has_ref():
- # has kernel reference, could still be accessed.
- # when the kernel forgets about it, we can delete it.
- #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+ if entry.inode == llfuse.ROOT_INODE:
return
if entry.in_use():
@@ -444,35 +444,38 @@ class Inodes(object):
#_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
return
- forget_inode = True
- parent = self._entries.get(entry.parent_inode)
- if (parent is not None and parent.has_ref()) or entry.inode == llfuse.ROOT_INODE:
- # the parent is still referenced, so we'll keep the
- # entry but wipe out the stuff under it
- forget_inode = False
+ # Tell the kernel it should forget about it
+ entry.kernel_invalidate()
- if entry.cache_size == 0 and not forget_inode:
- # Was cleared already
+ if entry.has_ref():
+ # has kernel reference, could still be accessed.
+ # when the kernel forgets about it, we can delete it.
+ #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
return
- if forget_inode:
- self.inode_cache.unmanage(entry)
-
- _logger.debug("InodeCache removing inode %i", entry.inode)
+ # commit any pending changes
+ with llfuse.lock_released:
+ entry.finalize()
- # For directories, clear the contents
+ # Clear the contents
entry.clear()
- _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s, inode entries %i, type %s",
- entry.inode, self.inode_cache.total(), forget_inode,
- len(self._entries), type(entry))
- if forget_inode:
+ if entry.parent_inode is None:
+ _logger.debug("InodeCache forgetting inode %i, object cache_size %i, cache total %i, forget_inode True, inode entries %i, type %s",
+ entry.inode, entry.cache_size, self.inode_cache.total(),
+ len(self._entries), type(entry))
+
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
+
+ self.inode_cache.unmanage(entry)
+
del self._entries[entry.inode]
entry.inode = None
- # stop anything else
- with llfuse.lock_released:
- entry.finalize()
except Exception as e:
_logger.exception("failed remove")
@@ -480,13 +483,13 @@ class Inodes(object):
if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- self._inode_remove_queue.put(("invalidate_inode", entry.inode))
+ self._inode_remove_queue.put(InvalidateInode(entry.inode))
def invalidate_entry(self, entry, name):
if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
+ self._inode_remove_queue.put(InvalidateEntry(entry.inode, name.encode(self.encoding)))
def begin_shutdown(self):
self._inode_remove_queue.put(None)
@@ -499,8 +502,9 @@ class Inodes(object):
self.begin_shutdown()
self.inode_cache.clear()
+ self._by_uuid.clear()
- for k,v in viewitems(self._entries):
+ for k,v in self._entries.items():
try:
v.finalize()
except Exception as e:
@@ -511,6 +515,11 @@ class Inodes(object):
def forward_slash_subst(self):
return self._fsns
+ def find_by_uuid(self, uuid):
+ """Return a list of zero or more inode entries corresponding
+ to this Arvados UUID."""
+ return self._by_uuid.get(uuid, [])
+
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
@@ -617,6 +626,13 @@ class Operations(llfuse.Operations):
self.write_ops_counter = arvados.keep.Counter()
self.events = None
+
+ # We rely on the cyclic garbage collector to deallocate
+ # Collection objects from the Python SDK. A lower GC
+ # threshold encourages Python to be more aggressive in
+ # reclaiming these and seems to slow down the growth in memory
+ # usage over time.
+ gc.set_threshold(200)
def init(self):
# Allow threads that are waiting for the driver to be finished
@@ -653,7 +669,8 @@ class Operations(llfuse.Operations):
def destroy(self):
_logger.debug("arv-mount destroy: start")
- self.begin_shutdown()
+ with llfuse.lock_released:
+ self.begin_shutdown()
if self.events:
self.events.close()
@@ -683,14 +700,14 @@ class Operations(llfuse.Operations):
old_attrs = properties.get("old_attributes") or {}
new_attrs = properties.get("new_attributes") or {}
- for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
+ for item in self.inodes.find_by_uuid(ev["object_uuid"]):
item.invalidate()
oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
- self.inodes.inode_cache.find_by_uuid(oldowner) +
- self.inodes.inode_cache.find_by_uuid(newowner)):
+ self.inodes.find_by_uuid(oldowner) +
+ self.inodes.find_by_uuid(newowner)):
parent.invalidate()
@getattr_time.time()
@@ -701,11 +718,16 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.ENOENT)
e = self.inodes[inode]
+ self.inodes.touch(e)
+ parent = None
+ if e.parent_inode:
+ parent = self.inodes[e.parent_inode]
+ self.inodes.touch(parent)
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
- entry.entry_timeout = 0
+ entry.entry_timeout = parent.time_to_next_poll() if parent is not None else 0
entry.attr_timeout = e.time_to_next_poll() if e.allow_attr_cache else 0
entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
@@ -779,11 +801,17 @@ class Operations(llfuse.Operations):
if name == '..':
inode = p.parent_inode
elif isinstance(p, Directory) and name in p:
+ if p[name].inode is None:
+ _logger.debug("arv-mount lookup: parent_inode %i name '%s' found but inode was None",
+ parent_inode, name)
+ raise llfuse.FUSEError(errno.ENOENT)
+
inode = p[name].inode
if inode != None:
_logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
parent_inode, name, inode)
+ self.inodes.touch(self.inodes[inode])
self.inodes[inode].inc_ref()
return self.getattr(inode)
else:
@@ -799,7 +827,7 @@ class Operations(llfuse.Operations):
for inode, nlookup in inodes:
ent = self.inodes[inode]
_logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
- if ent.dec_ref(nlookup) == 0 and ent.dead:
+ if ent.dec_ref(nlookup) == 0 and ent.parent_inode is None:
self.inodes.del_entry(ent)
@open_time.time()
@@ -919,10 +947,10 @@ class Operations(llfuse.Operations):
_logger.debug("arv-mount opendir: inode %i fh %i ", inode, fh)
# update atime
- self.inodes.touch(p)
p.inc_use()
- self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+ self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + p.items())
p.dec_use()
+ self.inodes.touch(p)
return fh
@readdir_time.time()
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 366b5945bc..508ee7fb73 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -62,7 +62,7 @@ class FreshBase(object):
"""
__slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
- "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+ "ref_count", "cache_size", "cache_uuid", "allow_attr_cache")
def __init__(self):
self._stale = True
@@ -72,7 +72,6 @@ class FreshBase(object):
self._poll_time = 60
self.use_count = 0
self.ref_count = 0
- self.dead = False
self.cache_size = 0
self.cache_uuid = None
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 05b657b036..dbb7bb83f7 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -36,6 +36,8 @@ class Directory(FreshBase):
and the value referencing a File or Directory object.
"""
+ __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
def __init__(self, parent_inode, inodes, enable_write, filters):
"""parent_inode is the integer inode number"""
@@ -136,9 +138,8 @@ class Directory(FreshBase):
super(Directory, self).fresh()
def objsize(self):
- # This is a very rough guess of the amount of overhead involved for
- # each directory entry (128 bytes is 16 * 8-byte pointers).
- return len(self._entries) * 128
+ # Rough estimate of memory footprint based on using pympler
+ return len(self._entries) * 1024
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
@@ -171,7 +172,7 @@ class Directory(FreshBase):
continue
if name in oldentries:
ent = oldentries[name]
- if same(ent, i):
+ if same(ent, i) and ent.parent_inode == self.inode:
# move existing directory entry over
self._entries[name] = ent
del oldentries[name]
@@ -186,18 +187,22 @@ class Directory(FreshBase):
ent = new_entry(i)
if ent is not None:
self._entries[name] = self.inodes.add_entry(ent)
+ # need to invalidate this just in case there was a
+ # previous entry that couldn't be moved over or a
+ # lookup that returned file not found and cached
+ # a negative result
+ self.inodes.invalidate_entry(self, name)
changed = True
_logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
# delete any other directory entries that were not found in 'items'
- for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
- self.inodes.invalidate_entry(self, i)
- self.inodes.del_entry(oldentries[i])
+ for name, ent in oldentries.items():
+ _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
changed = True
if changed:
- self.inodes.invalidate_inode(self)
self._mtime = time.time()
self.inodes.inode_cache.update_cache_size(self)
@@ -213,11 +218,15 @@ class Directory(FreshBase):
def clear(self):
"""Delete all entries"""
+ if len(self._entries) == 0:
+ return
oldentries = self._entries
self._entries = {}
self.invalidate()
- for n in oldentries:
- self.inodes.del_entry(oldentries[n])
+ for name, ent in oldentries.items():
+ ent.clear()
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
self.inodes.inode_cache.update_cache_size(self)
def kernel_invalidate(self):
@@ -237,8 +246,6 @@ class Directory(FreshBase):
self.inodes.invalidate_entry(parent, k)
break
- self.inodes.invalidate_inode(self)
-
def mtime(self):
return self._mtime
@@ -282,6 +289,8 @@ class CollectionDirectoryBase(Directory):
"""
+ __slots__ = ("collection", "collection_root", "collection_record_file")
+
def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
self.collection = collection
@@ -291,11 +300,11 @@ class CollectionDirectoryBase(Directory):
def new_entry(self, name, item, mtime):
name = self.sanitize_filename(name)
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- if item.fuse_entry.dead is not True:
- raise Exception("Can only reparent dead inode entry")
+ if item.fuse_entry.parent_inode is not None:
+ raise Exception("Can only reparent unparented inode entry")
if item.fuse_entry.inode is None:
raise Exception("Reparented entry must still have valid inode")
- item.fuse_entry.dead = False
+ item.fuse_entry.parent_inode = self.inode
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
@@ -446,6 +455,8 @@ class CollectionDirectoryBase(Directory):
def clear(self):
super(CollectionDirectoryBase, self).clear()
+ if self.collection is not None:
+ self.collection.unsubscribe()
self.collection = None
def objsize(self):
@@ -456,6 +467,9 @@ class CollectionDirectoryBase(Directory):
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
+ __slots__ = ("api", "num_retries", "collection_locator",
+ "_manifest_size", "_writable", "_updating_lock")
+
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
self.api = api
@@ -515,7 +529,9 @@ class CollectionDirectory(CollectionDirectoryBase):
if self.collection_record_file is not None:
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record file", self)
+ _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+ self.collection_record_file.inode)
+ self.inodes.update_uuid(self)
self.inodes.inode_cache.update_cache_size(self)
self.fresh()
@@ -594,6 +610,7 @@ class CollectionDirectory(CollectionDirectoryBase):
return False
@use_counter
+ @check_update
def collection_record(self):
self.flush()
return self.collection.api_response()
@@ -627,28 +644,31 @@ class CollectionDirectory(CollectionDirectoryBase):
return (self.collection_locator is not None)
def objsize(self):
- # This is a very rough guess of the amount of overhead
- # involved for a collection; you've got the manifest text
- # itself which is not discarded by the Collection class, then
- # the block identifiers that get copied into their own
- # strings, then the rest of the overhead of the Python
- # objects.
- return self._manifest_size * 4
+ # This is a rough guess of the amount of overhead involved for
+ # a collection; the calculation is each file averages 128
+ # bytes in the manifest, but consume 1024 bytes when blown up
+ # into Python data structures.
+ return self._manifest_size * 8
def finalize(self):
- if self.collection is not None:
- if self.writable():
- try:
- self.collection.save()
- except Exception as e:
- _logger.exception("Failed to save collection %s", self.collection_locator)
- self.collection.stop_threads()
+ if self.collection is None:
+ return
+
+ if self.writable():
+ try:
+ self.collection.save()
+ except Exception as e:
+ _logger.exception("Failed to save collection %s", self.collection_locator)
+ self.collection.stop_threads()
def clear(self):
if self.collection is not None:
self.collection.stop_threads()
self._manifest_size = 0
super(CollectionDirectory, self).clear()
+ if self.collection_record_file is not None:
+ self.inodes.del_entry(self.collection_record_file)
+ self.collection_record_file = None
class TmpCollectionDirectory(CollectionDirectoryBase):
@@ -697,7 +717,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
with self.collection.lock:
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
+ _logger.debug("%s invalidated collection record", self.inode)
finally:
while lockcount > 0:
self.collection.lock.acquire()
@@ -992,6 +1012,10 @@ class TagDirectory(Directory):
class ProjectDirectory(Directory):
"""A special directory that contains the contents of a project."""
+ __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+ "project_uuid", "_updating_lock",
+ "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
project_object, poll=True, poll_time=3, storage_classes=None):
super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
@@ -1195,6 +1219,12 @@ class ProjectDirectory(Directory):
def persisted(self):
return True
+ def clear(self):
+ super(ProjectDirectory, self).clear()
+ if self.project_object_file is not None:
+ self.inodes.del_entry(self.project_object_file)
+ self.project_object_file = None
+
@use_counter
@check_update
def mkdir(self, name):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 45d3db16fe..9279f7d99d 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -80,9 +80,17 @@ class FuseArvadosFile(File):
if self.writable():
self.arvfile.parent.root_collection().save()
+ def clear(self):
+ if self.parent_inode is None:
+ self.arvfile.fuse_entry = None
+ self.arvfile = None
+
class StringFile(File):
"""Wrap a simple string as a file"""
+
+ __slots__ = ("contents",)
+
def __init__(self, parent_inode, contents, _mtime):
super(StringFile, self).__init__(parent_inode, _mtime)
self.contents = contents
@@ -97,6 +105,8 @@ class StringFile(File):
class ObjectFile(StringFile):
"""Wrap a dict as a serialized json object."""
+ __slots__ = ("object_uuid",)
+
def __init__(self, parent_inode, obj):
super(ObjectFile, self).__init__(parent_inode, "", 0)
self.object_uuid = obj['uuid']
@@ -125,6 +135,9 @@ class FuncToJSONFile(StringFile):
The function is called at the time the file is read. The result is
cached until invalidate() is called.
"""
+
+ __slots__ = ("func",)
+
def __init__(self, parent_inode, func):
super(FuncToJSONFile, self).__init__(parent_inode, "", 0)
self.func = func
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index e0479d3668..02f4009724 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -105,7 +105,7 @@ class MountTestBase(unittest.TestCase):
self.llfuse_thread.join(timeout=60)
if self.llfuse_thread.is_alive():
logger.warning("MountTestBase.tearDown():"
- " llfuse thread still alive 20s after umount"
+ " llfuse thread still alive 60s after umount"
" -- exiting with SIGKILL")
os.kill(os.getpid(), signal.SIGKILL)
waited = time.time() - t0
diff --git a/services/fuse/tests/test_inodes.py b/services/fuse/tests/test_inodes.py
index aaef8e4b57..c5c92a9b3f 100644
--- a/services/fuse/tests/test_inodes.py
+++ b/services/fuse/tests/test_inodes.py
@@ -81,13 +81,16 @@ class InodeTests(unittest.TestCase):
# Change min_entries
cache.min_entries = 1
+ ent1.parent_inode = None
inodes.cap_cache()
inodes.wait_remove_queue_empty()
self.assertEqual(600, cache.total())
self.assertTrue(ent1.clear.called)
# Touching ent1 should cause ent3 to get cleared
+ ent3.parent_inode = None
self.assertFalse(ent3.clear.called)
+ inodes.inode_cache.update_cache_size(ent1)
inodes.touch(ent1)
inodes.wait_remove_queue_empty()
self.assertTrue(ent3.clear.called)
@@ -132,6 +135,7 @@ class InodeTests(unittest.TestCase):
ent3.has_ref.return_value = False
ent1.clear.called = False
ent3.clear.called = False
+ ent3.parent_inode = None
inodes.touch(ent3)
inodes.wait_remove_queue_empty()
self.assertFalse(ent1.clear.called)
@@ -164,6 +168,6 @@ class InodeTests(unittest.TestCase):
inodes.wait_remove_queue_empty()
self.assertEqual(0, cache.total())
- inodes.touch(ent3)
+ inodes.add_entry(ent3)
inodes.wait_remove_queue_empty()
self.assertEqual(600, cache.total())
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list