[ARVADOS] updated: 296e1b20ecf2c283508712332f163c06464b3b5d

git at public.curoverse.com git at public.curoverse.com
Tue Apr 21 17:12:12 EDT 2015


Summary of changes:
 sdk/python/arvados/collection.py       |  2 +-
 services/fuse/arvados_fuse/__init__.py | 50 +++++++++++++++----
 services/fuse/arvados_fuse/fresh.py    |  3 ++
 services/fuse/arvados_fuse/fusedir.py  | 88 +++++++++++++++++++---------------
 services/fuse/tests/test_mount.py      | 74 ++++++++++++++++------------
 5 files changed, 136 insertions(+), 81 deletions(-)

       via  296e1b20ecf2c283508712332f163c06464b3b5d (commit)
      from  f69d2824c997c53caa11d30ba816768bad52e12b (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 296e1b20ecf2c283508712332f163c06464b3b5d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Apr 21 17:12:07 2015 -0400

    3198: Support for listening for events to trigger collection updates.  TODO: add to arv-mount

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index c90f7b3..89cbda9 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1283,7 +1283,7 @@ class Collection(RichCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if exc_type is not None:
+        if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
         if self._block_manager is not None:
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index e140672..b3992ef 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -69,6 +69,7 @@ class DirectoryHandle(Handle):
 class InodeCache(object):
     def __init__(self, cap):
         self._entries = collections.OrderedDict()
+        self._by_uuid = {}
         self._counter = itertools.count(1)
         self.cap = cap
         self._total = 0
@@ -79,27 +80,33 @@ class InodeCache(object):
             return False
         self._total -= obj._cache_size
         del self._entries[obj._cache_priority]
+        if obj._cache_uuid:
+            del self._by_uuid[obj._cache_uuid]
+            obj._cache_uuid = None
         _logger.debug("Cleared %s total now %i", obj, self._total)
         return True
 
     def cap_cache(self):
         _logger.debug("total is %i cap is %i", self._total, self.cap)
         if self._total > self.cap:
-            need_gc = False
             for key in list(self._entries.keys()):
                 if self._total < self.cap or len(self._entries) < 4:
                     break
                 self._remove(self._entries[key], True)
 
-
     def manage(self, obj):
         if obj.persisted():
             obj._cache_priority = next(self._counter)
             obj._cache_size = obj.objsize()
             self._entries[obj._cache_priority] = obj
+            if obj.uuid():
+                obj._cache_uuid = obj.uuid()
+                self._by_uuid[obj._cache_uuid] = obj
             self._total += obj.objsize()
             _logger.debug("Managing %s total now %i", obj, self._total)
             self.cap_cache()
+        else:
+            obj._cache_priority = None
 
     def touch(self, obj):
         if obj.persisted():
@@ -112,6 +119,8 @@ class InodeCache(object):
         if obj.persisted() and obj._cache_priority in self._entries:
             self._remove(obj, True)
 
+    def find(self, uuid):
+        return self._by_uuid.get(uuid)
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
@@ -120,7 +129,7 @@ class Inodes(object):
     def __init__(self, inode_cache=256*1024*1024):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
-        self._obj_cache = InodeCache(cap=inode_cache)
+        self.cache = InodeCache(cap=inode_cache)
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -139,21 +148,18 @@ class Inodes(object):
 
     def touch(self, entry):
         entry._atime = time.time()
-        self._obj_cache.touch(entry)
-
-    def cap_cache(self):
-        self._obj_cache.cap_cache()
+        self.cache.touch(entry)
 
     def add_entry(self, entry):
         entry.inode = next(self._counter)
         self._entries[entry.inode] = entry
-        self._obj_cache.manage(entry)
+        self.cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
         if entry.ref_count == 0:
             _logger.warn("Deleting inode %i", entry.inode)
-            self._obj_cache.unmanage(entry)
+            self.cache.unmanage(entry)
             llfuse.invalidate_inode(entry.inode)
             del self._entries[entry.inode]
         else:
@@ -206,14 +212,38 @@ class Operations(llfuse.Operations):
 
         self.num_retries = num_retries
 
+        self.events = None
+
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue
         self.initlock.set()
 
+    def destroy(self):
+        if self.events:
+            self.events.close()
+
     def access(self, inode, mode, ctx):
         return True
 
+    def listen_for_events(self, api_client):
+        self.event = arvados.events.subscribe(api_client,
+                                 [["event_type", "in", ["create", "update", "delete"]]],
+                                 self.on_event)
+
+    def on_event(self, ev):
+        if 'event_type' in ev:
+            with llfuse.lock:
+                item = self.inodes.cache.find(ev["object_uuid"])
+                if item:
+                    item.invalidate()
+                    item.update()
+
+                itemparent = self.inodes.cache.find(ev["object_owner_uuid"])
+                if itemparent:
+                    itemparent.invalidate()
+                    itemparent.update()
+
     @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
@@ -349,7 +379,7 @@ class Operations(llfuse.Operations):
                 _logger.exception("Flush error")
             self._filehandles[fh].release()
             del self._filehandles[fh]
-        self.inodes.cap_cache()
+        self.inodes.cache.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 08ffb88..e185fba 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -78,3 +78,6 @@ class FreshBase(object):
 
     def objsize(self):
         return 0
+
+    def uuid(self):
+        return None
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index fd9fc72..d77fba4 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -219,8 +219,8 @@ class CollectionDirectory(CollectionDirectoryBase):
         super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
         self.api = api
         self.num_retries = num_retries
-        self.collection_object_file = None
-        self.collection_object = None
+        self.collection_record_file = None
+        self.collection_record = None
         if isinstance(collection_record, dict):
             self.collection_locator = collection_record['uuid']
             self._mtime = convertTime(collection_record.get('modified_at'))
@@ -245,27 +245,30 @@ class CollectionDirectory(CollectionDirectoryBase):
         """
 
         self.collection_locator = new_locator
-        self.collection_object = None
+        self.collection_record = None
         self.update()
 
-    def new_collection(self, new_collection_object, coll_reader):
+    def new_collection(self, new_collection_record, coll_reader):
         if self.inode:
             self.clear(force=True)
 
-        self.collection_object = new_collection_object
+        self.collection_record = new_collection_record
 
-        if self.collection_object:
-            self._mtime = convertTime(self.collection_object.get('modified_at'))
-
-            if self.collection_object_file is not None:
-                self.collection_object_file.update(self.collection_object)
+        if self.collection_record:
+            self._mtime = convertTime(self.collection_record.get('modified_at'))
+            self.collection_locator = self.collection_record["uuid"]
+            if self.collection_record_file is not None:
+                self.collection_record_file.update(self.collection_record)
 
         self.collection = coll_reader
         self.populate(self.mtime())
 
+    def uuid(self):
+        return self.collection_locator
+
     def update(self):
         try:
-            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
+            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
             if self.collection_locator is None:
@@ -273,27 +276,31 @@ class CollectionDirectory(CollectionDirectoryBase):
                 return True
 
             with llfuse.lock_released:
-                if uuid_pattern.match(self.collection_locator):
-                    coll_reader = arvados.collection.Collection(
-                        self.collection_locator, self.api, self.api.keep,
-                        num_retries=self.num_retries)
+                _logger.debug("Updating %s", self.collection_locator)
+                if self.collection:
+                    self.collection.update()
                 else:
-                    coll_reader = arvados.collection.CollectionReader(
-                        self.collection_locator, self.api, self.api.keep,
-                        num_retries=self.num_retries)
-                new_collection_object = coll_reader.api_response() or {}
-                # If the Collection only exists in Keep, there will be no API
-                # response.  Fill in the fields we need.
-                if 'uuid' not in new_collection_object:
-                    new_collection_object['uuid'] = self.collection_locator
-                if "portable_data_hash" not in new_collection_object:
-                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
-                if 'manifest_text' not in new_collection_object:
-                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
+                    if uuid_pattern.match(self.collection_locator):
+                        coll_reader = arvados.collection.Collection(
+                            self.collection_locator, self.api, self.api.keep,
+                            num_retries=self.num_retries)
+                    else:
+                        coll_reader = arvados.collection.CollectionReader(
+                            self.collection_locator, self.api, self.api.keep,
+                            num_retries=self.num_retries)
+                    new_collection_record = coll_reader.api_response() or {}
+                    # If the Collection only exists in Keep, there will be no API
+                    # response.  Fill in the fields we need.
+                    if 'uuid' not in new_collection_record:
+                        new_collection_record['uuid'] = self.collection_locator
+                    if "portable_data_hash" not in new_collection_record:
+                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                    if 'manifest_text' not in new_collection_record:
+                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
             # end with llfuse.lock_released, re-acquire lock
 
-            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object, coll_reader)
+            if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record["portable_data_hash"]:
+                self.new_collection(new_collection_record, coll_reader)
 
             self._manifest_size = len(coll_reader.manifest_text())
             _logger.debug("%s manifest_size %i", self, self._manifest_size)
@@ -304,21 +311,21 @@ class CollectionDirectory(CollectionDirectoryBase):
             _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         return False
 
     def __getitem__(self, item):
         self.checkupdate()
         if item == '.arvados#collection':
-            if self.collection_object_file is None:
-                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
-                self.inodes.add_entry(self.collection_object_file)
-            return self.collection_object_file
+            if self.collection_record_file is None:
+                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
 
@@ -329,8 +336,8 @@ class CollectionDirectory(CollectionDirectoryBase):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        self.collection_object = None
-        self.collection_object_file = None
+        self.collection_record = None
+        self.collection_record_file = None
         super(CollectionDirectory, self).invalidate()
 
     def persisted(self):
@@ -498,6 +505,9 @@ class ProjectDirectory(Directory):
         else:
             return None
 
+    def uuid(self):
+        return self.uuid
+
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 70cfbef..9a17957 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -18,22 +18,6 @@ import run_test_server
 
 logger = logging.getLogger('arvados.arv-mount')
 
-import threading, sys, traceback
-
-def dumpstacks(signal, frame):
-    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
-    code = []
-    for threadId, stack in sys._current_frames().items():
-        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
-        for filename, lineno, name, line in traceback.extract_stack(stack):
-            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
-            if line:
-                code.append("  %s" % (line.strip()))
-    print "\n".join(code)
-
-import signal
-signal.signal(signal.SIGUSR1, dumpstacks)
-
 class MountTestBase(unittest.TestCase):
     def setUp(self):
         self.keeptmp = tempfile.mkdtemp()
@@ -44,14 +28,14 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
-        operations.inodes.add_entry(root_class(
-            llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
-        llfuse.init(operations, self.mounttmp, ['debug'])
+        self.operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
+        self.operations.inodes.add_entry(root_class(
+            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+        llfuse.init(self.operations, self.mounttmp, ['debug'])
         threading.Thread(None, llfuse.main).start()
         # wait until the driver is finished initializing
-        operations.initlock.wait()
-        return operations.inodes[llfuse.ROOT_INODE]
+        self.operations.initlock.wait()
+        return self.operations.inodes[llfuse.ROOT_INODE]
 
     def tearDown(self):
         # llfuse.close is buggy, so use fusermount instead.
@@ -341,7 +325,7 @@ class FuseUpdateFileTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
 
         d1 = llfuse.listdir(self.mounttmp)
         self.assertEqual(["file1.txt"], d1)
@@ -366,7 +350,7 @@ class FuseAddFileToCollectionTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
 
         d1 = llfuse.listdir(self.mounttmp)
         self.assertEqual(["file1.txt"], d1)
@@ -390,7 +374,7 @@ class FuseRemoveFileFromCollectionTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
 
         d1 = llfuse.listdir(self.mounttmp)
         self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
@@ -412,7 +396,7 @@ class FuseCreateFileTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         self.assertNotIn("file1.txt", collection)
@@ -442,7 +426,7 @@ class FuseWriteFileTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         self.assertNotIn("file1.txt", collection)
@@ -494,7 +478,7 @@ class FuseUpdateFileTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
@@ -520,7 +504,7 @@ class FuseMkdirTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         with self.assertRaises(IOError):
@@ -555,7 +539,7 @@ class FuseRmTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         os.mkdir(os.path.join(self.mounttmp, "testdir"))
@@ -615,7 +599,7 @@ class FuseMvTest(MountTestBase):
 
         m = self.make_mount(fuse.CollectionDirectory)
         with llfuse.lock:
-            m.new_collection(None, collection)
+            m.new_collection(collection.api_response(), collection)
         self.assertTrue(m.writable())
 
         os.mkdir(os.path.join(self.mounttmp, "testdir"))
@@ -645,6 +629,34 @@ class FuseMvTest(MountTestBase):
             r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
 
 
+class FuseUpdateFromEventTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        self.operations.listen_for_events(self.api)
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual([], sorted(d1))
+
+        with arvados.collection.Collection(collection.manifest_locator(), api_client=self.api) as collection2:
+            with collection2.open("file1.txt", "w") as f:
+                f.write("foo")
+
+        time.sleep(1)
+
+        # should show up via event bus notify
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["file1.txt"], sorted(d1))
+
+
 class FuseUnitTest(unittest.TestCase):
     def test_sanitize_filename(self):
         acceptable = [

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list