[ARVADOS] updated: d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092

git at public.curoverse.com git at public.curoverse.com
Thu Jun 18 16:38:00 EDT 2015


Summary of changes:
 services/fuse/arvados_fuse/__init__.py |  57 ++++++++++----
 services/fuse/arvados_fuse/fusedir.py  |  18 ++---
 services/fuse/tests/fstest.py          | 133 +++++++++++++++++++++++++++++++++
 services/fuse/tests/prof.py            |  17 +++++
 4 files changed, 201 insertions(+), 24 deletions(-)
 create mode 100644 services/fuse/tests/fstest.py
 create mode 100644 services/fuse/tests/prof.py

       via  d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092 (commit)
       via  08284382b53f621c09c4ffc87d82fa0261a69d32 (commit)
      from  20ade56019456b41c98021c2ed5a848bd8d018bb (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 18 16:32:21 2015 -0400

    3198: concurrency test and associated fixes

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index fda1b63..38e794c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1215,7 +1215,6 @@ class Collection(RichCollectionBase):
                 # We've merged this record this before.  Don't do anything.
                 return
             else:
-                _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
@@ -1246,7 +1245,6 @@ class Collection(RichCollectionBase):
 
     def _remember_api_response(self, response):
         self._api_response = response
-        _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
 
     def _populate_from_api_server(self):
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index eba17b3..cab44dd 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -227,41 +227,22 @@ class Inodes(object):
 
     def del_entry(self, entry):
         if entry.ref_count == 0:
-            _logger.debug("Deleting inode %i", entry.inode)
             self.inode_cache.unmanage(entry)
-            _logger.debug("(1) unmanaged inode %i", entry.inode)
-
             del self._entries[entry.inode]
-            _logger.debug("(2) deleted inode %i", entry.inode)
-
             with llfuse.lock_released:
                 entry.finalize()
-                _logger.debug("(3) finalized inode %i", entry.inode)
-
             self.invalidate_inode(entry.inode)
-            _logger.debug("(4) invalidated inode %i", entry.inode)
-
             entry.inode = None
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
     def invalidate_inode(self, inode):
-        self.deferred_invalidations.append((inode,))
+        llfuse.invalidate_inode(inode)
 
     def invalidate_entry(self, inode, name):
-        self.deferred_invalidations.append((inode, name))
+        llfuse.invalidate_entry(inode, name)
 
-    def do_invalidations(self):
-        di = self.deferred_invalidations
-        self.deferred_invalidations = []
-
-        with llfuse.lock_released:
-            for d in di:
-                if len(d) == 1:
-                    llfuse.invalidate_inode(d[0])
-                elif len(d) == 2:
-                    llfuse.invalidate_entry(d[0], d[1])
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -274,19 +255,18 @@ def catch_exceptions(orig_func):
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
+        except arvados.errors.KeepWriteError as e:
+            _logger.error("Keep write error: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
+        except arvados.errors.NotFoundError as e:
+            _logger.error("Block not found error: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
         except:
             _logger.exception("Unhandled exception during FUSE operation")
             raise llfuse.FUSEError(errno.EIO)
 
     return catch_exceptions_wrapper
 
-def deferred_invalidate(orig_func):
-    @functools.wraps(orig_func)
-    def deferred_invalidate_wrapper(self, *args, **kwargs):
-        n = orig_func(self, *args, **kwargs)
-        self.inodes.do_invalidations()
-        return n
-    return deferred_invalidate_wrapper
 
 class Operations(llfuse.Operations):
     """This is the main interface with llfuse.
@@ -334,7 +314,10 @@ class Operations(llfuse.Operations):
             self.events = None
 
         for k,v in self.inodes.items():
-            v.finalize()
+            try:
+                v.finalize()
+            except Exception as e:
+                _logger.exception("Error during finalize of inode %i", k)
         self.inodes = None
 
     def access(self, inode, mode, ctx):
@@ -375,7 +358,6 @@ class Operations(llfuse.Operations):
                     itemparent.invalidate()
                     itemparent.update()
 
-                self.inodes.do_invalidations()
 
     @catch_exceptions
     def getattr(self, inode):
@@ -408,8 +390,6 @@ class Operations(llfuse.Operations):
 
         entry.st_size = e.size()
 
-        _logger.debug("getattr got size")
-
         entry.st_blksize = 512
         entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
@@ -457,7 +437,6 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.ENOENT)
 
     @catch_exceptions
-    @deferred_invalidate
     def forget(self, inodes):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
@@ -493,11 +472,7 @@ class Operations(llfuse.Operations):
 
         self.inodes.touch(handle.obj)
 
-        try:
-            return handle.obj.readfrom(off, size, self.num_retries)
-        except arvados.errors.NotFoundError as e:
-            _logger.error("Block not found: " + str(e))
-            raise llfuse.FUSEError(errno.EIO)
+        return handle.obj.readfrom(off, size, self.num_retries)
 
     @catch_exceptions
     def write(self, fh, off, buf):
@@ -515,24 +490,21 @@ class Operations(llfuse.Operations):
         return handle.obj.writeto(off, buf, self.num_retries)
 
     @catch_exceptions
-    @deferred_invalidate
     def release(self, fh):
         if fh in self._filehandles:
             try:
                 self._filehandles[fh].flush()
-            except EnvironmentError as e:
-                raise llfuse.FUSEError(e.errno)
             except Exception:
-                _logger.exception("Flush error")
-            self._filehandles[fh].release()
-            del self._filehandles[fh]
+                raise
+            finally:
+                self._filehandles[fh].release()
+                del self._filehandles[fh]
         self.inodes.inode_cache.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
 
     @catch_exceptions
-    @deferred_invalidate
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -607,7 +579,6 @@ class Operations(llfuse.Operations):
         return p
 
     @catch_exceptions
-    @deferred_invalidate
     def create(self, inode_parent, name, mode, flags, ctx):
         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
 
@@ -624,7 +595,6 @@ class Operations(llfuse.Operations):
         return (fh, self.getattr(f.inode))
 
     @catch_exceptions
-    @deferred_invalidate
     def mkdir(self, inode_parent, name, mode, ctx):
         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
 
@@ -638,21 +608,18 @@ class Operations(llfuse.Operations):
         return self.getattr(d.inode)
 
     @catch_exceptions
-    @deferred_invalidate
     def unlink(self, inode_parent, name):
         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
     @catch_exceptions
-    @deferred_invalidate
     def rmdir(self, inode_parent, name):
         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
     @catch_exceptions
-    @deferred_invalidate
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
@@ -660,7 +627,6 @@ class Operations(llfuse.Operations):
         dest.rename(name_old, name_new, src)
 
     @catch_exceptions
-    @deferred_invalidate
     def flush(self, fh):
         if fh in self._filehandles:
             self._filehandles[fh].flush()
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 50ea0ea..16b3bb2 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -238,30 +238,18 @@ class CollectionDirectoryBase(Directory):
             name = sanitize_filename(name)
             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
-                _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
-                    _logger.debug("on_event (1) %s %s %s", event, collection, name)
                     ent = self._entries[name]
-                    _logger.debug("on_event (2) %s %s %s", event, collection, name)
                     del self._entries[name]
-
-                    _logger.debug("on_event (3) %s %s %s", event, collection, name)
-
                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
-
-                    _logger.debug("on_event (4) %s %s %s", event, collection, name)
-
                     self.inodes.del_entry(ent)
-
-                    _logger.debug("on_event (5) %s %s %s", event, collection, name)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                         self.inodes.invalidate_inode(item.fuse_entry.inode)
                     elif name in self._entries:
                         self.inodes.invalidate_inode(self._entries[name].inode)
-                _logger.debug("on_event completed %s %s %s", event, collection, name)
 
     def populate(self, mtime):
         self._mtime = mtime
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index e61ba54..4d472cf 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,9 +46,7 @@ class FuseArvadosFile(File):
         self.arvfile = arvfile
 
     def size(self):
-        _logger.debug("started calling self.arvfile.size()")
         with llfuse.lock_released:
-            _logger.debug("locked_released and calling self.arvfile.size()")
             return self.arvfile.size()
 
     def readfrom(self, off, size, num_retries=0):
diff --git a/services/fuse/tests/fstest.py b/services/fuse/tests/fstest.py
new file mode 100644
index 0000000..cf081b7
--- /dev/null
+++ b/services/fuse/tests/fstest.py
@@ -0,0 +1,133 @@
+from multiprocessing import Process
+import os
+import subprocess
+import sys
+import prof
+
+def fn(n):
+    return "file%i" % n
+
+def createfiles(d, n):
+    for j in xrange(1, 5):
+        print "Starting small file %s %i, %i" % (d, n, j)
+        if d:
+            os.mkdir(d)
+            ld = os.listdir('.')
+            if d not in ld:
+                print "ERROR %s missing" % d
+            os.chdir(d)
+
+        for i in xrange(n, n+10):
+            with open(fn(i), "w") as f:
+                f.write(fn(i))
+
+        ld = os.listdir('.')
+        for i in xrange(n, n+10):
+            if fn(i) not in ld:
+                print "ERROR %s missing" % fn(i)
+
+        for i in xrange(n, n+10):
+            with open(fn(i), "r") as f:
+                if f.read() != fn(i):
+                    print "ERROR %s doesn't have expected contents" % fn(i)
+
+        for i in xrange(n, n+10):
+            os.remove(fn(i))
+
+        ld = os.listdir('.')
+        for i in xrange(n, n+10):
+            if fn(i) in ld:
+                print "ERROR %s should have been removed" % fn(i)
+
+        if d:
+            os.chdir('..')
+            os.rmdir(d)
+            ld = os.listdir('.')
+            if d in ld:
+                print "ERROR %s should have been removed" % d
+
+
+def createbigfile(d, n):
+    for j in xrange(1, 5):
+        print "Starting big file %s %i, %i" % (d, n, j)
+        i = n
+        if d:
+            os.mkdir(d)
+            ld = os.listdir('.')
+            if d not in ld:
+                print "ERROR %s missing" % d
+            os.chdir(d)
+
+        with open(fn(i), "w") as f:
+            for j in xrange(0, 1000):
+                f.write((str(j) + fn(i)) * 10000)
+
+        ld = os.listdir('.')
+        if fn(i) not in ld:
+            print "ERROR %s missing" % fn(i)
+
+        with open(fn(i), "r") as f:
+            for j in xrange(0, 1000):
+                expect = (str(j) + fn(i)) * 10000
+                if f.read(len(expect)) != expect:
+                    print "ERROR %s doesn't have expected contents" % fn(i)
+
+        os.remove(fn(i))
+
+        ld = os.listdir('.')
+        if fn(i) in ld:
+            print "ERROR %s should have been removed" % fn(i)
+
+        if d:
+            os.chdir('..')
+            os.rmdir(d)
+            ld = os.listdir('.')
+            if d in ld:
+                print "ERROR %s should have been removed" % d
+
+def do_ls():
+    with open("/dev/null", "w") as nul:
+        for j in xrange(1, 50):
+            subprocess.call(["ls", "-l"], stdout=nul, stderr=nul)
+
+def runit(target, indir):
+    procs = []
+    for n in xrange(0, 20):
+        if indir:
+            p = Process(target=target, args=("dir%i" % n, n*10,))
+        else:
+            p = Process(target=target, args=("", n*10,))
+        p.start()
+        procs.append(p)
+
+    p = Process(target=do_ls, args=())
+    p.start()
+    procs.append(p)
+
+    for p in procs:
+        p.join()
+
+    if os.listdir('.'):
+        print "ERROR there are left over files in the directory"
+
+
+if __name__ == '__main__':
+    if os.listdir('.'):
+        print "ERROR starting directory is not empty"
+        sys.exit()
+
+    print "Single directory small files"
+    with prof.CountTime():
+        runit(createfiles, False)
+
+    print "Separate directories small files"
+    with prof.CountTime():
+        runit(createfiles, True)
+
+    print "Single directory large files"
+    with prof.CountTime():
+        runit(createbigfile, False)
+
+    print "Separate directories large files"
+    with prof.CountTime():
+        runit(createbigfile, True)
diff --git a/services/fuse/tests/prof.py b/services/fuse/tests/prof.py
new file mode 100644
index 0000000..49b9f24
--- /dev/null
+++ b/services/fuse/tests/prof.py
@@ -0,0 +1,17 @@
+import time
+
+class CountTime(object):
+    def __init__(self, tag="", size=None):
+        self.tag = tag
+        self.size = size
+
+    def __enter__(self):
+        self.start = time.time()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        sec = (time.time() - self.start)
+        th = ""
+        if self.size:
+            th = "throughput %s/sec" % (self.size / sec)
+        print "%s time %s micoseconds %s" % (self.tag, sec*1000000, th)

commit 08284382b53f621c09c4ffc87d82fa0261a69d32
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 17 17:06:57 2015 -0400

    WIP

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 38e794c..fda1b63 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1215,6 +1215,7 @@ class Collection(RichCollectionBase):
                 # We've merged this record this before.  Don't do anything.
                 return
             else:
+                _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
@@ -1245,6 +1246,7 @@ class Collection(RichCollectionBase):
 
     def _remember_api_response(self, response):
         self._api_response = response
+        _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
 
     def _populate_from_api_server(self):
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a2b91a6..eba17b3 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -67,6 +67,14 @@ import ciso8601
 import collections
 import functools
 
+import Queue
+
+# Default _notify_queue has a limit of 1000 items, but it really needs to be
+# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
+# details.
+
+llfuse.capi._notify_queue = Queue.Queue()
+
 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
 from fusefile import StringFile, FuseArvadosFile
 
@@ -188,6 +196,7 @@ class Inodes(object):
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
         self.encoding = encoding
+        self.deferred_invalidations = []
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -220,14 +229,39 @@ class Inodes(object):
         if entry.ref_count == 0:
             _logger.debug("Deleting inode %i", entry.inode)
             self.inode_cache.unmanage(entry)
-            llfuse.invalidate_inode(entry.inode)
-            entry.finalize()
+            _logger.debug("(1) unmanaged inode %i", entry.inode)
+
             del self._entries[entry.inode]
+            _logger.debug("(2) deleted inode %i", entry.inode)
+
+            with llfuse.lock_released:
+                entry.finalize()
+                _logger.debug("(3) finalized inode %i", entry.inode)
+
+            self.invalidate_inode(entry.inode)
+            _logger.debug("(4) invalidated inode %i", entry.inode)
+
             entry.inode = None
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
+    def invalidate_inode(self, inode):
+        self.deferred_invalidations.append((inode,))
+
+    def invalidate_entry(self, inode, name):
+        self.deferred_invalidations.append((inode, name))
+
+    def do_invalidations(self):
+        di = self.deferred_invalidations
+        self.deferred_invalidations = []
+
+        with llfuse.lock_released:
+            for d in di:
+                if len(d) == 1:
+                    llfuse.invalidate_inode(d[0])
+                elif len(d) == 2:
+                    llfuse.invalidate_entry(d[0], d[1])
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -246,6 +280,13 @@ def catch_exceptions(orig_func):
 
     return catch_exceptions_wrapper
 
+def deferred_invalidate(orig_func):
+    @functools.wraps(orig_func)
+    def deferred_invalidate_wrapper(self, *args, **kwargs):
+        n = orig_func(self, *args, **kwargs)
+        self.inodes.do_invalidations()
+        return n
+    return deferred_invalidate_wrapper
 
 class Operations(llfuse.Operations):
     """This is the main interface with llfuse.
@@ -313,7 +354,12 @@ class Operations(llfuse.Operations):
                     item.invalidate()
                     if ev["object_kind"] == "arvados#collection":
                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
-                        record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+
+                        # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
+                        # should always be the same.
+                        #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+                        record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
+
                         item.update(to_record_version=record_version)
                     else:
                         item.update()
@@ -329,6 +375,8 @@ class Operations(llfuse.Operations):
                     itemparent.invalidate()
                     itemparent.update()
 
+                self.inodes.do_invalidations()
+
     @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
@@ -360,6 +408,8 @@ class Operations(llfuse.Operations):
 
         entry.st_size = e.size()
 
+        _logger.debug("getattr got size")
+
         entry.st_blksize = 512
         entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
@@ -407,6 +457,7 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.ENOENT)
 
     @catch_exceptions
+    @deferred_invalidate
     def forget(self, inodes):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
@@ -464,6 +515,7 @@ class Operations(llfuse.Operations):
         return handle.obj.writeto(off, buf, self.num_retries)
 
     @catch_exceptions
+    @deferred_invalidate
     def release(self, fh):
         if fh in self._filehandles:
             try:
@@ -480,6 +532,7 @@ class Operations(llfuse.Operations):
         self.release(fh)
 
     @catch_exceptions
+    @deferred_invalidate
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -554,7 +607,10 @@ class Operations(llfuse.Operations):
         return p
 
     @catch_exceptions
+    @deferred_invalidate
     def create(self, inode_parent, name, mode, flags, ctx):
+        _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
+
         p = self._check_writable(inode_parent)
         p.create(name)
 
@@ -568,6 +624,7 @@ class Operations(llfuse.Operations):
         return (fh, self.getattr(f.inode))
 
     @catch_exceptions
+    @deferred_invalidate
     def mkdir(self, inode_parent, name, mode, ctx):
         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
 
@@ -581,18 +638,21 @@ class Operations(llfuse.Operations):
         return self.getattr(d.inode)
 
     @catch_exceptions
+    @deferred_invalidate
     def unlink(self, inode_parent, name):
         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
     @catch_exceptions
+    @deferred_invalidate
     def rmdir(self, inode_parent, name):
         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
     @catch_exceptions
+    @deferred_invalidate
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
@@ -600,6 +660,7 @@ class Operations(llfuse.Operations):
         dest.rename(name_old, name_new, src)
 
     @catch_exceptions
+    @deferred_invalidate
     def flush(self, fh):
         if fh in self._filehandles:
             self._filehandles[fh].flush()
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index f661e41..50ea0ea 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -144,12 +144,12 @@ class Directory(FreshBase):
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
-            llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
+            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
         if changed:
-            llfuse.invalidate_inode(self.inode)
+            self.inodes.invalidate_inode(self.inode)
             self._mtime = time.time()
 
         self.fresh()
@@ -165,9 +165,9 @@ class Directory(FreshBase):
                     self._entries = oldentries
                     return False
             for n in oldentries:
-                llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                 self.inodes.del_entry(oldentries[n])
-            llfuse.invalidate_inode(self.inode)
+            self.inodes.invalidate_inode(self.inode)
             self.invalidate()
             return True
         else:
@@ -238,18 +238,30 @@ class CollectionDirectoryBase(Directory):
             name = sanitize_filename(name)
             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
+                _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
+                    _logger.debug("on_event (1) %s %s %s", event, collection, name)
                     ent = self._entries[name]
+                    _logger.debug("on_event (2) %s %s %s", event, collection, name)
                     del self._entries[name]
-                    llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+
+                    _logger.debug("on_event (3) %s %s %s", event, collection, name)
+
+                    self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+
+                    _logger.debug("on_event (4) %s %s %s", event, collection, name)
+
                     self.inodes.del_entry(ent)
+
+                    _logger.debug("on_event (5) %s %s %s", event, collection, name)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
-                        llfuse.invalidate_inode(item.fuse_entry.inode)
+                        self.inodes.invalidate_inode(item.fuse_entry.inode)
                     elif name in self._entries:
-                        llfuse.invalidate_inode(self._entries[name].inode)
+                        self.inodes.invalidate_inode(self._entries[name].inode)
+                _logger.debug("on_event completed %s %s %s", event, collection, name)
 
     def populate(self, mtime):
         self._mtime = mtime
@@ -387,7 +399,7 @@ class CollectionDirectory(CollectionDirectoryBase):
                     if not self.stale():
                         return
 
-                    _logger.debug("Updating %s", self.collection_locator)
+                    _logger.debug("Updating %s", to_record_version)
                     if self.collection is not None:
                         if self.collection.known_past_version(to_record_version):
                             _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
@@ -769,7 +781,7 @@ class ProjectDirectory(Directory):
         # Acually move the entry from source directory to this directory.
         del src._entries[name_old]
         self._entries[name_new] = ent
-        llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
 
 class SharedDirectory(Directory):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 4d472cf..e61ba54 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,7 +46,9 @@ class FuseArvadosFile(File):
         self.arvfile = arvfile
 
     def size(self):
+        _logger.debug("started calling self.arvfile.size()")
         with llfuse.lock_released:
+            _logger.debug("locked_released and calling self.arvfile.size()")
             return self.arvfile.size()
 
     def readfrom(self, off, size, num_retries=0):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list