[ARVADOS] updated: d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092
git at public.curoverse.com
git at public.curoverse.com
Thu Jun 18 16:38:00 EDT 2015
Summary of changes:
services/fuse/arvados_fuse/__init__.py | 57 ++++++++++----
services/fuse/arvados_fuse/fusedir.py | 18 ++---
services/fuse/tests/fstest.py | 133 +++++++++++++++++++++++++++++++++
services/fuse/tests/prof.py | 17 +++++
4 files changed, 201 insertions(+), 24 deletions(-)
create mode 100644 services/fuse/tests/fstest.py
create mode 100644 services/fuse/tests/prof.py
via d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092 (commit)
via 08284382b53f621c09c4ffc87d82fa0261a69d32 (commit)
from 20ade56019456b41c98021c2ed5a848bd8d018bb (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d1dbdfd430be8a1bdd21fc1f02f8fe5e2d989092
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 18 16:32:21 2015 -0400
3198: concurrency test and associated fixes
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index fda1b63..38e794c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1215,7 +1215,6 @@ class Collection(RichCollectionBase):
# We've merged this record this before. Don't do anything.
return
else:
- _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
@@ -1246,7 +1245,6 @@ class Collection(RichCollectionBase):
def _remember_api_response(self, response):
self._api_response = response
- _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
def _populate_from_api_server(self):
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index eba17b3..cab44dd 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -227,41 +227,22 @@ class Inodes(object):
def del_entry(self, entry):
if entry.ref_count == 0:
- _logger.debug("Deleting inode %i", entry.inode)
self.inode_cache.unmanage(entry)
- _logger.debug("(1) unmanaged inode %i", entry.inode)
-
del self._entries[entry.inode]
- _logger.debug("(2) deleted inode %i", entry.inode)
-
with llfuse.lock_released:
entry.finalize()
- _logger.debug("(3) finalized inode %i", entry.inode)
-
self.invalidate_inode(entry.inode)
- _logger.debug("(4) invalidated inode %i", entry.inode)
-
entry.inode = None
else:
entry.dead = True
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
- self.deferred_invalidations.append((inode,))
+ llfuse.invalidate_inode(inode)
def invalidate_entry(self, inode, name):
- self.deferred_invalidations.append((inode, name))
+ llfuse.invalidate_entry(inode, name)
- def do_invalidations(self):
- di = self.deferred_invalidations
- self.deferred_invalidations = []
-
- with llfuse.lock_released:
- for d in di:
- if len(d) == 1:
- llfuse.invalidate_inode(d[0])
- elif len(d) == 2:
- llfuse.invalidate_entry(d[0], d[1])
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
@@ -274,19 +255,18 @@ def catch_exceptions(orig_func):
raise
except EnvironmentError as e:
raise llfuse.FUSEError(e.errno)
+ except arvados.errors.KeepWriteError as e:
+ _logger.error("Keep write error: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
+ except arvados.errors.NotFoundError as e:
+ _logger.error("Block not found error: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
except:
_logger.exception("Unhandled exception during FUSE operation")
raise llfuse.FUSEError(errno.EIO)
return catch_exceptions_wrapper
-def deferred_invalidate(orig_func):
- @functools.wraps(orig_func)
- def deferred_invalidate_wrapper(self, *args, **kwargs):
- n = orig_func(self, *args, **kwargs)
- self.inodes.do_invalidations()
- return n
- return deferred_invalidate_wrapper
class Operations(llfuse.Operations):
"""This is the main interface with llfuse.
@@ -334,7 +314,10 @@ class Operations(llfuse.Operations):
self.events = None
for k,v in self.inodes.items():
- v.finalize()
+ try:
+ v.finalize()
+ except Exception as e:
+ _logger.exception("Error during finalize of inode %i", k)
self.inodes = None
def access(self, inode, mode, ctx):
@@ -375,7 +358,6 @@ class Operations(llfuse.Operations):
itemparent.invalidate()
itemparent.update()
- self.inodes.do_invalidations()
@catch_exceptions
def getattr(self, inode):
@@ -408,8 +390,6 @@ class Operations(llfuse.Operations):
entry.st_size = e.size()
- _logger.debug("getattr got size")
-
entry.st_blksize = 512
entry.st_blocks = (entry.st_size/512)+1
entry.st_atime = int(e.atime())
@@ -457,7 +437,6 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.ENOENT)
@catch_exceptions
- @deferred_invalidate
def forget(self, inodes):
for inode, nlookup in inodes:
ent = self.inodes[inode]
@@ -493,11 +472,7 @@ class Operations(llfuse.Operations):
self.inodes.touch(handle.obj)
- try:
- return handle.obj.readfrom(off, size, self.num_retries)
- except arvados.errors.NotFoundError as e:
- _logger.error("Block not found: " + str(e))
- raise llfuse.FUSEError(errno.EIO)
+ return handle.obj.readfrom(off, size, self.num_retries)
@catch_exceptions
def write(self, fh, off, buf):
@@ -515,24 +490,21 @@ class Operations(llfuse.Operations):
return handle.obj.writeto(off, buf, self.num_retries)
@catch_exceptions
- @deferred_invalidate
def release(self, fh):
if fh in self._filehandles:
try:
self._filehandles[fh].flush()
- except EnvironmentError as e:
- raise llfuse.FUSEError(e.errno)
except Exception:
- _logger.exception("Flush error")
- self._filehandles[fh].release()
- del self._filehandles[fh]
+ raise
+ finally:
+ self._filehandles[fh].release()
+ del self._filehandles[fh]
self.inodes.inode_cache.cap_cache()
def releasedir(self, fh):
self.release(fh)
@catch_exceptions
- @deferred_invalidate
def opendir(self, inode):
_logger.debug("arv-mount opendir: inode %i", inode)
@@ -607,7 +579,6 @@ class Operations(llfuse.Operations):
return p
@catch_exceptions
- @deferred_invalidate
def create(self, inode_parent, name, mode, flags, ctx):
_logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
@@ -624,7 +595,6 @@ class Operations(llfuse.Operations):
return (fh, self.getattr(f.inode))
@catch_exceptions
- @deferred_invalidate
def mkdir(self, inode_parent, name, mode, ctx):
_logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
@@ -638,21 +608,18 @@ class Operations(llfuse.Operations):
return self.getattr(d.inode)
@catch_exceptions
- @deferred_invalidate
def unlink(self, inode_parent, name):
_logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
@catch_exceptions
- @deferred_invalidate
def rmdir(self, inode_parent, name):
_logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
@catch_exceptions
- @deferred_invalidate
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
_logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
src = self._check_writable(inode_parent_old)
@@ -660,7 +627,6 @@ class Operations(llfuse.Operations):
dest.rename(name_old, name_new, src)
@catch_exceptions
- @deferred_invalidate
def flush(self, fh):
if fh in self._filehandles:
self._filehandles[fh].flush()
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 50ea0ea..16b3bb2 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -238,30 +238,18 @@ class CollectionDirectoryBase(Directory):
name = sanitize_filename(name)
_logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
- _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
- _logger.debug("on_event (1) %s %s %s", event, collection, name)
ent = self._entries[name]
- _logger.debug("on_event (2) %s %s %s", event, collection, name)
del self._entries[name]
-
- _logger.debug("on_event (3) %s %s %s", event, collection, name)
-
self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
-
- _logger.debug("on_event (4) %s %s %s", event, collection, name)
-
self.inodes.del_entry(ent)
-
- _logger.debug("on_event (5) %s %s %s", event, collection, name)
elif event == arvados.collection.MOD:
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
self.inodes.invalidate_inode(item.fuse_entry.inode)
elif name in self._entries:
self.inodes.invalidate_inode(self._entries[name].inode)
- _logger.debug("on_event completed %s %s %s", event, collection, name)
def populate(self, mtime):
self._mtime = mtime
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index e61ba54..4d472cf 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,9 +46,7 @@ class FuseArvadosFile(File):
self.arvfile = arvfile
def size(self):
- _logger.debug("started calling self.arvfile.size()")
with llfuse.lock_released:
- _logger.debug("locked_released and calling self.arvfile.size()")
return self.arvfile.size()
def readfrom(self, off, size, num_retries=0):
diff --git a/services/fuse/tests/fstest.py b/services/fuse/tests/fstest.py
new file mode 100644
index 0000000..cf081b7
--- /dev/null
+++ b/services/fuse/tests/fstest.py
@@ -0,0 +1,133 @@
+from multiprocessing import Process
+import os
+import subprocess
+import sys
+import prof
+
+def fn(n):
+ return "file%i" % n
+
+def createfiles(d, n):
+ for j in xrange(1, 5):
+ print "Starting small file %s %i, %i" % (d, n, j)
+ if d:
+ os.mkdir(d)
+ ld = os.listdir('.')
+ if d not in ld:
+ print "ERROR %s missing" % d
+ os.chdir(d)
+
+ for i in xrange(n, n+10):
+ with open(fn(i), "w") as f:
+ f.write(fn(i))
+
+ ld = os.listdir('.')
+ for i in xrange(n, n+10):
+ if fn(i) not in ld:
+ print "ERROR %s missing" % fn(i)
+
+ for i in xrange(n, n+10):
+ with open(fn(i), "r") as f:
+ if f.read() != fn(i):
+ print "ERROR %s doesn't have expected contents" % fn(i)
+
+ for i in xrange(n, n+10):
+ os.remove(fn(i))
+
+ ld = os.listdir('.')
+ for i in xrange(n, n+10):
+ if fn(i) in ld:
+ print "ERROR %s should have been removed" % fn(i)
+
+ if d:
+ os.chdir('..')
+ os.rmdir(d)
+ ld = os.listdir('.')
+ if d in ld:
+ print "ERROR %s should have been removed" % d
+
+
+def createbigfile(d, n):
+ for j in xrange(1, 5):
+ print "Starting big file %s %i, %i" % (d, n, j)
+ i = n
+ if d:
+ os.mkdir(d)
+ ld = os.listdir('.')
+ if d not in ld:
+ print "ERROR %s missing" % d
+ os.chdir(d)
+
+ with open(fn(i), "w") as f:
+ for j in xrange(0, 1000):
+ f.write((str(j) + fn(i)) * 10000)
+
+ ld = os.listdir('.')
+ if fn(i) not in ld:
+ print "ERROR %s missing" % fn(i)
+
+ with open(fn(i), "r") as f:
+ for j in xrange(0, 1000):
+ expect = (str(j) + fn(i)) * 10000
+ if f.read(len(expect)) != expect:
+ print "ERROR %s doesn't have expected contents" % fn(i)
+
+ os.remove(fn(i))
+
+ ld = os.listdir('.')
+ if fn(i) in ld:
+ print "ERROR %s should have been removed" % fn(i)
+
+ if d:
+ os.chdir('..')
+ os.rmdir(d)
+ ld = os.listdir('.')
+ if d in ld:
+ print "ERROR %s should have been removed" % d
+
+def do_ls():
+ with open("/dev/null", "w") as nul:
+ for j in xrange(1, 50):
+ subprocess.call(["ls", "-l"], stdout=nul, stderr=nul)
+
+def runit(target, indir):
+ procs = []
+ for n in xrange(0, 20):
+ if indir:
+ p = Process(target=target, args=("dir%i" % n, n*10,))
+ else:
+ p = Process(target=target, args=("", n*10,))
+ p.start()
+ procs.append(p)
+
+ p = Process(target=do_ls, args=())
+ p.start()
+ procs.append(p)
+
+ for p in procs:
+ p.join()
+
+ if os.listdir('.'):
+ print "ERROR there are left over files in the directory"
+
+
+if __name__ == '__main__':
+ if os.listdir('.'):
+ print "ERROR starting directory is not empty"
+ sys.exit()
+
+ print "Single directory small files"
+ with prof.CountTime():
+ runit(createfiles, False)
+
+ print "Separate directories small files"
+ with prof.CountTime():
+ runit(createfiles, True)
+
+ print "Single directory large files"
+ with prof.CountTime():
+ runit(createbigfile, False)
+
+ print "Separate directories large files"
+ with prof.CountTime():
+ runit(createbigfile, True)
diff --git a/services/fuse/tests/prof.py b/services/fuse/tests/prof.py
new file mode 100644
index 0000000..49b9f24
--- /dev/null
+++ b/services/fuse/tests/prof.py
@@ -0,0 +1,17 @@
+import time
+
+class CountTime(object):
+ def __init__(self, tag="", size=None):
+ self.tag = tag
+ self.size = size
+
+ def __enter__(self):
+ self.start = time.time()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ sec = (time.time() - self.start)
+ th = ""
+ if self.size:
+ th = "throughput %s/sec" % (self.size / sec)
+ print "%s time %s micoseconds %s" % (self.tag, sec*1000000, th)
commit 08284382b53f621c09c4ffc87d82fa0261a69d32
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 17 17:06:57 2015 -0400
WIP
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 38e794c..fda1b63 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1215,6 +1215,7 @@ class Collection(RichCollectionBase):
# We've merged this record this before. Don't do anything.
return
else:
+ _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
@@ -1245,6 +1246,7 @@ class Collection(RichCollectionBase):
def _remember_api_response(self, response):
self._api_response = response
+ _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
def _populate_from_api_server(self):
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a2b91a6..eba17b3 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -67,6 +67,14 @@ import ciso8601
import collections
import functools
+import Queue
+
+# Default _notify_queue has a limit of 1000 items, but it really needs to be
+# unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
+# details.
+
+llfuse.capi._notify_queue = Queue.Queue()
+
from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
from fusefile import StringFile, FuseArvadosFile
@@ -188,6 +196,7 @@ class Inodes(object):
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
self.encoding = encoding
+ self.deferred_invalidations = []
def __getitem__(self, item):
return self._entries[item]
@@ -220,14 +229,39 @@ class Inodes(object):
if entry.ref_count == 0:
_logger.debug("Deleting inode %i", entry.inode)
self.inode_cache.unmanage(entry)
- llfuse.invalidate_inode(entry.inode)
- entry.finalize()
+ _logger.debug("(1) unmanaged inode %i", entry.inode)
+
del self._entries[entry.inode]
+ _logger.debug("(2) deleted inode %i", entry.inode)
+
+ with llfuse.lock_released:
+ entry.finalize()
+ _logger.debug("(3) finalized inode %i", entry.inode)
+
+ self.invalidate_inode(entry.inode)
+ _logger.debug("(4) invalidated inode %i", entry.inode)
+
entry.inode = None
else:
entry.dead = True
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+ def invalidate_inode(self, inode):
+ self.deferred_invalidations.append((inode,))
+
+ def invalidate_entry(self, inode, name):
+ self.deferred_invalidations.append((inode, name))
+
+ def do_invalidations(self):
+ di = self.deferred_invalidations
+ self.deferred_invalidations = []
+
+ with llfuse.lock_released:
+ for d in di:
+ if len(d) == 1:
+ llfuse.invalidate_inode(d[0])
+ elif len(d) == 2:
+ llfuse.invalidate_entry(d[0], d[1])
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
@@ -246,6 +280,13 @@ def catch_exceptions(orig_func):
return catch_exceptions_wrapper
+def deferred_invalidate(orig_func):
+ @functools.wraps(orig_func)
+ def deferred_invalidate_wrapper(self, *args, **kwargs):
+ n = orig_func(self, *args, **kwargs)
+ self.inodes.do_invalidations()
+ return n
+ return deferred_invalidate_wrapper
class Operations(llfuse.Operations):
"""This is the main interface with llfuse.
@@ -313,7 +354,12 @@ class Operations(llfuse.Operations):
item.invalidate()
if ev["object_kind"] == "arvados#collection":
new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
- record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+
+ # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
+ # should always be the same.
+ #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+ record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
+
item.update(to_record_version=record_version)
else:
item.update()
@@ -329,6 +375,8 @@ class Operations(llfuse.Operations):
itemparent.invalidate()
itemparent.update()
+ self.inodes.do_invalidations()
+
@catch_exceptions
def getattr(self, inode):
if inode not in self.inodes:
@@ -360,6 +408,8 @@ class Operations(llfuse.Operations):
entry.st_size = e.size()
+ _logger.debug("getattr got size")
+
entry.st_blksize = 512
entry.st_blocks = (entry.st_size/512)+1
entry.st_atime = int(e.atime())
@@ -407,6 +457,7 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.ENOENT)
@catch_exceptions
+ @deferred_invalidate
def forget(self, inodes):
for inode, nlookup in inodes:
ent = self.inodes[inode]
@@ -464,6 +515,7 @@ class Operations(llfuse.Operations):
return handle.obj.writeto(off, buf, self.num_retries)
@catch_exceptions
+ @deferred_invalidate
def release(self, fh):
if fh in self._filehandles:
try:
@@ -480,6 +532,7 @@ class Operations(llfuse.Operations):
self.release(fh)
@catch_exceptions
+ @deferred_invalidate
def opendir(self, inode):
_logger.debug("arv-mount opendir: inode %i", inode)
@@ -554,7 +607,10 @@ class Operations(llfuse.Operations):
return p
@catch_exceptions
+ @deferred_invalidate
def create(self, inode_parent, name, mode, flags, ctx):
+ _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
+
p = self._check_writable(inode_parent)
p.create(name)
@@ -568,6 +624,7 @@ class Operations(llfuse.Operations):
return (fh, self.getattr(f.inode))
@catch_exceptions
+ @deferred_invalidate
def mkdir(self, inode_parent, name, mode, ctx):
_logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
@@ -581,18 +638,21 @@ class Operations(llfuse.Operations):
return self.getattr(d.inode)
@catch_exceptions
+ @deferred_invalidate
def unlink(self, inode_parent, name):
_logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
@catch_exceptions
+ @deferred_invalidate
def rmdir(self, inode_parent, name):
_logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
@catch_exceptions
+ @deferred_invalidate
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
_logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
src = self._check_writable(inode_parent_old)
@@ -600,6 +660,7 @@ class Operations(llfuse.Operations):
dest.rename(name_old, name_new, src)
@catch_exceptions
+ @deferred_invalidate
def flush(self, fh):
if fh in self._filehandles:
self._filehandles[fh].flush()
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index f661e41..50ea0ea 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -144,12 +144,12 @@ class Directory(FreshBase):
# delete any other directory entries that were not in found in 'items'
for i in oldentries:
_logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
- llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
+ self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[i])
changed = True
if changed:
- llfuse.invalidate_inode(self.inode)
+ self.inodes.invalidate_inode(self.inode)
self._mtime = time.time()
self.fresh()
@@ -165,9 +165,9 @@ class Directory(FreshBase):
self._entries = oldentries
return False
for n in oldentries:
- llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+ self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[n])
- llfuse.invalidate_inode(self.inode)
+ self.inodes.invalidate_inode(self.inode)
self.invalidate()
return True
else:
@@ -238,18 +238,30 @@ class CollectionDirectoryBase(Directory):
name = sanitize_filename(name)
_logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
+ _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
+ _logger.debug("on_event (1) %s %s %s", event, collection, name)
ent = self._entries[name]
+ _logger.debug("on_event (2) %s %s %s", event, collection, name)
del self._entries[name]
- llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+
+ _logger.debug("on_event (3) %s %s %s", event, collection, name)
+
+ self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+
+ _logger.debug("on_event (4) %s %s %s", event, collection, name)
+
self.inodes.del_entry(ent)
+
+ _logger.debug("on_event (5) %s %s %s", event, collection, name)
elif event == arvados.collection.MOD:
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- llfuse.invalidate_inode(item.fuse_entry.inode)
+ self.inodes.invalidate_inode(item.fuse_entry.inode)
elif name in self._entries:
- llfuse.invalidate_inode(self._entries[name].inode)
+ self.inodes.invalidate_inode(self._entries[name].inode)
+ _logger.debug("on_event completed %s %s %s", event, collection, name)
def populate(self, mtime):
self._mtime = mtime
@@ -387,7 +399,7 @@ class CollectionDirectory(CollectionDirectoryBase):
if not self.stale():
return
- _logger.debug("Updating %s", self.collection_locator)
+ _logger.debug("Updating %s", to_record_version)
if self.collection is not None:
if self.collection.known_past_version(to_record_version):
_logger.debug("%s already processed %s", self.collection_locator, to_record_version)
@@ -769,7 +781,7 @@ class ProjectDirectory(Directory):
# Acually move the entry from source directory to this directory.
del src._entries[name_old]
self._entries[name_new] = ent
- llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+ self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
class SharedDirectory(Directory):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 4d472cf..e61ba54 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,7 +46,9 @@ class FuseArvadosFile(File):
self.arvfile = arvfile
def size(self):
+ _logger.debug("started calling self.arvfile.size()")
with llfuse.lock_released:
+ _logger.debug("locked_released and calling self.arvfile.size()")
return self.arvfile.size()
def readfrom(self, off, size, num_retries=0):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list