[ARVADOS] updated: d2f68bd1e108c3f2dda2322c427050d019b17e04

git at public.curoverse.com git at public.curoverse.com
Thu May 7 17:11:59 EDT 2015


Summary of changes:
 services/fuse/arvados_fuse/__init__.py | 57 ++++++++++++----------
 services/fuse/arvados_fuse/fresh.py    |  3 ++
 services/fuse/arvados_fuse/fusedir.py  | 12 ++---
 services/fuse/bin/arv-mount            | 12 +++--
 services/fuse/setup.py                 |  2 +-
 services/fuse/tests/test_inodes.py     | 88 ++++++++++++++++++++++++++++++++++
 services/fuse/tests/test_mount.py      |  4 +-
 7 files changed, 140 insertions(+), 38 deletions(-)
 create mode 100644 services/fuse/tests/test_inodes.py

       via  d2f68bd1e108c3f2dda2322c427050d019b17e04 (commit)
       via  0c01dc225cf57ec886b952440abd0764a85c7a67 (commit)
       via  fa4c5476b4189798b2123c6948d18db43449329e (commit)
      from  296e1b20ecf2c283508712332f163c06464b3b5d (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit d2f68bd1e108c3f2dda2322c427050d019b17e04
Merge: 296e1b2 0c01dc2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 7 17:11:51 2015 -0400

    Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
    
    Conflicts:
    	services/fuse/arvados_fuse/__init__.py
    	services/fuse/arvados_fuse/fresh.py
    	services/fuse/tests/test_mount.py

diff --cc services/fuse/arvados_fuse/__init__.py
index b3992ef,2387c69..1b44eaf
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@@ -67,9 -52,13 +67,9 @@@ class DirectoryHandle(Handle)
  
  
  class InodeCache(object):
-     def __init__(self, cap):
+     def __init__(self, cap, min_entries=4):
          self._entries = collections.OrderedDict()
 +        self._by_uuid = {}
          self._counter = itertools.count(1)
          self.cap = cap
          self._total = 0
@@@ -78,11 -71,8 +82,11 @@@
          if clear and not obj.clear():
              _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
              return False
-         self._total -= obj._cache_size
-         del self._entries[obj._cache_priority]
-         if obj._cache_uuid:
-             del self._by_uuid[obj._cache_uuid]
-             obj._cache_uuid = None
+         self._total -= obj.cache_size
+         del self._entries[obj.cache_priority]
++        if obj.cache_uuid:
++            del self._by_uuid[obj.cache_uuid]
++            obj.cache_uuid = None
          _logger.debug("Cleared %s total now %i", obj, self._total)
          return True
  
@@@ -96,17 -86,12 +100,17 @@@
  
      def manage(self, obj):
          if obj.persisted():
-             obj._cache_priority = next(self._counter)
-             obj._cache_size = obj.objsize()
-             self._entries[obj._cache_priority] = obj
-             if obj.uuid():
-                 obj._cache_uuid = obj.uuid()
-                 self._by_uuid[obj._cache_uuid] = obj
+             obj.cache_priority = next(self._counter)
+             obj.cache_size = obj.objsize()
+             self._entries[obj.cache_priority] = obj
++            obj.cache_uuid = obj.uuid()
++            if obj.cache_uuid:
++                self._by_uuid[obj.cache_uuid] = obj
              self._total += obj.objsize()
              _logger.debug("Managing %s total now %i", obj, self._total)
              self.cap_cache()
 +        else:
-             obj._cache_priority = None
++            obj.cache_priority = None
  
      def touch(self, obj):
          if obj.persisted():
@@@ -116,12 -101,9 +120,12 @@@
              _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
  
      def unmanage(self, obj):
-         if obj.persisted() and obj._cache_priority in self._entries:
+         if obj.persisted() and obj.cache_priority in self._entries:
              self._remove(obj, True)
  
 +    def find(self, uuid):
 +        return self._by_uuid.get(uuid)
 +
  class Inodes(object):
      """Manage the set of inodes.  This is the mapping from a numeric id
      to a concrete File or Directory object"""
@@@ -157,29 -139,9 +161,29 @@@
          return entry
  
      def del_entry(self, entry):
 -        self.inode_cache.unmanage(entry)
 -        llfuse.invalidate_inode(entry.inode)
 -        del self._entries[entry.inode]
 +        if entry.ref_count == 0:
 +            _logger.warn("Deleting inode %i", entry.inode)
-             self.cache.unmanage(entry)
++            self.inode_cache.unmanage(entry)
 +            llfuse.invalidate_inode(entry.inode)
 +            del self._entries[entry.inode]
 +        else:
 +            _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
 +            entry.dead = True
 +
 +def catch_exceptions(orig_func):
 +    @functools.wraps(orig_func)
 +    def catch_exceptions_wrapper(self, *args, **kwargs):
 +        try:
 +            return orig_func(self, *args, **kwargs)
 +        except llfuse.FUSEError:
 +            raise
 +        except EnvironmentError as e:
 +            raise llfuse.FUSEError(e.errno)
 +        except:
 +            _logger.exception("Unhandled exception during FUSE operation")
 +            raise llfuse.FUSEError(errno.EIO)
 +
 +    return catch_exceptions_wrapper
  
  
  class Operations(llfuse.Operations):
@@@ -194,7 -156,7 +198,10 @@@
  
      """
  
-     def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
 -    def __init__(self, uid, gid, encoding="utf-8", inode_cache=InodeCache(cap=256*1024*1024)):
++    def __init__(self, uid, gid,
++                 encoding="utf-8",
++                 inode_cache=InodeCache(cap=256*1024*1024),
++                 num_retries=4):
          super(Operations, self).__init__()
  
          self.inodes = Inodes(inode_cache)
@@@ -226,25 -180,6 +233,25 @@@
      def access(self, inode, mode, ctx):
          return True
  
 +    def listen_for_events(self, api_client):
 +        self.event = arvados.events.subscribe(api_client,
 +                                 [["event_type", "in", ["create", "update", "delete"]]],
 +                                 self.on_event)
 +
 +    def on_event(self, ev):
 +        if 'event_type' in ev:
 +            with llfuse.lock:
-                 item = self.inodes.cache.find(ev["object_uuid"])
++                item = self.inodes.inode_cache.find(ev["object_uuid"])
 +                if item:
 +                    item.invalidate()
 +                    item.update()
 +
-                 itemparent = self.inodes.cache.find(ev["object_owner_uuid"])
++                itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
 +                if itemparent:
 +                    itemparent.invalidate()
 +                    itemparent.update()
 +
 +    @catch_exceptions
      def getattr(self, inode):
          if inode not in self.inodes:
              raise llfuse.FUSEError(errno.ENOENT)
@@@ -351,35 -269,15 +358,35 @@@
          except arvados.errors.NotFoundError as e:
              _logger.warning("Block not found: " + str(e))
              raise llfuse.FUSEError(errno.EIO)
 -        except Exception:
 -            _logger.exception()
 -            raise llfuse.FUSEError(errno.EIO)
  
 +    @catch_exceptions
 +    def write(self, fh, off, buf):
 +        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
 +        if fh in self._filehandles:
 +            handle = self._filehandles[fh]
 +        else:
 +            raise llfuse.FUSEError(errno.EBADF)
 +
 +        if not handle.obj.writable():
 +            raise llfuse.FUSEError(errno.EPERM)
 +
 +        self.inodes.touch(handle.obj)
 +
 +        with llfuse.lock_released:
 +            return handle.obj.writeto(off, buf, self.num_retries)
 +
 +    @catch_exceptions
      def release(self, fh):
          if fh in self._filehandles:
 +            try:
 +                self._filehandles[fh].flush()
 +            except EnvironmentError as e:
 +                raise llfuse.FUSEError(e.errno)
 +            except Exception:
 +                _logger.exception("Flush error")
              self._filehandles[fh].release()
              del self._filehandles[fh]
-         self.inodes.cache.cap_cache()
+         self.inodes.inode_cache.cap_cache()
  
      def releasedir(self, fh):
          self.release(fh)
diff --cc services/fuse/arvados_fuse/fresh.py
index e185fba,5acadfd..aeb8f73
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@@ -31,8 -31,8 +31,11 @@@ class FreshBase(object)
          self._atime = time.time()
          self._poll_time = 60
          self.use_count = 0
 -        self.cache_priority = 0
 +        self.ref_count = 0
 +        self.dead = False
++        self.cache_priority = None
+         self.cache_size = 0
++        self.cache_uuid = None
  
      # Mark the value as stale
      def invalidate(self):
diff --cc services/fuse/arvados_fuse/fusedir.py
index d77fba4,77e8dde..898af31
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@@ -276,35 -226,27 +276,35 @@@ class CollectionDirectory(CollectionDir
                  return True
  
              with llfuse.lock_released:
 -                coll_reader = arvados.CollectionReader(
 -                    self.collection_locator, self.api, self.api.keep,
 -                    num_retries=self.num_retries)
 -                new_collection_object = coll_reader.api_response() or {}
 -                # If the Collection only exists in Keep, there will be no API
 -                # response.  Fill in the fields we need.
 -                if 'uuid' not in new_collection_object:
 -                    new_collection_object['uuid'] = self.collection_locator
 -                if "portable_data_hash" not in new_collection_object:
 -                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
 -                if 'manifest_text' not in new_collection_object:
 -                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
 -                coll_reader.normalize()
 +                _logger.debug("Updating %s", self.collection_locator)
 +                if self.collection:
 +                    self.collection.update()
 +                else:
 +                    if uuid_pattern.match(self.collection_locator):
 +                        coll_reader = arvados.collection.Collection(
 +                            self.collection_locator, self.api, self.api.keep,
 +                            num_retries=self.num_retries)
 +                    else:
 +                        coll_reader = arvados.collection.CollectionReader(
 +                            self.collection_locator, self.api, self.api.keep,
 +                            num_retries=self.num_retries)
 +                    new_collection_record = coll_reader.api_response() or {}
 +                    # If the Collection only exists in Keep, there will be no API
 +                    # response.  Fill in the fields we need.
 +                    if 'uuid' not in new_collection_record:
 +                        new_collection_record['uuid'] = self.collection_locator
 +                    if "portable_data_hash" not in new_collection_record:
 +                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
 +                    if 'manifest_text' not in new_collection_record:
 +                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
-             # end with llfuse.lock_released, re-acquire lock
 +
-             if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record["portable_data_hash"]:
-                 self.new_collection(new_collection_record, coll_reader)
++                    if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
++                        self.new_collection(new_collection_record, coll_reader)
 +
-             self._manifest_size = len(coll_reader.manifest_text())
-             _logger.debug("%s manifest_size %i", self, self._manifest_size)
++                    self._manifest_size = len(coll_reader.manifest_text())
++                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
+             # end with llfuse.lock_released, re-acquire lock
  
 -            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
 -                self.new_collection(new_collection_object, coll_reader)
 -
 -            self._manifest_size = len(coll_reader.manifest_text())
 -            _logger.debug("%s manifest_size %i", self, self._manifest_size)
 -
              self.fresh()
              return True
          except arvados.errors.NotFoundError:
diff --cc services/fuse/tests/test_inodes.py
index 0000000,82bb716..2c5f3dc
mode 000000,100644..100644
--- a/services/fuse/tests/test_inodes.py
+++ b/services/fuse/tests/test_inodes.py
@@@ -1,0 -1,87 +1,88 @@@
+ import arvados_fuse
+ import mock
+ import unittest
+ 
+ class InodeTests(unittest.TestCase):
+     def test_inodes(self):
+         cache = arvados_fuse.InodeCache(1000, 4)
+         inodes = arvados_fuse.Inodes(cache)
+ 
+         # Check that ent1 gets added to inodes
+         ent1 = mock.MagicMock()
+         ent1.return_value.in_use = False
+         ent1.persisted.return_value = True
+         ent1.clear.return_value = True
+         ent1.objsize.return_value = 500
+         inodes.add_entry(ent1)
+         self.assertIn(ent1.inode, inodes)
+         self.assertIs(inodes[ent1.inode], ent1)
+         self.assertEqual(500, cache.total())
+ 
+         # ent2 is not persisted, so it doesn't
+         # affect the cache total
+         ent2 = mock.MagicMock()
+         ent2.return_value.in_use = False
+         ent2.persisted.return_value = False
+         ent2.objsize.return_value = 600
+         inodes.add_entry(ent2)
+         self.assertEqual(500, cache.total())
+ 
+         # ent3 is persisted, adding it should cause ent1 to get cleared
+         ent3 = mock.MagicMock()
+         ent3.return_value.in_use = False
+         ent3.persisted.return_value = True
+         ent3.objsize.return_value = 600
+         ent3.clear.return_value = True
+ 
+         self.assertFalse(ent1.clear.called)
+         inodes.add_entry(ent3)
+ 
+         # Won't clear anything because min_entries = 4
+         self.assertEqual(2, len(cache._entries))
+         self.assertFalse(ent1.clear.called)
+         self.assertEqual(1100, cache.total())
+ 
+         # Change min_entries
+         cache.min_entries = 1
+         cache.cap_cache()
+         self.assertEqual(600, cache.total())
+         self.assertTrue(ent1.clear.called)
+ 
+         # Touching ent1 should cause ent3 to get cleared
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent1)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(500, cache.total())
+ 
+         # ent1, ent3 clear return false, can't be cleared
+         ent1.clear.return_value = False
+         ent3.clear.return_value = False
+         ent1.clear.called = False
+         ent3.clear.called = False
+         self.assertFalse(ent1.clear.called)
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent3)
+         self.assertTrue(ent1.clear.called)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(1100, cache.total())
+ 
+         # ent1 clear return false, so ent3
+         # gets cleared
+         ent1.clear.return_value = False
+         ent3.clear.return_value = True
+         ent1.clear.called = False
+         ent3.clear.called = False
+         self.assertFalse(ent1.clear.called)
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent3)
+         self.assertTrue(ent1.clear.called)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(500, cache.total())
+ 
+         # Delete ent1
+         ent1.clear.return_value = True
++        ent1.ref_count = 0
+         inodes.del_entry(ent1)
+         self.assertEqual(0, cache.total())
+         cache.touch(ent3)
+         self.assertEqual(600, cache.total())
diff --cc services/fuse/tests/test_mount.py
index 9a17957,5535494..c7d6387
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@@ -28,14 -25,13 +28,14 @@@ class MountTestBase(unittest.TestCase)
          self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
  
      def make_mount(self, root_class, **root_kwargs):
-         self.operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
 -        operations = fuse.Operations(os.getuid(), os.getgid())
 -        operations.inodes.add_entry(root_class(
 -            llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
 -        llfuse.init(operations, self.mounttmp, [])
++        self.operations = fuse.Operations(os.getuid(), os.getgid())
 +        self.operations.inodes.add_entry(root_class(
 +            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
-         llfuse.init(self.operations, self.mounttmp, ['debug'])
++        llfuse.init(self.operations, self.mounttmp, [])
          threading.Thread(None, llfuse.main).start()
          # wait until the driver is finished initializing
 -        operations.initlock.wait()
 +        self.operations.initlock.wait()
 +        return self.operations.inodes[llfuse.ROOT_INODE]
  
      def tearDown(self):
          # llfuse.close is buggy, so use fusermount instead.

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list