[ARVADOS] updated: d2f68bd1e108c3f2dda2322c427050d019b17e04
git at public.curoverse.com
git at public.curoverse.com
Thu May 7 17:11:59 EDT 2015
Summary of changes:
services/fuse/arvados_fuse/__init__.py | 57 ++++++++++++----------
services/fuse/arvados_fuse/fresh.py | 3 ++
services/fuse/arvados_fuse/fusedir.py | 12 ++---
services/fuse/bin/arv-mount | 12 +++--
services/fuse/setup.py | 2 +-
services/fuse/tests/test_inodes.py | 88 ++++++++++++++++++++++++++++++++++
services/fuse/tests/test_mount.py | 4 +-
7 files changed, 140 insertions(+), 38 deletions(-)
create mode 100644 services/fuse/tests/test_inodes.py
via d2f68bd1e108c3f2dda2322c427050d019b17e04 (commit)
via 0c01dc225cf57ec886b952440abd0764a85c7a67 (commit)
via fa4c5476b4189798b2123c6948d18db43449329e (commit)
from 296e1b20ecf2c283508712332f163c06464b3b5d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d2f68bd1e108c3f2dda2322c427050d019b17e04
Merge: 296e1b2 0c01dc2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu May 7 17:11:51 2015 -0400
Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
Conflicts:
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/tests/test_mount.py
diff --cc services/fuse/arvados_fuse/__init__.py
index b3992ef,2387c69..1b44eaf
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@@ -67,9 -52,13 +67,9 @@@ class DirectoryHandle(Handle)
class InodeCache(object):
- def __init__(self, cap):
+ def __init__(self, cap, min_entries=4):
self._entries = collections.OrderedDict()
+ self._by_uuid = {}
self._counter = itertools.count(1)
self.cap = cap
self._total = 0
@@@ -78,11 -71,8 +82,11 @@@
if clear and not obj.clear():
_logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
return False
- self._total -= obj._cache_size
- del self._entries[obj._cache_priority]
- if obj._cache_uuid:
- del self._by_uuid[obj._cache_uuid]
- obj._cache_uuid = None
+ self._total -= obj.cache_size
+ del self._entries[obj.cache_priority]
++ if obj.cache_uuid:
++ del self._by_uuid[obj.cache_uuid]
++ obj.cache_uuid = None
_logger.debug("Cleared %s total now %i", obj, self._total)
return True
@@@ -96,17 -86,12 +100,17 @@@
def manage(self, obj):
if obj.persisted():
- obj._cache_priority = next(self._counter)
- obj._cache_size = obj.objsize()
- self._entries[obj._cache_priority] = obj
- if obj.uuid():
- obj._cache_uuid = obj.uuid()
- self._by_uuid[obj._cache_uuid] = obj
+ obj.cache_priority = next(self._counter)
+ obj.cache_size = obj.objsize()
+ self._entries[obj.cache_priority] = obj
++ obj.cache_uuid = obj.uuid()
++ if obj.cache_uuid:
++ self._by_uuid[obj.cache_uuid] = obj
self._total += obj.objsize()
_logger.debug("Managing %s total now %i", obj, self._total)
self.cap_cache()
+ else:
- obj._cache_priority = None
++ obj.cache_priority = None
def touch(self, obj):
if obj.persisted():
@@@ -116,12 -101,9 +120,12 @@@
_logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
def unmanage(self, obj):
- if obj.persisted() and obj._cache_priority in self._entries:
+ if obj.persisted() and obj.cache_priority in self._entries:
self._remove(obj, True)
+ def find(self, uuid):
+ return self._by_uuid.get(uuid)
+
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
@@@ -157,29 -139,9 +161,29 @@@
return entry
def del_entry(self, entry):
- self.inode_cache.unmanage(entry)
- llfuse.invalidate_inode(entry.inode)
- del self._entries[entry.inode]
+ if entry.ref_count == 0:
+ _logger.warn("Deleting inode %i", entry.inode)
- self.cache.unmanage(entry)
++ self.inode_cache.unmanage(entry)
+ llfuse.invalidate_inode(entry.inode)
+ del self._entries[entry.inode]
+ else:
+ _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
+ entry.dead = True
+
+def catch_exceptions(orig_func):
+ @functools.wraps(orig_func)
+ def catch_exceptions_wrapper(self, *args, **kwargs):
+ try:
+ return orig_func(self, *args, **kwargs)
+ except llfuse.FUSEError:
+ raise
+ except EnvironmentError as e:
+ raise llfuse.FUSEError(e.errno)
+ except:
+ _logger.exception("Unhandled exception during FUSE operation")
+ raise llfuse.FUSEError(errno.EIO)
+
+ return catch_exceptions_wrapper
class Operations(llfuse.Operations):
@@@ -194,7 -156,7 +198,10 @@@
"""
- def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
- def __init__(self, uid, gid, encoding="utf-8", inode_cache=InodeCache(cap=256*1024*1024)):
++ def __init__(self, uid, gid,
++ encoding="utf-8",
++ inode_cache=InodeCache(cap=256*1024*1024),
++ num_retries=4):
super(Operations, self).__init__()
self.inodes = Inodes(inode_cache)
@@@ -226,25 -180,6 +233,25 @@@
def access(self, inode, mode, ctx):
return True
+ def listen_for_events(self, api_client):
+ self.event = arvados.events.subscribe(api_client,
+ [["event_type", "in", ["create", "update", "delete"]]],
+ self.on_event)
+
+ def on_event(self, ev):
+ if 'event_type' in ev:
+ with llfuse.lock:
- item = self.inodes.cache.find(ev["object_uuid"])
++ item = self.inodes.inode_cache.find(ev["object_uuid"])
+ if item:
+ item.invalidate()
+ item.update()
+
- itemparent = self.inodes.cache.find(ev["object_owner_uuid"])
++ itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
+ if itemparent:
+ itemparent.invalidate()
+ itemparent.update()
+
+ @catch_exceptions
def getattr(self, inode):
if inode not in self.inodes:
raise llfuse.FUSEError(errno.ENOENT)
@@@ -351,35 -269,15 +358,35 @@@
except arvados.errors.NotFoundError as e:
_logger.warning("Block not found: " + str(e))
raise llfuse.FUSEError(errno.EIO)
- except Exception:
- _logger.exception()
- raise llfuse.FUSEError(errno.EIO)
+ @catch_exceptions
+ def write(self, fh, off, buf):
+ _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
+ if fh in self._filehandles:
+ handle = self._filehandles[fh]
+ else:
+ raise llfuse.FUSEError(errno.EBADF)
+
+ if not handle.obj.writable():
+ raise llfuse.FUSEError(errno.EPERM)
+
+ self.inodes.touch(handle.obj)
+
+ with llfuse.lock_released:
+ return handle.obj.writeto(off, buf, self.num_retries)
+
+ @catch_exceptions
def release(self, fh):
if fh in self._filehandles:
+ try:
+ self._filehandles[fh].flush()
+ except EnvironmentError as e:
+ raise llfuse.FUSEError(e.errno)
+ except Exception:
+ _logger.exception("Flush error")
self._filehandles[fh].release()
del self._filehandles[fh]
- self.inodes.cache.cap_cache()
+ self.inodes.inode_cache.cap_cache()
def releasedir(self, fh):
self.release(fh)
diff --cc services/fuse/arvados_fuse/fresh.py
index e185fba,5acadfd..aeb8f73
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@@ -31,8 -31,8 +31,11 @@@ class FreshBase(object)
self._atime = time.time()
self._poll_time = 60
self.use_count = 0
- self.cache_priority = 0
+ self.ref_count = 0
+ self.dead = False
++ self.cache_priority = None
+ self.cache_size = 0
++ self.cache_uuid = None
# Mark the value as stale
def invalidate(self):
diff --cc services/fuse/arvados_fuse/fusedir.py
index d77fba4,77e8dde..898af31
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@@ -276,35 -226,27 +276,35 @@@ class CollectionDirectory(CollectionDir
return True
with llfuse.lock_released:
- coll_reader = arvados.CollectionReader(
- self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
- new_collection_object = coll_reader.api_response() or {}
- # If the Collection only exists in Keep, there will be no API
- # response. Fill in the fields we need.
- if 'uuid' not in new_collection_object:
- new_collection_object['uuid'] = self.collection_locator
- if "portable_data_hash" not in new_collection_object:
- new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
- if 'manifest_text' not in new_collection_object:
- new_collection_object['manifest_text'] = coll_reader.manifest_text()
- coll_reader.normalize()
+ _logger.debug("Updating %s", self.collection_locator)
+ if self.collection:
+ self.collection.update()
+ else:
+ if uuid_pattern.match(self.collection_locator):
+ coll_reader = arvados.collection.Collection(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ else:
+ coll_reader = arvados.collection.CollectionReader(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ new_collection_record = coll_reader.api_response() or {}
+ # If the Collection only exists in Keep, there will be no API
+ # response. Fill in the fields we need.
+ if 'uuid' not in new_collection_record:
+ new_collection_record['uuid'] = self.collection_locator
+ if "portable_data_hash" not in new_collection_record:
+ new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+ if 'manifest_text' not in new_collection_record:
+ new_collection_record['manifest_text'] = coll_reader.manifest_text()
- # end with llfuse.lock_released, re-acquire lock
+
- if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record["portable_data_hash"]:
- self.new_collection(new_collection_record, coll_reader)
++ if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
++ self.new_collection(new_collection_record, coll_reader)
+
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
++ self._manifest_size = len(coll_reader.manifest_text())
++ _logger.debug("%s manifest_size %i", self, self._manifest_size)
+ # end with llfuse.lock_released, re-acquire lock
- if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
- self.new_collection(new_collection_object, coll_reader)
-
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
-
self.fresh()
return True
except arvados.errors.NotFoundError:
diff --cc services/fuse/tests/test_inodes.py
index 0000000,82bb716..2c5f3dc
mode 000000,100644..100644
--- a/services/fuse/tests/test_inodes.py
+++ b/services/fuse/tests/test_inodes.py
@@@ -1,0 -1,87 +1,88 @@@
+ import arvados_fuse
+ import mock
+ import unittest
+
+ class InodeTests(unittest.TestCase):
+ def test_inodes(self):
+ cache = arvados_fuse.InodeCache(1000, 4)
+ inodes = arvados_fuse.Inodes(cache)
+
+ # Check that ent1 gets added to inodes
+ ent1 = mock.MagicMock()
+ ent1.return_value.in_use = False
+ ent1.persisted.return_value = True
+ ent1.clear.return_value = True
+ ent1.objsize.return_value = 500
+ inodes.add_entry(ent1)
+ self.assertIn(ent1.inode, inodes)
+ self.assertIs(inodes[ent1.inode], ent1)
+ self.assertEqual(500, cache.total())
+
+ # ent2 is not persisted, so it doesn't
+ # affect the cache total
+ ent2 = mock.MagicMock()
+ ent2.return_value.in_use = False
+ ent2.persisted.return_value = False
+ ent2.objsize.return_value = 600
+ inodes.add_entry(ent2)
+ self.assertEqual(500, cache.total())
+
+ # ent3 is persisted, adding it should cause ent1 to get cleared
+ ent3 = mock.MagicMock()
+ ent3.return_value.in_use = False
+ ent3.persisted.return_value = True
+ ent3.objsize.return_value = 600
+ ent3.clear.return_value = True
+
+ self.assertFalse(ent1.clear.called)
+ inodes.add_entry(ent3)
+
+ # Won't clear anything because min_entries = 4
+ self.assertEqual(2, len(cache._entries))
+ self.assertFalse(ent1.clear.called)
+ self.assertEqual(1100, cache.total())
+
+ # Change min_entries
+ cache.min_entries = 1
+ cache.cap_cache()
+ self.assertEqual(600, cache.total())
+ self.assertTrue(ent1.clear.called)
+
+ # Touching ent1 should cause ent3 to get cleared
+ self.assertFalse(ent3.clear.called)
+ cache.touch(ent1)
+ self.assertTrue(ent3.clear.called)
+ self.assertEqual(500, cache.total())
+
+ # ent1, ent3 clear return false, can't be cleared
+ ent1.clear.return_value = False
+ ent3.clear.return_value = False
+ ent1.clear.called = False
+ ent3.clear.called = False
+ self.assertFalse(ent1.clear.called)
+ self.assertFalse(ent3.clear.called)
+ cache.touch(ent3)
+ self.assertTrue(ent1.clear.called)
+ self.assertTrue(ent3.clear.called)
+ self.assertEqual(1100, cache.total())
+
+ # ent1 clear return false, so ent3
+ # gets cleared
+ ent1.clear.return_value = False
+ ent3.clear.return_value = True
+ ent1.clear.called = False
+ ent3.clear.called = False
+ self.assertFalse(ent1.clear.called)
+ self.assertFalse(ent3.clear.called)
+ cache.touch(ent3)
+ self.assertTrue(ent1.clear.called)
+ self.assertTrue(ent3.clear.called)
+ self.assertEqual(500, cache.total())
+
+ # Delete ent1
+ ent1.clear.return_value = True
++ ent1.ref_count = 0
+ inodes.del_entry(ent1)
+ self.assertEqual(0, cache.total())
+ cache.touch(ent3)
+ self.assertEqual(600, cache.total())
diff --cc services/fuse/tests/test_mount.py
index 9a17957,5535494..c7d6387
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@@ -28,14 -25,13 +28,14 @@@ class MountTestBase(unittest.TestCase)
self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- self.operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
- operations = fuse.Operations(os.getuid(), os.getgid())
- operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
- llfuse.init(operations, self.mounttmp, [])
++ self.operations = fuse.Operations(os.getuid(), os.getgid())
+ self.operations.inodes.add_entry(root_class(
+ llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
- llfuse.init(self.operations, self.mounttmp, ['debug'])
++ llfuse.init(self.operations, self.mounttmp, [])
threading.Thread(None, llfuse.main).start()
# wait until the driver is finished initializing
- operations.initlock.wait()
+ self.operations.initlock.wait()
+ return self.operations.inodes[llfuse.ROOT_INODE]
def tearDown(self):
# llfuse.close is buggy, so use fusermount instead.
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list