[arvados] updated: 2.7.0-6109-geb986fe200
git repository hosting
git at public.arvados.org
Fri Mar 8 22:33:23 UTC 2024
Summary of changes:
services/fuse/arvados_fuse/__init__.py | 275 +++++++++++++++++++++------------
services/fuse/arvados_fuse/command.py | 4 +-
services/fuse/arvados_fuse/fresh.py | 12 +-
services/fuse/arvados_fuse/fusedir.py | 45 +++---
4 files changed, 204 insertions(+), 132 deletions(-)
via eb986fe200a4998052fd08323ff0a67eb05490fd (commit)
via 310bf276572164d7d47e250f45ed1b5cf7902a2d (commit)
from 32e56d60fd965260662b5c8d8aaafc0d793e33bd (commit)
The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.
commit eb986fe200a4998052fd08323ff0a67eb05490fd
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Mar 8 17:32:44 2024 -0500
21541: Making progress on memory management
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a966964248..0370237c1b 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -145,175 +145,99 @@ class InodeCache(object):
"""
def __init__(self, cap, min_entries=4):
- self._entries = collections.OrderedDict()
+ self._cache_entries = collections.OrderedDict()
self._by_uuid = {}
self.cap = cap
self._total = 0
self.min_entries = min_entries
-
- self._cache_remove_queue = queue.Queue()
- self._cache_remove_thread = threading.Thread(None, self._cache_remove)
- self._cache_remove_thread.daemon = True
- self._cache_remove_thread.start()
-
- self._cache_cap_event = threading.Event()
- self._cache_cap_thread = threading.Thread(None, self._cache_cap)
- self._cache_cap_thread.daemon = True
- self._cache_cap_thread.start()
-
+ self._total_lock = threading.Lock()
def total(self):
return self._total
- def _cache_cap(self):
- while True:
- self._cache_cap_event.wait()
- self._cache_cap_event.clear()
-
- if self._total > self.cap:
- with llfuse.lock:
- _logger.debug("InodeCache cap_cache %i, %i", self._total, self.cap)
- for ent in listvalues(self._entries):
- if self._total < self.cap or len(self._entries) < self.min_entries:
- break
- self._remove(ent)
-
-
- def _cache_remove(self):
- while True:
- entry = self._cache_remove_queue.get()
- _logger.debug("InodeCache got %s", entry)
- if entry is None:
- return
-
- if entry.inode not in self._entries:
- # removed already
- continue
-
- _logger.debug("InodeCache will remove inode %i", entry.inode)
- with llfuse.lock:
- _logger.debug("InodeCache removing inode %i", entry.inode)
- try:
- parent = self._entries.get(entry.parent_inode)
- if parent is not None and parent.has_ref(False):
- continue
-
- if parent is not None and parent.in_use():
- #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
- return
-
- # Invalidate the entry for self on the parent
- entry.kernel_invalidate()
-
- # For directories, clear the contents
- entry.clear()
-
- # manage cache size running sum
- self._total -= entry.cache_size
+ def evict_candidates(self):
+ with self._total_lock:
+ total = self._total
+ if total <= self.cap:
+ return
- # manage the mapping of uuid to object
- if entry.cache_uuid:
- self._by_uuid[entry.cache_uuid].remove(entry)
- if not self._by_uuid[entry.cache_uuid]:
- del self._by_uuid[entry.cache_uuid]
- entry.cache_uuid = None
+ _logger.debug("InodeCache evict_candidates total %i cap %i entries %i", self._total, self.cap, len(self._cache_entries))
+ for ent in listvalues(self._cache_entries):
+ with self._total_lock:
+ total = self._total
+ if total < self.cap or len(self._cache_entries) < self.min_entries:
+ break
+ yield ent
- # Now fully forget about it
- del self._entries[entry.inode]
+ def manage(self, obj):
+ if obj.inode in self._cache_entries:
+ return
- _logger.debug("InodeCache removed inode %i, total %i", entry.inode, self._total)
- entry.inode = None
+ obj.cache_size = obj.objsize()
- # stop anything else
- with llfuse.lock_released:
- entry.finalize()
- except Exception as e:
- _logger.exception("failed remove")
+ with self._total_lock:
+ _logger.debug("InodeCache b4 cache_size %i total %i", obj.cache_size, self._total)
+ self._total += obj.cache_size
+ _logger.debug("InodeCache after cache_size %i total %i", obj.cache_size, self._total)
+ total = self._total
- def _remove(self, entry):
- if entry.inode not in self._entries:
- return
+ self._cache_entries[obj.inode] = obj
- # Kernel behavior seems to be that if a file is
- # referenced, its parents remain referenced too. This
- # means has_ref() exits early when a collection is not
- # candidate for eviction.
- #
- # By contrast, in_use() doesn't increment references on
- # parents, so it requires a full tree walk to determine if
- # a collection is a candidate for eviction. This takes
- # .07s for 240000 files, which becomes a major drag when
- # cap_cache is being called several times a second and
- # there are multiple non-evictable collections in the
- # cache.
- #
- # So it is important for performance that we do the
- # has_ref() check first.
-
- # if entry.has_ref(True):
- # #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
- # return
-
- parent = self._entries.get(entry.parent_inode)
- if parent is not None and parent.has_ref(False):
- #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
- return
+ obj.cache_uuid = obj.uuid()
+ if obj.cache_uuid:
+ if obj.cache_uuid not in self._by_uuid:
+ self._by_uuid[obj.cache_uuid] = [obj]
+ else:
+ if obj not in self._by_uuid[obj.cache_uuid]:
+ self._by_uuid[obj.cache_uuid].append(obj)
- # if entry.has_ref(True):
- # #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
- # return
+ _logger.debug("InodeCache managing inode %i (size %i) (uuid %s) total now %i (%i entries)",
+ obj.inode, obj.cache_size, obj.cache_uuid, total, len(self._cache_entries))
- if parent is not None and parent.in_use():
- #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ def unmanage(self, entry):
+ if entry.inode not in self._cache_entries:
return
- _logger.debug("InodeCache queuing %i", entry.inode)
- self._cache_remove_queue.put(entry)
+ # manage cache size running sum
+ # with self._total_lock:
+ # self._total -= entry.cache_size
+ # entry.cache_size = 0
- def cap_cache(self):
- self._cache_cap_event.set()
+ # manage the mapping of uuid to object
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
- def manage(self, obj):
- if obj.persisted():
- obj.cache_size = obj.objsize()
- self._entries[obj.inode] = obj
- obj.cache_uuid = obj.uuid()
- if obj.cache_uuid:
- if obj.cache_uuid not in self._by_uuid:
- self._by_uuid[obj.cache_uuid] = [obj]
- else:
- if obj not in self._by_uuid[obj.cache_uuid]:
- self._by_uuid[obj.cache_uuid].append(obj)
- self._total += obj.cache_size
- _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
- obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
- self.cap_cache()
+ # Now forget about it
+ del self._cache_entries[entry.inode]
def update_cache_size(self, obj):
- if obj.inode in self._entries:
- self._total -= obj.cache_size
- obj.cache_size = obj.objsize()
- self._total += obj.cache_size
+ pass
+ # if obj.inode in self._cache_entries:
+ # with self._total_lock:
+ # _logger.debug("update_cache_size b4 cache_size %i total %i", obj.cache_size, self._total)
+ # self._total -= obj.cache_size
+ # obj.cache_size = obj.objsize()
+ # self._total += obj.cache_size
+ # _logger.debug("update_cache_size after cache_size %i total %i", obj.cache_size, self._total)
def touch(self, obj):
- if obj.persisted():
- if obj.inode in self._entries:
- self._entries.move_to_end(obj.inode)
- else:
- self.manage(obj)
-
- def unmanage(self, obj):
- if obj.persisted() and obj.inode in self._entries:
- self._remove(obj)
+ if obj.inode in self._cache_entries:
+ self._cache_entries.move_to_end(obj.inode)
+ else:
+ self.manage(obj)
def find_by_uuid(self, uuid):
return self._by_uuid.get(uuid, [])
def clear(self):
- self._entries.clear()
+ self._cache_entries.clear()
self._by_uuid.clear()
self._total = 0
+
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
@@ -324,6 +248,11 @@ class Inodes(object):
self.inode_cache = inode_cache
self.encoding = encoding
+ self._inode_remove_queue = queue.Queue()
+ self._inode_remove_thread = threading.Thread(None, self._inode_remove)
+ self._inode_remove_thread.daemon = True
+ self._inode_remove_thread.start()
+
def __getitem__(self, item):
return self._entries[item]
@@ -343,36 +272,124 @@ class Inodes(object):
entry._atime = time.time()
self.inode_cache.touch(entry)
+ def cap_cache(self):
+ self._inode_remove_queue.put(("evict_candidates",))
+
def add_entry(self, entry):
+ # Assign an inode to a new entry
entry.inode = next(self._counter)
if entry.inode == llfuse.ROOT_INODE:
entry.inc_ref()
self._entries[entry.inode] = entry
- self.inode_cache.manage(entry)
+ if entry.persisted():
+ # only "persisted" items can be reloaded from the server
+ # making them safe to evict automatically.
+ self.inode_cache.manage(entry)
+ self.cap_cache()
return entry
def del_entry(self, entry):
+ # Remove entry from the inode table
if entry.ref_count == 0:
- self.inode_cache.unmanage(entry)
+ self._inode_remove_queue.put(("remove", entry))
else:
entry.dead = True
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+ def _inode_remove(self):
+ while True:
+ try:
+ entry = self._inode_remove_queue.get(True)
+ with llfuse.lock:
+ # Process this entry
+ _logger.debug("_inode_remove %s", entry)
+ self._inode_op(entry)
+
+ while True:
+ try:
+ # Drain the queue of any other entries
+ entry = self._inode_remove_queue.get(False)
+ _logger.debug("_inode_remove %s", entry)
+ self._inode_op(entry)
+ except queue.Empty:
+ break
+
+ for entry in self.inode_cache.evict_candidates():
+ self._remove(entry)
+ except Exception as e:
+ _logger.exception("_inode_remove")
+
+ def _inode_op(self, op):
+ if op[0] == "remove":
+ self._remove(op[1])
+ if op[0] == "invalidate_inode":
+ with llfuse.lock_released:
+ _logger.debug("sending invalidate inode %i", op[1])
+ llfuse.invalidate_inode(op[1])
+ if op[0] == "invalidate_entry":
+ with llfuse.lock_released:
+ _logger.debug("sending invalidate to inode %i entry %s", op[1], op[2])
+ llfuse.invalidate_entry(op[1], op[2])
+ if op[0] == "evict_candidates":
+ pass
+
+
+ def _remove(self, entry):
+ try:
+ if entry.inode is None:
+ # Removed already
+ return
+
+ if entry.has_ref():
+ # has kernel reference, can't be removed.
+ #_logger.debug("InodeCache cannot clear inode %i, is referenced", entry.inode)
+ return
+
+ if entry.in_use():
+ # referenced internally, stay pinned
+ #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ return
+
+ forget_inode = True
+ parent = self._entries.get(entry.parent_inode)
+ if parent is not None and parent.has_ref():
+ # the parent is still referenced, so we'll keep the
+ # entry but wipe out the stuff under it
+ forget_inode = False
+
+ if forget_inode:
+ self.inode_cache.unmanage(entry)
+
+ _logger.debug("InodeCache removing inode %i", entry.inode)
+
+ # Invalidate the entry for self on the parent
+ entry.kernel_invalidate()
+
+ # For directories, clear the contents
+ entry.clear()
+
+ _logger.debug("InodeCache clearing inode %i, total %i, forget_inode %s",
+ entry.inode, self.inode_cache.total(), forget_inode)
+ if forget_inode:
+ entry.inode = None
+
+ # stop anything else
+ with llfuse.lock_released:
+ entry.finalize()
+ except Exception as e:
+ _logger.exception("failed remove")
+
def invalidate_inode(self, entry):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- with llfuse.lock_released:
- _logger.debug("sending invalidate %i", entry.inode)
- llfuse.invalidate_inode(entry.inode)
+ self._inode_remove_queue.put(("invalidate_inode", entry.inode))
def invalidate_entry(self, entry, name):
- if entry.has_ref(False):
+ if entry.has_ref():
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- with llfuse.lock_released:
- _logger.debug("sending invalidate to inode %i entry %s", entry.inode, name)
- llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+ self._inode_remove_queue.put(("invalidate_entry", entry.inode, native(name.encode(self.encoding))))
def clear(self):
self.inode_cache.clear()
@@ -557,6 +574,7 @@ class Operations(llfuse.Operations):
@catch_exceptions
def getattr(self, inode, ctx=None):
if inode not in self.inodes:
+ _logger.debug("arv-mount getattr: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
e = self.inodes[inode]
@@ -667,6 +685,7 @@ class Operations(llfuse.Operations):
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.debug("arv-mount open: inode %i missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
if isinstance(p, Directory):
@@ -748,7 +767,7 @@ class Operations(llfuse.Operations):
finally:
self._filehandles[fh].release()
del self._filehandles[fh]
- self.inodes.inode_cache.cap_cache()
+ self.inodes.cap_cache()
def releasedir(self, fh):
self.release(fh)
@@ -761,7 +780,7 @@ class Operations(llfuse.Operations):
if inode in self.inodes:
p = self.inodes[inode]
else:
- _logger.warning("arv-mount opendir: called with inode %i but it is missing", inode)
+ _logger.debug("arv-mount opendir: called with unknown or removed inode %i", inode)
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(p, Directory):
@@ -778,7 +797,9 @@ class Operations(llfuse.Operations):
# update atime
self.inodes.touch(p)
+ p.inc_use()
self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
+ p.dec_use()
return fh
@readdir_time.time()
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 53214ee94d..366b5945bc 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -125,17 +125,11 @@ class FreshBase(object):
self.ref_count -= n
return self.ref_count
- def has_ref(self, only_children):
+ def has_ref(self):
"""Determine if there are any kernel references to this
- object or its children.
-
- If only_children is True, ignore refcount of self and only consider
- children.
+ object.
"""
- if only_children:
- return False
- else:
- return self.ref_count > 0
+ return self.ref_count > 0
def objsize(self):
return 0
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 899e3abb34..25e611e67d 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -228,21 +228,14 @@ class Directory(FreshBase):
return True
return False
- def has_ref(self, only_children):
- if super(Directory, self).has_ref(only_children):
- return True
- for v in self._entries.values():
- if v.has_ref(False):
- return True
- return False
-
def clear(self):
"""Delete all entries"""
oldentries = self._entries
self._entries = {}
+ self.invalidate()
for n in oldentries:
self.inodes.del_entry(oldentries[n])
- self.invalidate()
+ self.inodes.inode_cache.update_cache_size(self)
def kernel_invalidate(self):
# Invalidating the dentry on the parent implies invalidating all paths
@@ -1138,7 +1131,9 @@ class ProjectDirectory(Directory):
def _add_entry(self, i, name):
ent = self.createDirectory(i)
+ ent.inc_use()
self._entries[name] = self.inodes.add_entry(ent)
+ ent.dec_use()
return self._entries[name]
@use_counter
commit 310bf276572164d7d47e250f45ed1b5cf7902a2d
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Mar 7 14:37:25 2024 -0500
21541: WIP, need to not throw things out too quickly
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a70c99fde9..a966964248 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -77,16 +77,6 @@ import arvados.keep
from prometheus_client import Summary
import queue
-# Default _notify_queue has a limit of 1000 items, but it really needs to be
-# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
-# details.
-
-if hasattr(llfuse, 'capi'):
- # llfuse < 0.42
- llfuse.capi._notify_queue = queue.Queue()
-else:
- # llfuse >= 0.42
- llfuse._notify_queue = queue.Queue()
LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
@@ -161,66 +151,127 @@ class InodeCache(object):
self._total = 0
self.min_entries = min_entries
+ self._cache_remove_queue = queue.Queue()
+ self._cache_remove_thread = threading.Thread(None, self._cache_remove)
+ self._cache_remove_thread.daemon = True
+ self._cache_remove_thread.start()
+
+ self._cache_cap_event = threading.Event()
+ self._cache_cap_thread = threading.Thread(None, self._cache_cap)
+ self._cache_cap_thread.daemon = True
+ self._cache_cap_thread.start()
+
+
def total(self):
return self._total
- def _remove(self, obj, clear):
- if obj.inode is None:
- return
- if clear:
- # Kernel behavior seems to be that if a file is
- # referenced, its parents remain referenced too. This
- # means has_ref() exits early when a collection is not
- # candidate for eviction.
- #
- # By contrast, in_use() doesn't increment references on
- # parents, so it requires a full tree walk to determine if
- # a collection is a candidate for eviction. This takes
- # .07s for 240000 files, which becomes a major drag when
- # cap_cache is being called several times a second and
- # there are multiple non-evictable collections in the
- # cache.
- #
- # So it is important for performance that we do the
- # has_ref() check first.
-
- if obj.has_ref(True):
- #_logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
+ def _cache_cap(self):
+ while True:
+ self._cache_cap_event.wait()
+ self._cache_cap_event.clear()
+
+ if self._total > self.cap:
+ with llfuse.lock:
+ _logger.debug("InodeCache cap_cache %i, %i", self._total, self.cap)
+ for ent in listvalues(self._entries):
+ if self._total < self.cap or len(self._entries) < self.min_entries:
+ break
+ self._remove(ent)
+
+
+ def _cache_remove(self):
+ while True:
+ entry = self._cache_remove_queue.get()
+ _logger.debug("InodeCache got %s", entry)
+ if entry is None:
return
- if obj.in_use():
- #_logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
- return
+ if entry.inode not in self._entries:
+ # removed already
+ continue
+
+ _logger.debug("InodeCache will remove inode %i", entry.inode)
+ with llfuse.lock:
+ _logger.debug("InodeCache removing inode %i", entry.inode)
+ try:
+ parent = self._entries.get(entry.parent_inode)
+ if parent is not None and parent.has_ref(False):
+ continue
+
+ if parent is not None and parent.in_use():
+ #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ return
+
+ # Invalidate the entry for self on the parent
+ entry.kernel_invalidate()
+
+ # For directories, clear the contents
+ entry.clear()
+
+ # manage cache size running sum
+ self._total -= entry.cache_size
- obj.kernel_invalidate()
- _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
- obj.clear()
+ # manage the mapping of uuid to object
+ if entry.cache_uuid:
+ self._by_uuid[entry.cache_uuid].remove(entry)
+ if not self._by_uuid[entry.cache_uuid]:
+ del self._by_uuid[entry.cache_uuid]
+ entry.cache_uuid = None
- # The llfuse lock is released in del_entry(), which is called by
- # Directory.clear(). While the llfuse lock is released, it can happen
- # that a reentrant call removes this entry before this call gets to it.
- # Ensure that the entry is still valid before trying to remove it.
- if obj.inode not in self._entries:
+ # Now fully forget about it
+ del self._entries[entry.inode]
+
+ _logger.debug("InodeCache removed inode %i, total %i", entry.inode, self._total)
+ entry.inode = None
+
+ # stop anything else
+ with llfuse.lock_released:
+ entry.finalize()
+ except Exception as e:
+ _logger.exception("failed remove")
+
+ def _remove(self, entry):
+ if entry.inode not in self._entries:
return
- self._total -= obj.cache_size
- del self._entries[obj.inode]
- if obj.cache_uuid:
- self._by_uuid[obj.cache_uuid].remove(obj)
- if not self._by_uuid[obj.cache_uuid]:
- del self._by_uuid[obj.cache_uuid]
- obj.cache_uuid = None
- if clear:
- _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
+ # Kernel behavior seems to be that if a file is
+ # referenced, its parents remain referenced too. This
+ # means has_ref() exits early when a collection is not
+ # candidate for eviction.
+ #
+ # By contrast, in_use() doesn't increment references on
+ # parents, so it requires a full tree walk to determine if
+ # a collection is a candidate for eviction. This takes
+ # .07s for 240000 files, which becomes a major drag when
+ # cap_cache is being called several times a second and
+ # there are multiple non-evictable collections in the
+ # cache.
+ #
+ # So it is important for performance that we do the
+ # has_ref() check first.
+
+ # if entry.has_ref(True):
+ # #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+ # return
+
+ parent = self._entries.get(entry.parent_inode)
+ if parent is not None and parent.has_ref(False):
+ #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+ return
+
+ # if entry.has_ref(True):
+ # #_logger.debug("InodeCache cannot clear inode %i, still referenced", entry.inode)
+ # return
+
+ if parent is not None and parent.in_use():
+ #_logger.debug("InodeCache cannot clear inode %i, in use", entry.inode)
+ return
+
+ _logger.debug("InodeCache queuing %i", entry.inode)
+ self._cache_remove_queue.put(entry)
def cap_cache(self):
- _logger.debug("in cap_cache %i, %i", self._total, self.cap)
- if self._total > self.cap:
- for ent in listvalues(self._entries):
- if self._total < self.cap or len(self._entries) < self.min_entries:
- break
- self._remove(ent, True)
- _logger.debug("end cap_cache %i, %i", self._total, self.cap)
+ self._cache_cap_event.set()
def manage(self, obj):
if obj.persisted():
@@ -236,6 +287,7 @@ class InodeCache(object):
self._total += obj.cache_size
_logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
+ self.cap_cache()
def update_cache_size(self, obj):
if obj.inode in self._entries:
@@ -252,7 +304,7 @@ class InodeCache(object):
def unmanage(self, obj):
if obj.persisted() and obj.inode in self._entries:
- self._remove(obj, True)
+ self._remove(obj)
def find_by_uuid(self, uuid):
return self._by_uuid.get(uuid, [])
@@ -271,7 +323,6 @@ class Inodes(object):
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
self.encoding = encoding
- self.deferred_invalidations = []
def __getitem__(self, item):
return self._entries[item]
@@ -303,10 +354,6 @@ class Inodes(object):
def del_entry(self, entry):
if entry.ref_count == 0:
self.inode_cache.unmanage(entry)
- del self._entries[entry.inode]
- with llfuse.lock_released:
- entry.finalize()
- entry.inode = None
else:
entry.dead = True
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
@@ -315,13 +362,17 @@ class Inodes(object):
if entry.has_ref(False):
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_inode(entry.inode)
+ with llfuse.lock_released:
+ _logger.debug("sending invalidate %i", entry.inode)
+ llfuse.invalidate_inode(entry.inode)
def invalidate_entry(self, entry, name):
if entry.has_ref(False):
# Only necessary if the kernel has previously done a lookup on this
# inode and hasn't yet forgotten about it.
- llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
+ with llfuse.lock_released:
+ _logger.debug("sending invalidate to inode %i entry %s", entry.inode, name)
+ llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
def clear(self):
self.inode_cache.clear()
@@ -710,6 +761,7 @@ class Operations(llfuse.Operations):
if inode in self.inodes:
p = self.inodes[inode]
else:
+ _logger.warning("arv-mount opendir: called with inode %i but it is missing", inode)
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(p, Directory):
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 8c24467aeb..7c193bbb9f 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -387,7 +387,7 @@ class Mount(object):
e = self.operations.inodes.add_entry(Directory(
llfuse.ROOT_INODE,
self.operations.inodes,
- lambda: self.api.config(),
+ self.api.config(),
self.args.enable_write,
self.args.filters,
))
@@ -472,7 +472,7 @@ From here, the following directories are available:
def _llfuse_main(self):
try:
- llfuse.main(workers=1)
+ llfuse.main(workers=8)
except:
llfuse.close(unmount=False)
raise
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 0671670c9d..899e3abb34 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -67,7 +67,7 @@ class Directory(FreshBase):
def forward_slash_subst(self):
if not hasattr(self, '_fsns'):
self._fsns = None
- config = self.apiconfig()
+ config = self.apiconfig
try:
self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
except KeyError:
@@ -194,13 +194,13 @@ class Directory(FreshBase):
if not name:
continue
if name not in self._entries:
- _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
# create new directory entry
ent = new_entry(i)
if ent is not None:
ent.inc_use()
self._entries[name] = self.inodes.add_entry(ent)
changed = True
+ _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
# delete any other directory entries that were not found in 'items'
for i in oldentries:
@@ -214,7 +214,6 @@ class Directory(FreshBase):
self.inodes.invalidate_inode(self)
self._mtime = time.time()
self.inodes.inode_cache.update_cache_size(self)
- self.inodes.inode_cache.cap_cache()
for ent in self._entries.values():
ent.dec_use()
@@ -248,7 +247,11 @@ class Directory(FreshBase):
def kernel_invalidate(self):
# Invalidating the dentry on the parent implies invalidating all paths
# below it as well.
- parent = self.inodes[self.parent_inode]
+ if self.parent_inode in self.inodes:
+ parent = self.inodes[self.parent_inode]
+ else:
+ # parent was removed already.
+ return
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
@@ -469,12 +472,16 @@ class CollectionDirectoryBase(Directory):
super(CollectionDirectoryBase, self).clear()
self.collection = None
+ def objsize(self):
+ # objsize for the whole thing is represented at the root,
+ # don't double-count it
+ return 0
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters, None, self)
self.api = api
self.num_retries = num_retries
self._poll = True
@@ -652,7 +659,10 @@ class CollectionDirectory(CollectionDirectoryBase):
def finalize(self):
if self.collection is not None:
if self.writable():
- self.collection.save()
+ try:
+ self.collection.save()
+ except Exception as e:
+ _logger.exception("Failed to save collection %s", self.collection_locator)
self.collection.stop_threads()
def clear(self):
@@ -784,7 +794,7 @@ and the directory will appear if it exists.
""".lstrip()
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(MagicDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
@@ -883,7 +893,7 @@ class TagsDirectory(Directory):
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagsDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
self.api = api
self.num_retries = num_retries
self._poll = True
@@ -963,7 +973,7 @@ class TagDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(TagDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
self.api = api
self.num_retries = num_retries
self.tag = tag
@@ -1006,7 +1016,7 @@ class ProjectDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
project_object, poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
@@ -1317,7 +1327,7 @@ class SharedDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
exclude, poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
+ super(SharedDirectory, self).__init__(parent_inode, inodes, api.config(), enable_write, filters)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list