[ARVADOS] created: 416a7c3a1b96abf7982362682048481f2afda0c9
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 14 15:10:52 EDT 2015
at 416a7c3a1b96abf7982362682048481f2afda0c9 (commit)
commit 416a7c3a1b96abf7982362682048481f2afda0c9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Apr 14 15:10:32 2015 -0400
3198: Manage inode cache based on (approximate) object size instead of object
count. It's only a soft limit on memory usage but still a big improvement.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 3129bdf..ce342b5 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -703,7 +703,7 @@ class ArvadosFile(object):
# segment is past the trucate size, all done
break
elif size < range_end:
- nr = Range(r.locator, r.range_start, size - r.range_start)
+ nr = Range(r.locator, r.range_start, size - r.range_start, 0)
nr.segment_offset = r.segment_offset
new_segs.append(nr)
break
@@ -815,7 +815,7 @@ class ArvadosFile(object):
"""Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3d48652..f03deed 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1381,7 +1381,7 @@ class Collection(RichCollectionBase):
block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
if block_locator:
blocksize = long(block_locator.group(1))
- blocks.append(Range(tok, streamoffset, blocksize))
+ blocks.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
else:
state = SEGMENTS
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3a42aa0..afc202e 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -35,7 +35,7 @@ class StreamReader(object):
s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
if s:
blocksize = long(s.group(1))
- self._data_locators.append(Range(tok, streamoffset, blocksize))
+ self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
streamoffset += blocksize
continue
@@ -45,7 +45,7 @@ class StreamReader(object):
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
if name not in self._files:
- self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
+ self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
else:
filereader = self._files[name]
filereader.segments.append(Range(pos, filereader.size(), size))
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 5bb21c6..83b4710 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -61,40 +61,52 @@ class InodeCache(object):
self._entries = collections.OrderedDict()
self._counter = itertools.count(1)
self.cap = cap
+ self._total = 0
+
+ def _remove(self, obj, clear):
+ if clear and not obj.clear():
+ _logger.warn("Could not clear %s in_use %s", obj, obj.in_use())
+ return False
+ self._total -= obj._cache_size
+ del self._entries[obj._cache_priority]
+ _logger.warn("Cleared %s total now %i", obj, self._total)
+ return True
def cap_cache(self):
- if len(self._entries) > self.cap:
- ent = iter(self._entries)
- ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
- for key in ents:
- capobj = self._entries[key]
- if capobj.clear():
- _logger.debug("Cleared %s", self._entries[key])
- del self._entries[key]
+ _logger.warn("total is %i cap is %i", self._total, self.cap)
+ if self._total > self.cap:
+ need_gc = False
+ for key in list(self._entries.keys()):
+ if self._total < self.cap or len(self._entries) < 4:
+ break
+ self._remove(self._entries[key], True)
+
def manage(self, obj):
- obj._cache_priority = next(self._counter)
- self._entries[obj._cache_priority] = obj
- _logger.debug("Managing %s", obj)
- self.cap_cache()
+ if obj.persisted():
+ obj._cache_priority = next(self._counter)
+ obj._cache_size = obj.objsize()
+ self._entries[obj._cache_priority] = obj
+ self._total += obj.objsize()
+ _logger.warn("Managing %s total now %i", obj, self._total)
+ self.cap_cache()
def touch(self, obj):
- if obj._cache_priority in self._entries:
- del self._entries[obj._cache_priority]
- self.manage(obj)
+ if obj.persisted():
+ if obj._cache_priority in self._entries:
+ self._remove(obj, False)
+ self.manage(obj)
+ _logger.warn("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
def unmanage(self, obj):
- if obj._cache_priority in self._entries:
- if obj.clear():
- _logger.debug("Cleared %s", obj)
- del self._entries[obj._cache_priority]
-
+ if obj.persisted() and obj._cache_priority in self._entries:
+ self._remove(obj, True)
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self, inode_cache=1000):
+ def __init__(self, inode_cache=256*1024*1024):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self._obj_cache = InodeCache(cap=inode_cache)
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 8bb810b..9da3a5c 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -1,6 +1,7 @@
import time
import ciso8601
import calendar
+import functools
def convertTime(t):
"""Parse Arvados timestamp to unix time."""
@@ -11,6 +12,16 @@ def convertTime(t):
except (TypeError, ValueError):
return 0
+def use_counter(orig_func):
+ @functools.wraps(orig_func)
+ def use_counter_wrapper(self, *args, **kwargs):
+ try:
+ self.inc_use()
+ return orig_func(self, *args, **kwargs)
+ finally:
+ self.dec_use()
+ return use_counter_wrapper
+
class FreshBase(object):
"""Base class for maintaining fresh/stale state to determine when to update."""
def __init__(self):
@@ -19,6 +30,7 @@ class FreshBase(object):
self._last_update = time.time()
self._atime = time.time()
self._poll_time = 60
+ self.use_count = 0
# Mark the value as stale
def invalidate(self):
@@ -38,3 +50,21 @@ class FreshBase(object):
def atime(self):
return self._atime
+
+ def persisted(self):
+ return False
+
+ def clear(self, force=False):
+ pass
+
+ def in_use(self):
+ return self.use_count > 0
+
+ def inc_use(self):
+ self.use_count += 1
+
+ def dec_use(self):
+ self.use_count -= 1
+
+ def objsize(self):
+ return 0
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index cfa81b3..08a5168 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -7,7 +7,7 @@ import apiclient
import functools
from fusefile import StringFile, StreamReaderFile, ObjectFile
-from fresh import FreshBase, convertTime
+from fresh import FreshBase, convertTime, use_counter
from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
@@ -32,16 +32,6 @@ def sanitize_filename(dirty):
else:
return _disallowed_filename_characters.sub('_', dirty)
-def use_counter(orig_func):
- @functools.wraps(orig_func)
- def use_counter_wrapper(self, *args, **kwargs):
- try:
- self.inc_use()
- return orig_func(self, *args, **kwargs)
- finally:
- self.dec_use()
- return use_counter_wrapper
-
class Directory(FreshBase):
"""Generic directory object, backed by a dict.
@@ -61,7 +51,6 @@ class Directory(FreshBase):
self.inodes = inodes
self._entries = {}
self._mtime = time.time()
- self.use_count = 0
# Overriden by subclasses to implement logic to update the entries dict
# when the directory is stale
@@ -74,14 +63,8 @@ class Directory(FreshBase):
def size(self):
return 0
- def in_use(self):
- return self.use_count > 0
-
- def inc_use(self):
- self.use_count += 1
-
- def dec_use(self):
- self.use_count -= 1
+ def persisted(self):
+ return False
def checkupdate(self):
if self.stale():
@@ -165,14 +148,12 @@ class Directory(FreshBase):
oldentries = self._entries
self._entries = {}
for n in oldentries:
- if isinstance(n, Directory):
- if not n.clear(force):
- self._entries = oldentries
- return False
+ if not oldentries[n].clear(force):
+ self._entries = oldentries
+ return False
for n in oldentries:
- if isinstance(n, Directory):
- llfuse.invalidate_entry(self.inode, str(n))
- self.inodes.del_entry(oldentries[n])
+ llfuse.invalidate_entry(self.inode, str(n))
+ self.inodes.del_entry(oldentries[n])
llfuse.invalidate_inode(self.inode)
self.invalidate()
return True
@@ -198,6 +179,7 @@ class CollectionDirectory(Directory):
else:
self.collection_locator = collection
self._mtime = 0
+ self._manifest_size = 0
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
@@ -214,6 +196,8 @@ class CollectionDirectory(Directory):
self.update()
def new_collection(self, new_collection_object, coll_reader):
+ self.clear(force=True)
+
self.collection_object = new_collection_object
self._mtime = convertTime(self.collection_object.get('modified_at'))
@@ -221,7 +205,6 @@ class CollectionDirectory(Directory):
if self.collection_object_file is not None:
self.collection_object_file.update(self.collection_object)
- self.clear(force=True)
for s in coll_reader.all_streams():
cwd = self
for part in s.name().split('/'):
@@ -229,9 +212,6 @@ class CollectionDirectory(Directory):
partname = sanitize_filename(part)
if partname not in cwd._entries:
cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
- # (hack until using new API)
- cwd._entries[partname].inc_use()
- # end hack
cwd = cwd._entries[partname]
for k, v in s.files().items():
cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -264,6 +244,9 @@ class CollectionDirectory(Directory):
if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
self.new_collection(new_collection_object, coll_reader)
+ self._manifest_size = len(coll_reader.manifest_text())
+ _logger.debug("%s manifest_size %i", self, self._manifest_size)
+
self.fresh()
return True
except arvados.errors.NotFoundError:
@@ -295,15 +278,15 @@ class CollectionDirectory(Directory):
return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
- super(CollectionDirectory, self).invalidate()
self.collection_object = None
+ self.collection_object_file = None
+ super(CollectionDirectory, self).invalidate()
- def clear(self, force=False):
- if self.collection_locator is None:
- return False
- else:
- return super(CollectionDirectory, self).clear(force)
+ def persisted(self):
+ return (self.collection_locator is not None)
+ def objsize(self):
+ return self._manifest_size * 128
class MagicDirectory(Directory):
"""A special directory that logically contains the set of all extant keep locators.
@@ -525,6 +508,11 @@ class ProjectDirectory(Directory):
else:
return super(ProjectDirectory, self).__contains__(k)
+ def persisted(self):
+ return False
+
+ def objsize(self):
+ return len(self.project_object) * 1024 if self.project_object else 0
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index e122d9d..efe31c3 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -24,14 +24,9 @@ class File(FreshBase):
def mtime(self):
return self._mtime
- def clear(self):
- pass
+ def clear(self, force=False):
+ return True
- def inc_use(self):
- pass
-
- def dec_use(self):
- pass
class StreamReaderFile(File):
"""Wraps a StreamFileReader as a file."""
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 9c0ef10..3c96a56 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -46,7 +46,7 @@ with "--".
parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
- parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+ parser.add_argument('--inode-cache', type=int, help="Inode cache size (default 128MiB)", default=128*1024*1024)
parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
commit 72cb9197fe4d7a1cc8b822e724d3fc03d77541dd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Apr 13 17:08:42 2015 -0400
3198: Fixed arv-mount for refactoring.
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index deb0cd3..5bb21c6 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -24,7 +24,7 @@ import ciso8601
import collections
from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
-from fusefile import StreamReaderFile
+from fusefile import StreamReaderFile, StringFile
_logger = logging.getLogger('arvados.arvados_fuse')
@@ -56,7 +56,7 @@ class DirectoryHandle(object):
self.dirobj.dec_use()
-class ObjectCache(object):
+class InodeCache(object):
def __init__(self, cap):
self._entries = collections.OrderedDict()
self._counter = itertools.count(1)
@@ -69,11 +69,13 @@ class ObjectCache(object):
for key in ents:
capobj = self._entries[key]
if capobj.clear():
+ _logger.debug("Cleared %s", self._entries[key])
del self._entries[key]
def manage(self, obj):
obj._cache_priority = next(self._counter)
self._entries[obj._cache_priority] = obj
+ _logger.debug("Managing %s", obj)
self.cap_cache()
def touch(self, obj):
@@ -84,6 +86,7 @@ class ObjectCache(object):
def unmanage(self, obj):
if obj._cache_priority in self._entries:
if obj.clear():
+ _logger.debug("Cleared %s", obj)
del self._entries[obj._cache_priority]
@@ -91,10 +94,10 @@ class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self, cache_cap=1000):
+ def __init__(self, inode_cache=1000):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
- self._obj_cache = ObjectCache(cap=cache_cap)
+ self._obj_cache = InodeCache(cap=inode_cache)
def __getitem__(self, item):
return self._entries[item]
@@ -112,6 +115,7 @@ class Inodes(object):
return k in self._entries
def touch(self, entry):
+ entry._atime = time.time()
self._obj_cache.touch(entry)
def cap_cache(self):
@@ -141,10 +145,10 @@ class Operations(llfuse.Operations):
"""
- def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
+ def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000):
super(Operations, self).__init__()
- self.inodes = Inodes(cache_cap)
+ self.inodes = Inodes(inode_cache)
self.uid = uid
self.gid = gid
self.encoding = encoding
@@ -236,6 +240,7 @@ class Operations(llfuse.Operations):
fh = self._filehandles_counter
self._filehandles_counter += 1
self._filehandles[fh] = FileHandle(fh, p)
+ self.inodes.touch(p)
return fh
def read(self, fh, off, size):
@@ -245,8 +250,7 @@ class Operations(llfuse.Operations):
else:
raise llfuse.FUSEError(errno.EBADF)
- # update atime
- handle.fileobj._atime = time.time()
+ self.inodes.touch(handle.fileobj)
try:
with llfuse.lock_released:
@@ -286,14 +290,10 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.EIO)
# update atime
- p._atime = time.time()
+ self.inodes.touch(p)
- try:
- p.inc_use()
- self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
- return fh
- finally:
- p.dec_use()
+ self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+ return fh
def readdir(self, fh, off):
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index b540efe..9c0ef10 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -45,6 +45,9 @@ with "--".
parser.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
+
+ parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+
parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
help="""Mount, run a command, then unmount and exit""")
@@ -81,7 +84,7 @@ with "--".
try:
# Create the request handler
- operations = Operations(os.getuid(), os.getgid(), args.encoding)
+ operations = Operations(os.getuid(), os.getgid(), args.encoding, args.inode_cache)
api = ThreadSafeApiCache(arvados.config.settings())
usr = api.users().current().execute(num_retries=args.retries)
@@ -112,7 +115,7 @@ with "--".
if dir_class is not None:
operations.inodes.add_entry(dir_class(*dir_args))
else:
- e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+ e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE, operations.inodes))
dir_args[0] = e.inode
e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 4802d69..ecc0888 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase):
self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
+ operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(operations, self.mounttmp, [])
commit d8d82841f43394b3781804844d4860bc8205d5fc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Apr 13 16:42:46 2015 -0400
3198: Implement cache management for directory objects.
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a886e37..deb0cd3 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -21,6 +21,7 @@ import calendar
import threading
import itertools
import ciso8601
+import collections
from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
from fusefile import StreamReaderFile
@@ -29,21 +30,71 @@ _logger = logging.getLogger('arvados.arvados_fuse')
class FileHandle(object):
- """Connects a numeric file handle to a File or Directory object that has
+ """Connects a numeric file handle to a File object that has
been opened by the client."""
- def __init__(self, fh, entry):
+ def __init__(self, fh, fileobj):
self.fh = fh
- self.entry = entry
+ self.fileobj = fileobj
+ self.fileobj.inc_use()
+
+ def release(self):
+ self.fileobj.dec_use()
+
+
+class DirectoryHandle(object):
+ """Connects a numeric file handle to a Directory object that has
+ been opened by the client."""
+
+ def __init__(self, fh, dirobj, entries):
+ self.fh = fh
+ self.entries = entries
+ self.dirobj = dirobj
+ self.dirobj.inc_use()
+
+ def release(self):
+ self.dirobj.dec_use()
+
+
+class ObjectCache(object):
+ def __init__(self, cap):
+ self._entries = collections.OrderedDict()
+ self._counter = itertools.count(1)
+ self.cap = cap
+
+ def cap_cache(self):
+ if len(self._entries) > self.cap:
+ ent = iter(self._entries)
+ ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
+ for key in ents:
+ capobj = self._entries[key]
+ if capobj.clear():
+ del self._entries[key]
+
+ def manage(self, obj):
+ obj._cache_priority = next(self._counter)
+ self._entries[obj._cache_priority] = obj
+ self.cap_cache()
+
+ def touch(self, obj):
+ if obj._cache_priority in self._entries:
+ del self._entries[obj._cache_priority]
+ self.manage(obj)
+
+ def unmanage(self, obj):
+ if obj._cache_priority in self._entries:
+ if obj.clear():
+ del self._entries[obj._cache_priority]
class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self):
+ def __init__(self, cache_cap=1000):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
+ self._obj_cache = ObjectCache(cap=cache_cap)
def __getitem__(self, item):
return self._entries[item]
@@ -60,12 +111,20 @@ class Inodes(object):
def __contains__(self, k):
return k in self._entries
+ def touch(self, entry):
+ self._obj_cache.touch(entry)
+
+ def cap_cache(self):
+ self._obj_cache.cap_cache()
+
def add_entry(self, entry):
entry.inode = next(self._counter)
self._entries[entry.inode] = entry
+ self._obj_cache.manage(entry)
return entry
def del_entry(self, entry):
+ self._obj_cache.unmanage(entry)
llfuse.invalidate_inode(entry.inode)
del self._entries[entry.inode]
@@ -82,10 +141,10 @@ class Operations(llfuse.Operations):
"""
- def __init__(self, uid, gid, encoding="utf-8"):
+ def __init__(self, uid, gid, encoding="utf-8", cache_cap=1000):
super(Operations, self).__init__()
- self.inodes = Inodes()
+ self.inodes = Inodes(cache_cap)
self.uid = uid
self.gid = gid
self.encoding = encoding
@@ -187,11 +246,11 @@ class Operations(llfuse.Operations):
raise llfuse.FUSEError(errno.EBADF)
# update atime
- handle.entry._atime = time.time()
+ handle.fileobj._atime = time.time()
try:
with llfuse.lock_released:
- return handle.entry.readfrom(off, size)
+ return handle.fileobj.readfrom(off, size)
except arvados.errors.NotFoundError as e:
_logger.warning("Block not found: " + str(e))
raise llfuse.FUSEError(errno.EIO)
@@ -201,7 +260,12 @@ class Operations(llfuse.Operations):
def release(self, fh):
if fh in self._filehandles:
+ self._filehandles[fh].release()
del self._filehandles[fh]
+ self.inodes.cap_cache()
+
+ def releasedir(self, fh):
+ self.release(fh)
def opendir(self, inode):
_logger.debug("arv-mount opendir: inode %i", inode)
@@ -224,8 +288,13 @@ class Operations(llfuse.Operations):
# update atime
p._atime = time.time()
- self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
- return fh
+ try:
+ p.inc_use()
+ self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+ return fh
+ finally:
+ p.dec_use()
+
def readdir(self, fh, off):
_logger.debug("arv-mount readdir: fh %i off %i", fh, off)
@@ -235,20 +304,17 @@ class Operations(llfuse.Operations):
else:
raise llfuse.FUSEError(errno.EBADF)
- _logger.debug("arv-mount handle.entry %s", handle.entry)
+ _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
e = off
- while e < len(handle.entry):
- if handle.entry[e][1].inode in self.inodes:
+ while e < len(handle.entries):
+ if handle.entries[e][1].inode in self.inodes:
try:
- yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
+ yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
except UnicodeEncodeError:
pass
e += 1
- def releasedir(self, fh):
- del self._filehandles[fh]
-
def statfs(self):
st = llfuse.StatvfsData()
st.f_bsize = 64 * 1024
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 1144622..cfa81b3 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -4,6 +4,7 @@ import time
import llfuse
import arvados
import apiclient
+import functools
from fusefile import StringFile, StreamReaderFile, ObjectFile
from fresh import FreshBase, convertTime
@@ -31,6 +32,16 @@ def sanitize_filename(dirty):
else:
return _disallowed_filename_characters.sub('_', dirty)
+def use_counter(orig_func):
+ @functools.wraps(orig_func)
+ def use_counter_wrapper(self, *args, **kwargs):
+ try:
+ self.inc_use()
+ return orig_func(self, *args, **kwargs)
+ finally:
+ self.dec_use()
+ return use_counter_wrapper
+
class Directory(FreshBase):
"""Generic directory object, backed by a dict.
@@ -39,7 +50,7 @@ class Directory(FreshBase):
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode):
+ def __init__(self, parent_inode, inodes):
super(Directory, self).__init__()
"""parent_inode is the integer inode number"""
@@ -47,11 +58,14 @@ class Directory(FreshBase):
if not isinstance(parent_inode, int):
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
+ self.inodes = inodes
self._entries = {}
self._mtime = time.time()
+ self.use_count = 0
# Overriden by subclasses to implement logic to update the entries dict
# when the directory is stale
+ @use_counter
def update(self):
pass
@@ -60,6 +74,15 @@ class Directory(FreshBase):
def size(self):
return 0
+ def in_use(self):
+ return self.use_count > 0
+
+ def inc_use(self):
+ self.use_count += 1
+
+ def dec_use(self):
+ self.use_count -= 1
+
def checkupdate(self):
if self.stale():
try:
@@ -67,22 +90,25 @@ class Directory(FreshBase):
except apiclient.errors.HttpError as e:
_logger.debug(e)
+ @use_counter
def __getitem__(self, item):
self.checkupdate()
return self._entries[item]
+ @use_counter
def items(self):
self.checkupdate()
- return self._entries.items()
-
- def __iter__(self):
- self.checkupdate()
- return self._entries.iterkeys()
+ return list(self._entries.items())
+ @use_counter
def __contains__(self, k):
self.checkupdate()
return k in self._entries
+ def fresh(self):
+ self.inodes.touch(self)
+ super(Directory, self).fresh()
+
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
@@ -132,17 +158,26 @@ class Directory(FreshBase):
self.fresh()
- def clear(self):
+ def clear(self, force=False):
"""Delete all entries"""
- oldentries = self._entries
- self._entries = {}
- for n in oldentries:
- if isinstance(n, Directory):
- n.clear()
- llfuse.invalidate_entry(self.inode, str(n))
- self.inodes.del_entry(oldentries[n])
- llfuse.invalidate_inode(self.inode)
- self.invalidate()
+
+ if not self.in_use() or force:
+ oldentries = self._entries
+ self._entries = {}
+ for n in oldentries:
+ if isinstance(n, Directory):
+ if not n.clear(force):
+ self._entries = oldentries
+ return False
+ for n in oldentries:
+ if isinstance(n, Directory):
+ llfuse.invalidate_entry(self.inode, str(n))
+ self.inodes.del_entry(oldentries[n])
+ llfuse.invalidate_inode(self.inode)
+ self.invalidate()
+ return True
+ else:
+ return False
def mtime(self):
return self._mtime
@@ -152,8 +187,7 @@ class CollectionDirectory(Directory):
"""Represents the root of a directory tree holding a collection."""
def __init__(self, parent_inode, inodes, api, num_retries, collection):
- super(CollectionDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(CollectionDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
self.collection_object_file = None
@@ -187,14 +221,17 @@ class CollectionDirectory(Directory):
if self.collection_object_file is not None:
self.collection_object_file.update(self.collection_object)
- self.clear()
+ self.clear(force=True)
for s in coll_reader.all_streams():
cwd = self
for part in s.name().split('/'):
if part != '' and part != '.':
partname = sanitize_filename(part)
if partname not in cwd._entries:
- cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
+ cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
+ # (hack until using new API)
+ cwd._entries[partname].inc_use()
+ # end hack
cwd = cwd._entries[partname]
for k, v in s.files().items():
cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -257,6 +294,16 @@ class CollectionDirectory(Directory):
else:
return super(CollectionDirectory, self).__contains__(k)
+ def invalidate(self):
+ super(CollectionDirectory, self).invalidate()
+ self.collection_object = None
+
+ def clear(self, force=False):
+ if self.collection_locator is None:
+ return False
+ else:
+ return super(CollectionDirectory, self).clear(force)
+
class MagicDirectory(Directory):
"""A special directory that logically contains the set of all extant keep locators.
@@ -281,8 +328,7 @@ will appear if it exists.
""".lstrip()
def __init__(self, parent_inode, inodes, api, num_retries):
- super(MagicDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(MagicDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
@@ -308,6 +354,7 @@ will appear if it exists.
try:
e = self.inodes.add_entry(CollectionDirectory(
self.inode, self.inodes, self.api, self.num_retries, k))
+
if e.update():
self._entries[k] = e
return True
@@ -323,28 +370,25 @@ will appear if it exists.
else:
raise KeyError("No collection with id " + item)
+ def clear(self, force=False):
+ pass
+
class RecursiveInvalidateDirectory(Directory):
def invalidate(self):
- if self.inode == llfuse.ROOT_INODE:
- llfuse.lock.acquire()
try:
super(RecursiveInvalidateDirectory, self).invalidate()
for a in self._entries:
self._entries[a].invalidate()
except Exception:
_logger.exception()
- finally:
- if self.inode == llfuse.ROOT_INODE:
- llfuse.lock.release()
class TagsDirectory(RecursiveInvalidateDirectory):
"""A special directory that contains as subdirectories all tags visible to the user."""
def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(TagsDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
self._poll = True
@@ -370,8 +414,7 @@ class TagDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(TagDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
self.tag = tag
@@ -397,8 +440,7 @@ class ProjectDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, project_object,
poll=False, poll_time=60):
- super(ProjectDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(ProjectDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
@@ -462,11 +504,6 @@ class ProjectDirectory(Directory):
contents = arvados.util.list_all(self.api.groups().contents,
self.num_retries, uuid=self.uuid)
- # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(
- self.api.links().list, self.num_retries,
- filters=[['tail_uuid', '=', self.uuid],
- ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
@@ -494,8 +531,7 @@ class SharedDirectory(Directory):
def __init__(self, parent_inode, inodes, api, num_retries, exclude,
poll=False, poll_time=60):
- super(SharedDirectory, self).__init__(parent_inode)
- self.inodes = inodes
+ super(SharedDirectory, self).__init__(parent_inode, inodes)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index 6dd2d8a..e122d9d 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -24,6 +24,14 @@ class File(FreshBase):
def mtime(self):
return self._mtime
+ def clear(self):
+ pass
+
+ def inc_use(self):
+ pass
+
+ def dec_use(self):
+ pass
class StreamReaderFile(File):
"""Wraps a StreamFileReader as a file."""
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 764a099..4802d69 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -25,7 +25,7 @@ class MountTestBase(unittest.TestCase):
self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def make_mount(self, root_class, **root_kwargs):
- operations = fuse.Operations(os.getuid(), os.getgid())
+ operations = fuse.Operations(os.getuid(), os.getgid(), cache_cap=2)
operations.inodes.add_entry(root_class(
llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
llfuse.init(operations, self.mounttmp, [])
@@ -243,8 +243,7 @@ class FuseSharedTest(MountTestBase):
# directory)
fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
fuse_user_objs.sort()
- self.assertEqual(['Empty collection.link', # permission link on collection
- 'FUSE Test Project', # project owned by user
+ self.assertEqual(['FUSE Test Project', # project owned by user
'collection #1 owned by FUSE', # collection owned by user
'collection #2 owned by FUSE', # collection owned by user
'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list