[ARVADOS] updated: 296e1b20ecf2c283508712332f163c06464b3b5d
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 21 17:12:12 EDT 2015
Summary of changes:
sdk/python/arvados/collection.py | 2 +-
services/fuse/arvados_fuse/__init__.py | 50 +++++++++++++++----
services/fuse/arvados_fuse/fresh.py | 3 ++
services/fuse/arvados_fuse/fusedir.py | 88 +++++++++++++++++++---------------
services/fuse/tests/test_mount.py | 74 ++++++++++++++++------------
5 files changed, 136 insertions(+), 81 deletions(-)
via 296e1b20ecf2c283508712332f163c06464b3b5d (commit)
from f69d2824c997c53caa11d30ba816768bad52e12b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 296e1b20ecf2c283508712332f163c06464b3b5d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Apr 21 17:12:07 2015 -0400
3198: Support for listening for events to trigger collection updates. TODO: add to arv-mount
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index c90f7b3..89cbda9 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1283,7 +1283,7 @@ class Collection(RichCollectionBase):
def __exit__(self, exc_type, exc_value, traceback):
"""Support scoped auto-commit in a with: block."""
- if exc_type is not None:
+ if exc_type is None:
if self.writable() and self._has_collection_uuid():
self.save()
if self._block_manager is not None:
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index e140672..b3992ef 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -69,6 +69,7 @@ class DirectoryHandle(Handle):
class InodeCache(object):
def __init__(self, cap):
self._entries = collections.OrderedDict()
+ self._by_uuid = {}
self._counter = itertools.count(1)
self.cap = cap
self._total = 0
@@ -79,27 +80,33 @@ class InodeCache(object):
return False
self._total -= obj._cache_size
del self._entries[obj._cache_priority]
+ if obj._cache_uuid:
+ del self._by_uuid[obj._cache_uuid]
+ obj._cache_uuid = None
_logger.debug("Cleared %s total now %i", obj, self._total)
return True
def cap_cache(self):
_logger.debug("total is %i cap is %i", self._total, self.cap)
if self._total > self.cap:
- need_gc = False
for key in list(self._entries.keys()):
if self._total < self.cap or len(self._entries) < 4:
break
self._remove(self._entries[key], True)
-
def manage(self, obj):
if obj.persisted():
obj._cache_priority = next(self._counter)
obj._cache_size = obj.objsize()
self._entries[obj._cache_priority] = obj
+ if obj.uuid():
+ obj._cache_uuid = obj.uuid()
+ self._by_uuid[obj._cache_uuid] = obj
self._total += obj.objsize()
_logger.debug("Managing %s total now %i", obj, self._total)
self.cap_cache()
+ else:
+ obj._cache_priority = None
def touch(self, obj):
if obj.persisted():
@@ -112,6 +119,8 @@ class InodeCache(object):
if obj.persisted() and obj._cache_priority in self._entries:
self._remove(obj, True)
+ def find(self, uuid):
+ return self._by_uuid.get(uuid)
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
@@ -120,7 +129,7 @@ class Inodes(object):
def __init__(self, inode_cache=256*1024*1024):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
- self._obj_cache = InodeCache(cap=inode_cache)
+ self.cache = InodeCache(cap=inode_cache)
def __getitem__(self, item):
return self._entries[item]
@@ -139,21 +148,18 @@ class Inodes(object):
def touch(self, entry):
entry._atime = time.time()
- self._obj_cache.touch(entry)
-
- def cap_cache(self):
- self._obj_cache.cap_cache()
+ self.cache.touch(entry)
def add_entry(self, entry):
entry.inode = next(self._counter)
self._entries[entry.inode] = entry
- self._obj_cache.manage(entry)
+ self.cache.manage(entry)
return entry
def del_entry(self, entry):
if entry.ref_count == 0:
_logger.warn("Deleting inode %i", entry.inode)
- self._obj_cache.unmanage(entry)
+ self.cache.unmanage(entry)
llfuse.invalidate_inode(entry.inode)
del self._entries[entry.inode]
else:
@@ -206,14 +212,38 @@ class Operations(llfuse.Operations):
self.num_retries = num_retries
+ self.events = None
+
def init(self):
# Allow threads that are waiting for the driver to be finished
# initializing to continue
self.initlock.set()
+ def destroy(self):
+ if self.events:
+ self.events.close()
+
def access(self, inode, mode, ctx):
return True
+    def listen_for_events(self, api_client):
+        """Subscribe to API server create/update/delete events.
+
+        Each event is delivered to on_event().  The subscription is stored
+        in self.events (initialized to None in __init__) so that destroy()
+        can close it; storing it under any other name leaks the subscription.
+        """
+        self.events = arvados.events.subscribe(api_client,
+                                               [["event_type", "in", ["create", "update", "delete"]]],
+                                               self.on_event)
+
+    def on_event(self, ev):
+        """Refresh cached inodes affected by an event-bus message.
+
+        If the changed object (object_uuid) or its owner (object_owner_uuid)
+        is currently in the inode cache, it is invalidated and reloaded while
+        holding llfuse.lock.  Messages without an event_type are ignored.
+        """
+        if 'event_type' in ev:
+            with llfuse.lock:
+                # .get(): a message missing either key should be skipped, not
+                # raise KeyError inside the event callback.  find(None) just
+                # returns None.
+                item = self.inodes.cache.find(ev.get("object_uuid"))
+                # find() returns an object or None; compare identity because
+                # directory objects are container-like and may be falsy when
+                # empty (if they define __len__ -- NOTE(review): confirm).
+                if item is not None:
+                    item.invalidate()
+                    item.update()
+
+                itemparent = self.inodes.cache.find(ev.get("object_owner_uuid"))
+                if itemparent is not None:
+                    itemparent.invalidate()
+                    itemparent.update()
+
@catch_exceptions
def getattr(self, inode):
if inode not in self.inodes:
@@ -349,7 +379,7 @@ class Operations(llfuse.Operations):
_logger.exception("Flush error")
self._filehandles[fh].release()
del self._filehandles[fh]
- self.inodes.cap_cache()
+ self.inodes.cache.cap_cache()
def releasedir(self, fh):
self.release(fh)
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 08ffb88..e185fba 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -78,3 +78,6 @@ class FreshBase(object):
def objsize(self):
return 0
+
+ def uuid(self):
+ return None
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index fd9fc72..d77fba4 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -219,8 +219,8 @@ class CollectionDirectory(CollectionDirectoryBase):
super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
self.api = api
self.num_retries = num_retries
- self.collection_object_file = None
- self.collection_object = None
+ self.collection_record_file = None
+ self.collection_record = None
if isinstance(collection_record, dict):
self.collection_locator = collection_record['uuid']
self._mtime = convertTime(collection_record.get('modified_at'))
@@ -245,27 +245,30 @@ class CollectionDirectory(CollectionDirectoryBase):
"""
self.collection_locator = new_locator
- self.collection_object = None
+ self.collection_record = None
self.update()
- def new_collection(self, new_collection_object, coll_reader):
+ def new_collection(self, new_collection_record, coll_reader):
if self.inode:
self.clear(force=True)
- self.collection_object = new_collection_object
+ self.collection_record = new_collection_record
- if self.collection_object:
- self._mtime = convertTime(self.collection_object.get('modified_at'))
-
- if self.collection_object_file is not None:
- self.collection_object_file.update(self.collection_object)
+ if self.collection_record:
+ self._mtime = convertTime(self.collection_record.get('modified_at'))
+ self.collection_locator = self.collection_record["uuid"]
+ if self.collection_record_file is not None:
+ self.collection_record_file.update(self.collection_record)
self.collection = coll_reader
self.populate(self.mtime())
+ def uuid(self):
+ return self.collection_locator
+
def update(self):
try:
- if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
+ if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
return True
if self.collection_locator is None:
@@ -273,27 +276,31 @@ class CollectionDirectory(CollectionDirectoryBase):
return True
with llfuse.lock_released:
- if uuid_pattern.match(self.collection_locator):
- coll_reader = arvados.collection.Collection(
- self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ _logger.debug("Updating %s", self.collection_locator)
+ if self.collection:
+ self.collection.update()
else:
- coll_reader = arvados.collection.CollectionReader(
- self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
- new_collection_object = coll_reader.api_response() or {}
- # If the Collection only exists in Keep, there will be no API
- # response. Fill in the fields we need.
- if 'uuid' not in new_collection_object:
- new_collection_object['uuid'] = self.collection_locator
- if "portable_data_hash" not in new_collection_object:
- new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
- if 'manifest_text' not in new_collection_object:
- new_collection_object['manifest_text'] = coll_reader.manifest_text()
+ if uuid_pattern.match(self.collection_locator):
+ coll_reader = arvados.collection.Collection(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ else:
+ coll_reader = arvados.collection.CollectionReader(
+ self.collection_locator, self.api, self.api.keep,
+ num_retries=self.num_retries)
+ new_collection_record = coll_reader.api_response() or {}
+ # If the Collection only exists in Keep, there will be no API
+ # response. Fill in the fields we need.
+ if 'uuid' not in new_collection_record:
+ new_collection_record['uuid'] = self.collection_locator
+ if "portable_data_hash" not in new_collection_record:
+ new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+ if 'manifest_text' not in new_collection_record:
+ new_collection_record['manifest_text'] = coll_reader.manifest_text()
# end with llfuse.lock_released, re-acquire lock
- if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
- self.new_collection(new_collection_object, coll_reader)
+ if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record["portable_data_hash"]:
+ self.new_collection(new_collection_record, coll_reader)
self._manifest_size = len(coll_reader.manifest_text())
_logger.debug("%s manifest_size %i", self, self._manifest_size)
@@ -304,21 +311,21 @@ class CollectionDirectory(CollectionDirectoryBase):
_logger.exception("arv-mount %s: error", self.collection_locator)
except arvados.errors.ArgumentError as detail:
_logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
- if self.collection_object is not None and "manifest_text" in self.collection_object:
- _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+ if self.collection_record is not None and "manifest_text" in self.collection_record:
+ _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
except Exception:
_logger.exception("arv-mount %s: error", self.collection_locator)
- if self.collection_object is not None and "manifest_text" in self.collection_object:
- _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+ if self.collection_record is not None and "manifest_text" in self.collection_record:
+ _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
return False
def __getitem__(self, item):
self.checkupdate()
if item == '.arvados#collection':
- if self.collection_object_file is None:
- self.collection_object_file = ObjectFile(self.inode, self.collection_object)
- self.inodes.add_entry(self.collection_object_file)
- return self.collection_object_file
+ if self.collection_record_file is None:
+ self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+ self.inodes.add_entry(self.collection_record_file)
+ return self.collection_record_file
else:
return super(CollectionDirectory, self).__getitem__(item)
@@ -329,8 +336,8 @@ class CollectionDirectory(CollectionDirectoryBase):
return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
- self.collection_object = None
- self.collection_object_file = None
+ self.collection_record = None
+ self.collection_record_file = None
super(CollectionDirectory, self).invalidate()
def persisted(self):
@@ -498,6 +505,9 @@ class ProjectDirectory(Directory):
else:
return None
+ def uuid(self):
+ # NOTE(review): this looks broken. If the instance has a `uuid` attribute,
+ # it shadows this method and InodeCache.manage()'s obj.uuid() calls a
+ # non-callable; otherwise `self.uuid` is this bound method, so the cache
+ # is keyed by a method object that find(uuid_string) never matches.
+ # Presumably this should return the project's uuid from a differently
+ # named attribute -- TODO confirm against ProjectDirectory.__init__.
+ return self.uuid
+
def update(self):
if self.project_object_file == None:
self.project_object_file = ObjectFile(self.inode, self.project_object)
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 70cfbef..9a17957 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -18,22 +18,6 @@ import run_test_server
logger = logging.getLogger('arvados.arv-mount')
-import threading, sys, traceback
-
-def dumpstacks(signal, frame):
- id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
- code = []
- for threadId, stack in sys._current_frames().items():
- code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
- for filename, lineno, name, line in traceback.extract_stack(stack):
- code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
- if line:
- code.append(" %s" % (line.strip()))
- print "\n".join(code)
-
-import signal
-signal.signal(signal.SIGUSR1, dumpstacks)
-
class MountTestBase(unittest.TestCase):
def setUp(self):
self.keeptmp = tempfile.mkdtemp()
@@ -44,14 +28,14 @@ class MountTestBase(unittest.TestCase):
self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
- operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
- llfuse.init(operations, self.mounttmp, ['debug'])
+ self.operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
+ self.operations.inodes.add_entry(root_class(
+ llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+ llfuse.init(self.operations, self.mounttmp, ['debug'])
threading.Thread(None, llfuse.main).start()
# wait until the driver is finished initializing
- operations.initlock.wait()
- return operations.inodes[llfuse.ROOT_INODE]
+ self.operations.initlock.wait()
+ return self.operations.inodes[llfuse.ROOT_INODE]
def tearDown(self):
# llfuse.close is buggy, so use fusermount instead.
@@ -341,7 +325,7 @@ class FuseUpdateFileTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
d1 = llfuse.listdir(self.mounttmp)
self.assertEqual(["file1.txt"], d1)
@@ -366,7 +350,7 @@ class FuseAddFileToCollectionTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
d1 = llfuse.listdir(self.mounttmp)
self.assertEqual(["file1.txt"], d1)
@@ -390,7 +374,7 @@ class FuseRemoveFileFromCollectionTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
d1 = llfuse.listdir(self.mounttmp)
self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
@@ -412,7 +396,7 @@ class FuseCreateFileTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
self.assertNotIn("file1.txt", collection)
@@ -442,7 +426,7 @@ class FuseWriteFileTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
self.assertNotIn("file1.txt", collection)
@@ -494,7 +478,7 @@ class FuseUpdateFileTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
@@ -520,7 +504,7 @@ class FuseMkdirTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
with self.assertRaises(IOError):
@@ -555,7 +539,7 @@ class FuseRmTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
os.mkdir(os.path.join(self.mounttmp, "testdir"))
@@ -615,7 +599,7 @@ class FuseMvTest(MountTestBase):
m = self.make_mount(fuse.CollectionDirectory)
with llfuse.lock:
- m.new_collection(None, collection)
+ m.new_collection(collection.api_response(), collection)
self.assertTrue(m.writable())
os.mkdir(os.path.join(self.mounttmp, "testdir"))
@@ -645,6 +629,34 @@ class FuseMvTest(MountTestBase):
r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+class FuseUpdateFromEventTest(MountTestBase):
+    """A collection changed through the API shows up in the mount via events."""
+
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        self.operations.listen_for_events(self.api)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual([], sorted(d1))
+
+        # Modify the collection through a second, independent handle; leaving
+        # the with-block saves it (Collection.__exit__), which should emit an
+        # update event.
+        with arvados.collection.Collection(collection.manifest_locator(), api_client=self.api) as collection2:
+            with collection2.open("file1.txt", "w") as f:
+                f.write("foo")
+
+        # Event delivery is asynchronous: poll with a deadline instead of a
+        # single fixed sleep, which is both slow and timing-flaky.
+        deadline = time.time() + 10
+        d1 = sorted(llfuse.listdir(self.mounttmp))
+        while d1 != ["file1.txt"] and time.time() < deadline:
+            time.sleep(0.1)
+            d1 = sorted(llfuse.listdir(self.mounttmp))
+
+        # should show up via event bus notify
+        self.assertEqual(["file1.txt"], d1)
+
+
class FuseUnitTest(unittest.TestCase):
def test_sanitize_filename(self):
acceptable = [
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list