[ARVADOS] created: 416a7c3a1b96abf7982362682048481f2afda0c9

git at public.curoverse.com git at public.curoverse.com
Tue Apr 14 15:10:52 EDT 2015


        at  416a7c3a1b96abf7982362682048481f2afda0c9 (commit)


commit 416a7c3a1b96abf7982362682048481f2afda0c9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Apr 14 15:10:32 2015 -0400

    3198: Manage inode cache based on (approximate) object size instead of object
    count.  It's only a soft limit on memory usage but still a big improvement.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 3129bdf..ce342b5 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -703,7 +703,7 @@ class ArvadosFile(object):
                     # segment is past the trucate size, all done
                     break
                 elif size < range_end:
-                    nr = Range(r.locator, r.range_start, size - r.range_start)
+                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                     nr.segment_offset = r.segment_offset
                     new_segs.append(nr)
                     break
@@ -815,7 +815,7 @@ class ArvadosFile(object):
         """Internal implementation of add_segment."""
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self._segments[-1] if self._segments else Range(0, 0, 0)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
             self._segments.append(r)
 
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3d48652..f03deed 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1381,7 +1381,7 @@ class Collection(RichCollectionBase):
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
-                    blocks.append(Range(tok, streamoffset, blocksize))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3a42aa0..afc202e 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -35,7 +35,7 @@ class StreamReader(object):
             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
             if s:
                 blocksize = long(s.group(1))
-                self._data_locators.append(Range(tok, streamoffset, blocksize))
+                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                 streamoffset += blocksize
                 continue
 
@@ -45,7 +45,7 @@ class StreamReader(object):
                 size = long(s.group(2))
                 name = s.group(3).replace('\\040', ' ')
                 if name not in self._files:
-                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
+                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                 else:
                     filereader = self._files[name]
                     filereader.segments.append(Range(pos, filereader.size(), size))
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 5bb21c6..83b4710 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -61,40 +61,52 @@ class InodeCache(object):
         self._entries = collections.OrderedDict()
         self._counter = itertools.count(1)
         self.cap = cap
+        self._total = 0
+
+    def _remove(self, obj, clear):
+        if clear and not obj.clear():
+            _logger.warn("Could not clear %s in_use %s", obj, obj.in_use())
+            return False
+        self._total -= obj._cache_size
+        del self._entries[obj._cache_priority]
+        _logger.warn("Cleared %s total now %i", obj, self._total)
+        return True
 
     def cap_cache(self):
-        if len(self._entries) > self.cap:
-            ent = iter(self._entries)
-            ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
-            for key in ents:
-                capobj = self._entries[key]
-                if capobj.clear():
-                    _logger.debug("Cleared %s", self._entries[key])
-                    del self._entries[key]
+        _logger.warn("total is %i cap is %i", self._total, self.cap)
+        if self._total > self.cap:
+            need_gc = False
+            for key in list(self._entries.keys()):
+                if self._total < self.cap or len(self._entries) < 4:
+                    break
+                self._remove(self._entries[key], True)
+
 
     def manage(self, obj):
-        obj._cache_priority = next(self._counter)
-        self._entries[obj._cache_priority] = obj
-        _logger.debug("Managing %s", obj)
-        self.cap_cache()
+        if obj.persisted():
+            obj._cache_priority = next(self._counter)
+            obj._cache_size = obj.objsize()
+            self._entries[obj._cache_priority] = obj
+            self._total += obj.objsize()
+            _logger.warn("Managing %s total now %i", obj, self._total)
+            self.cap_cache()
 
     def touch(self, obj):
-        if obj._cache_priority in self._entries:
-            del self._entries[obj._cache_priority]
-        self.manage(obj)
+        if obj.persisted():
+            if obj._cache_priority in self._entries:
+                self._remove(obj, False)
+            self.manage(obj)
+            _logger.warn("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
 
     def unmanage(self, obj):
-        if obj._cache_priority in self._entries:
-            if obj.clear():
-                _logger.debug("Cleared %s", obj)
-                del self._entries[obj._cache_priority]
-
+        if obj.persisted() and obj._cache_priority in self._entries:
+            self._remove(obj, True)
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self, inode_cache=1000):
+    def __init__(self, inode_cache=256*1024*1024):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self._obj_cache = InodeCache(cap=inode_cache)
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 8bb810b..9da3a5c 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -1,6 +1,7 @@
 import time
 import ciso8601
 import calendar
+import functools
 
 def convertTime(t):
     """Parse Arvados timestamp to unix time."""
@@ -11,6 +12,16 @@ def convertTime(t):
     except (TypeError, ValueError):
         return 0
 
+def use_counter(orig_func):
+    @functools.wraps(orig_func)
+    def use_counter_wrapper(self, *args, **kwargs):
+        try:
+            self.inc_use()
+            return orig_func(self, *args, **kwargs)
+        finally:
+            self.dec_use()
+    return use_counter_wrapper
+
 class FreshBase(object):
     """Base class for maintaining fresh/stale state to determine when to update."""
     def __init__(self):
@@ -19,6 +30,7 @@ class FreshBase(object):
         self._last_update = time.time()
         self._atime = time.time()
         self._poll_time = 60
+        self.use_count = 0
 
     # Mark the value as stale
     def invalidate(self):
@@ -38,3 +50,21 @@ class FreshBase(object):
 
     def atime(self):
         return self._atime
+
+    def persisted(self):
+        return False
+
+    def clear(self, force=False):
+        pass
+
+    def in_use(self):
+        return self.use_count > 0
+
+    def inc_use(self):
+        self.use_count += 1
+
+    def dec_use(self):
+        self.use_count -= 1
+
+    def objsize(self):
+        return 0
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index cfa81b3..08a5168 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -7,7 +7,7 @@ import apiclient
 import functools
 
 from fusefile import StringFile, StreamReaderFile, ObjectFile
-from fresh import FreshBase, convertTime
+from fresh import FreshBase, convertTime, use_counter
 
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
@@ -32,16 +32,6 @@ def sanitize_filename(dirty):
     else:
         return _disallowed_filename_characters.sub('_', dirty)
 
-def use_counter(orig_func):
-    @functools.wraps(orig_func)
-    def use_counter_wrapper(self, *args, **kwargs):
-        try:
-            self.inc_use()
-            return orig_func(self, *args, **kwargs)
-        finally:
-            self.dec_use()
-    return use_counter_wrapper
-
 
 class Directory(FreshBase):
     """Generic directory object, backed by a dict.
@@ -61,7 +51,6 @@ class Directory(FreshBase):
         self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
-        self.use_count = 0
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
@@ -74,14 +63,8 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
-    def in_use(self):
-        return self.use_count > 0
-
-    def inc_use(self):
-        self.use_count += 1
-
-    def dec_use(self):
-        self.use_count -= 1
+    def persisted(self):
+        return False
 
     def checkupdate(self):
         if self.stale():
@@ -165,14 +148,12 @@ class Directory(FreshBase):
             oldentries = self._entries
             self._entries = {}
             for n in oldentries:
-                if isinstance(n, Directory):
-                    if not n.clear(force):
-                        self._entries = oldentries
-                        return False
+                if not oldentries[n].clear(force):
+                    self._entries = oldentries
+                    return False
             for n in oldentries:
-                if isinstance(n, Directory):
-                    llfuse.invalidate_entry(self.inode, str(n))
-                    self.inodes.del_entry(oldentries[n])
+                llfuse.invalidate_entry(self.inode, str(n))
+                self.inodes.del_entry(oldentries[n])
             llfuse.invalidate_inode(self.inode)
             self.invalidate()
             return True
@@ -198,6 +179,7 @@ class CollectionDirectory(Directory):
         else:
             self.collection_locator = collection
             self._mtime = 0
+        self._manifest_size = 0
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
@@ -214,6 +196,8 @@ class CollectionDirectory(Directory):
         self.update()
 
     def new_collection(self, new_collection_object, coll_reader):
+        self.clear(force=True)
+
         self.collection_object = new_collection_object
 
         self._mtime = convertTime(self.collection_object.get('modified_at'))
@@ -221,7 +205,6 @@ class CollectionDirectory(Directory):
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
-        self.clear(force=True)
         for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
@@ -229,9 +212,6 @@ class CollectionDirectory(Directory):
                     partname = sanitize_filename(part)
                     if partname not in cwd._entries:
                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
-                        # (hack until using new API)
-                        cwd._entries[partname].inc_use()
-                        # end hack
                     cwd = cwd._entries[partname]
             for k, v in s.files().items():
                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -264,6 +244,9 @@ class CollectionDirectory(Directory):
             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                 self.new_collection(new_collection_object, coll_reader)
 
+            self._manifest_size = len(coll_reader.manifest_text())
+            _logger.debug("%s manifest_size %i", self, self._manifest_size)
+
             self.fresh()
             return True
         except arvados.errors.NotFoundError:
@@ -295,15 +278,15 @@ class CollectionDirectory(Directory):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        super(CollectionDirectory, self).invalidate()
         self.collection_object = None
+        self.collection_object_file = None
+        super(CollectionDirectory, self).invalidate()
 
-    def clear(self, force=False):
-        if self.collection_locator is None:
-            return False
-        else:
-            return super(CollectionDirectory, self).clear(force)
+    def persisted(self):
+        return (self.collection_locator is not None)
 
+    def objsize(self):
+        return self._manifest_size * 128
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -525,6 +508,11 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    def persisted(self):
+        return False
+
+    def objsize(self):
+        return len(self.project_object) * 1024 if self.project_object else 0
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index e122d9d..efe31c3 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -24,14 +24,9 @@ class File(FreshBase):
     def mtime(self):
         return self._mtime
 
-    def clear(self):
-        pass
+    def clear(self, force=False):
+        return True
 
-    def inc_use(self):
-        pass
-
-    def dec_use(self):
-        pass
 
 class StreamReaderFile(File):
     """Wraps a StreamFileReader as a file."""
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 9c0ef10..3c96a56 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -46,7 +46,7 @@ with "--".
     parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
     parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
 
-    parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+    parser.add_argument('--inode-cache', type=int, help="Inode cache size (default 128MiB)", default=128*1024*1024)
 
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),

commit 72cb9197fe4d7a1cc8b822e724d3fc03d77541dd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Apr 13 17:08:42 2015 -0400

    3198: Fixed arv-mount for refactoring.

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index deb0cd3..5bb21c6 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -24,7 +24,7 @@ import ciso8601
 import collections
 
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
-from fusefile import StreamReaderFile
+from fusefile import StreamReaderFile, StringFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -56,7 +56,7 @@ class DirectoryHandle(object):
         self.dirobj.dec_use()
 
 
-class ObjectCache(object):
+class InodeCache(object):
     def __init__(self, cap):
         self._entries = collections.OrderedDict()
         self._counter = itertools.count(1)
@@ -69,11 +69,13 @@ class ObjectCache(object):
             for key in ents:
                 capobj = self._entries[key]
                 if capobj.clear():
+                    _logger.debug("Cleared %s", self._entries[key])
                     del self._entries[key]
 
     def manage(self, obj):
         obj._cache_priority = next(self._counter)
         self._entries[obj._cache_priority] = obj
+        _logger.debug("Managing %s", obj)
         self.cap_cache()
 
     def touch(self, obj):
@@ -84,6 +86,7 @@ class ObjectCache(object):
     def unmanage(self, obj):
         if obj._cache_priority in self._entries:
             if obj.clear():
+                _logger.debug("Cleared %s", obj)
                 del self._entries[obj._cache_priority]
 
 
@@ -91,10 +94,10 @@ class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self, cache_cap=1000):
+    def __init__(self, inode_cache=1000):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
-        self._obj_cache = ObjectCache(cap=cache_cap)
+        self._obj_cache = InodeCache(cap=inode_cache)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -112,6 +115,7 @@ class Inodes(object):
         return k in self._entries
 
     def touch(self, entry):
+        entry._atime = time.time()
         self._obj_cache.touch(entry)
 
     def cap_cache(self):
@@ -141,10 +145,10 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
+    def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes(cache_cap)
+        self.inodes = Inodes(inode_cache)
         self.uid = uid
         self.gid = gid
         self.encoding = encoding
@@ -236,6 +240,7 @@ class Operations(llfuse.Operations):
         fh = self._filehandles_counter
         self._filehandles_counter += 1
         self._filehandles[fh] = FileHandle(fh, p)
+        self.inodes.touch(p)
         return fh
 
     def read(self, fh, off, size):
@@ -245,8 +250,7 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        # update atime
-        handle.fileobj._atime = time.time()
+        self.inodes.touch(handle.fileobj)
 
         try:
             with llfuse.lock_released:
@@ -286,14 +290,10 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.EIO)
 
         # update atime
-        p._atime = time.time()
+        self.inodes.touch(p)
 
-        try:
-            p.inc_use()
-            self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
-            return fh
-        finally:
-            p.dec_use()
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+        return fh
 
 
     def readdir(self, fh, off):
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index b540efe..9c0ef10 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -45,6 +45,9 @@ with "--".
     parser.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
     parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
     parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
+
+    parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),
                         help="""Mount, run a command, then unmount and exit""")
@@ -81,7 +84,7 @@ with "--".
 
     try:
         # Create the request handler
-        operations = Operations(os.getuid(), os.getgid(), args.encoding)
+        operations = Operations(os.getuid(), os.getgid(), args.encoding, args.inode_cache)
         api = ThreadSafeApiCache(arvados.config.settings())
 
         usr = api.users().current().execute(num_retries=args.retries)
@@ -112,7 +115,7 @@ with "--".
         if dir_class is not None:
             operations.inodes.add_entry(dir_class(*dir_args))
         else:
-            e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+            e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE, operations.inodes))
             dir_args[0] = e.inode
 
             e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 4802d69..ecc0888 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
+        operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
         operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(operations, self.mounttmp, [])

commit d8d82841f43394b3781804844d4860bc8205d5fc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Apr 13 16:42:46 2015 -0400

    3198: Implement cache management for directory objects.

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a886e37..deb0cd3 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -21,6 +21,7 @@ import calendar
 import threading
 import itertools
 import ciso8601
+import collections
 
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
 from fusefile import StreamReaderFile
@@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 
 
 class FileHandle(object):
-    """Connects a numeric file handle to a File or Directory object that has
+    """Connects a numeric file handle to a File object that has
     been opened by the client."""
 
-    def __init__(self, fh, entry):
+    def __init__(self, fh, fileobj):
         self.fh = fh
-        self.entry = entry
+        self.fileobj = fileobj
+        self.fileobj.inc_use()
+
+    def release(self):
+        self.fileobj.dec_use()
+
+
+class DirectoryHandle(object):
+    """Connects a numeric file handle to a Directory object that has
+    been opened by the client."""
+
+    def __init__(self, fh, dirobj, entries):
+        self.fh = fh
+        self.entries = entries
+        self.dirobj = dirobj
+        self.dirobj.inc_use()
+
+    def release(self):
+        self.dirobj.dec_use()
+
+
+class ObjectCache(object):
+    def __init__(self, cap):
+        self._entries = collections.OrderedDict()
+        self._counter = itertools.count(1)
+        self.cap = cap
+
+    def cap_cache(self):
+        if len(self._entries) > self.cap:
+            ent = iter(self._entries)
+            ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
+            for key in ents:
+                capobj = self._entries[key]
+                if capobj.clear():
+                    del self._entries[key]
+
+    def manage(self, obj):
+        obj._cache_priority = next(self._counter)
+        self._entries[obj._cache_priority] = obj
+        self.cap_cache()
+
+    def touch(self, obj):
+        if obj._cache_priority in self._entries:
+            del self._entries[obj._cache_priority]
+        self.manage(obj)
+
+    def unmanage(self, obj):
+        if obj._cache_priority in self._entries:
+            if obj.clear():
+                del self._entries[obj._cache_priority]
 
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self):
+    def __init__(self, cache_cap=1000):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
+        self._obj_cache = ObjectCache(cap=cache_cap)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -60,12 +111,20 @@ class Inodes(object):
     def __contains__(self, k):
         return k in self._entries
 
+    def touch(self, entry):
+        self._obj_cache.touch(entry)
+
+    def cap_cache(self):
+        self._obj_cache.cap_cache()
+
     def add_entry(self, entry):
         entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
+        self._obj_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
+        self._obj_cache.unmanage(entry)
         llfuse.invalidate_inode(entry.inode)
         del self._entries[entry.inode]
 
@@ -82,10 +141,10 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8"):
+    def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes()
+        self.inodes = Inodes(cache_cap)
         self.uid = uid
         self.gid = gid
         self.encoding = encoding
@@ -187,11 +246,11 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.EBADF)
 
         # update atime
-        handle.entry._atime = time.time()
+        handle.fileobj._atime = time.time()
 
         try:
             with llfuse.lock_released:
-                return handle.entry.readfrom(off, size)
+                return handle.fileobj.readfrom(off, size)
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -201,7 +260,12 @@ class Operations(llfuse.Operations):
 
     def release(self, fh):
         if fh in self._filehandles:
+            self._filehandles[fh].release()
             del self._filehandles[fh]
+        self.inodes.cap_cache()
+
+    def releasedir(self, fh):
+        self.release(fh)
 
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
@@ -224,8 +288,13 @@ class Operations(llfuse.Operations):
         # update atime
         p._atime = time.time()
 
-        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
-        return fh
+        try:
+            p.inc_use()
+            self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+            return fh
+        finally:
+            p.dec_use()
+
 
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -235,20 +304,17 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.entry %s", handle.entry)
+        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
 
         e = off
-        while e < len(handle.entry):
-            if handle.entry[e][1].inode in self.inodes:
+        while e < len(handle.entries):
+            if handle.entries[e][1].inode in self.inodes:
                 try:
-                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
                 except UnicodeEncodeError:
                     pass
             e += 1
 
-    def releasedir(self, fh):
-        del self._filehandles[fh]
-
     def statfs(self):
         st = llfuse.StatvfsData()
         st.f_bsize = 64 * 1024
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 1144622..cfa81b3 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -4,6 +4,7 @@ import time
 import llfuse
 import arvados
 import apiclient
+import functools
 
 from fusefile import StringFile, StreamReaderFile, ObjectFile
 from fresh import FreshBase, convertTime
@@ -31,6 +32,16 @@ def sanitize_filename(dirty):
     else:
         return _disallowed_filename_characters.sub('_', dirty)
 
+def use_counter(orig_func):
+    @functools.wraps(orig_func)
+    def use_counter_wrapper(self, *args, **kwargs):
+        try:
+            self.inc_use()
+            return orig_func(self, *args, **kwargs)
+        finally:
+            self.dec_use()
+    return use_counter_wrapper
+
 
 class Directory(FreshBase):
     """Generic directory object, backed by a dict.
@@ -39,7 +50,7 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode):
+    def __init__(self, parent_inode, inodes):
         super(Directory, self).__init__()
 
         """parent_inode is the integer inode number"""
@@ -47,11 +58,14 @@ class Directory(FreshBase):
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
+        self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
+        self.use_count = 0
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
+    @use_counter
     def update(self):
         pass
 
@@ -60,6 +74,15 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
+    def in_use(self):
+        return self.use_count > 0
+
+    def inc_use(self):
+        self.use_count += 1
+
+    def dec_use(self):
+        self.use_count -= 1
+
     def checkupdate(self):
         if self.stale():
             try:
@@ -67,22 +90,25 @@ class Directory(FreshBase):
             except apiclient.errors.HttpError as e:
                 _logger.debug(e)
 
+    @use_counter
     def __getitem__(self, item):
         self.checkupdate()
         return self._entries[item]
 
+    @use_counter
     def items(self):
         self.checkupdate()
-        return self._entries.items()
-
-    def __iter__(self):
-        self.checkupdate()
-        return self._entries.iterkeys()
+        return list(self._entries.items())
 
+    @use_counter
     def __contains__(self, k):
         self.checkupdate()
         return k in self._entries
 
+    def fresh(self):
+        self.inodes.touch(self)
+        super(Directory, self).fresh()
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -132,17 +158,26 @@ class Directory(FreshBase):
 
         self.fresh()
 
-    def clear(self):
+    def clear(self, force=False):
         """Delete all entries"""
-        oldentries = self._entries
-        self._entries = {}
-        for n in oldentries:
-            if isinstance(n, Directory):
-                n.clear()
-            llfuse.invalidate_entry(self.inode, str(n))
-            self.inodes.del_entry(oldentries[n])
-        llfuse.invalidate_inode(self.inode)
-        self.invalidate()
+
+        if not self.in_use() or force:
+            oldentries = self._entries
+            self._entries = {}
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    if not n.clear(force):
+                        self._entries = oldentries
+                        return False
+            for n in oldentries:
+                if isinstance(n, Directory):
+                    llfuse.invalidate_entry(self.inode, str(n))
+                    self.inodes.del_entry(oldentries[n])
+            llfuse.invalidate_inode(self.inode)
+            self.invalidate()
+            return True
+        else:
+            return False
 
     def mtime(self):
         return self._mtime
@@ -152,8 +187,7 @@ class CollectionDirectory(Directory):
     """Represents the root of a directory tree holding a collection."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(CollectionDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.collection_object_file = None
@@ -187,14 +221,17 @@ class CollectionDirectory(Directory):
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
-        self.clear()
+        self.clear(force=True)
         for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
                 if part != '' and part != '.':
                     partname = sanitize_filename(part)
                     if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
+                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
+                        # (hack until using new API)
+                        cwd._entries[partname].inc_use()
+                        # end hack
                     cwd = cwd._entries[partname]
             for k, v in s.files().items():
                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -257,6 +294,16 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
+    def invalidate(self):
+        super(CollectionDirectory, self).invalidate()
+        self.collection_object = None
+
+    def clear(self, force=False):
+        if self.collection_locator is None:
+            return False
+        else:
+            return super(CollectionDirectory, self).clear(force)
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -281,8 +328,7 @@ will appear if it exists.
 """.lstrip()
 
     def __init__(self, parent_inode, inodes, api, num_retries):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
 
@@ -308,6 +354,7 @@ will appear if it exists.
         try:
             e = self.inodes.add_entry(CollectionDirectory(
                     self.inode, self.inodes, self.api, self.num_retries, k))
+
             if e.update():
                 self._entries[k] = e
                 return True
@@ -323,28 +370,25 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
+    def clear(self, force=False):
+        pass
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
-        if self.inode == llfuse.ROOT_INODE:
-            llfuse.lock.acquire()
         try:
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
         except Exception:
             _logger.exception()
-        finally:
-            if self.inode == llfuse.ROOT_INODE:
-                llfuse.lock.release()
 
 
 class TagsDirectory(RecursiveInvalidateDirectory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagsDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
@@ -370,8 +414,7 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
@@ -397,8 +440,7 @@ class ProjectDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                  poll=False, poll_time=60):
-        super(ProjectDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(ProjectDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
@@ -462,11 +504,6 @@ class ProjectDirectory(Directory):
 
             contents = arvados.util.list_all(self.api.groups().contents,
                                              self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -494,8 +531,7 @@ class SharedDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                  poll=False, poll_time=60):
-        super(SharedDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(SharedDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 6dd2d8a..e122d9d 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -24,6 +24,14 @@ class File(FreshBase):
     def mtime(self):
         return self._mtime
 
+    def clear(self):
+        pass
+
+    def inc_use(self):
+        pass
+
+    def dec_use(self):
+        pass
 
 class StreamReaderFile(File):
     """Wraps a StreamFileReader as a file."""
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 764a099..4802d69 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid())
+        operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
         operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(operations, self.mounttmp, [])
@@ -243,8 +243,7 @@ class FuseSharedTest(MountTestBase):
         # directory)
         fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
         fuse_user_objs.sort()
-        self.assertEqual(['Empty collection.link',                # permission link on collection
-                          'FUSE Test Project',                    # project owned by user
+        self.assertEqual(['FUSE Test Project',                    # project owned by user
                           'collection #1 owned by FUSE',          # collection owned by user
                           'collection #2 owned by FUSE',          # collection owned by user
                           'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list