[ARVADOS] updated: e6aa5b48be1209d74c4c69be3eeca549f02418f3
git at public.curoverse.com
git at public.curoverse.com
Tue May 6 11:43:36 EDT 2014
Summary of changes:
sdk/python/arvados/fuse/__init__.py | 222 +++++++++++++++++++++--------------
sdk/python/bin/arv-mount | 18 +++-
sdk/python/requirements.txt | 1 +
sdk/python/setup_fuse.py.src | 3 +-
sdk/python/test_mount.py | 60 +++++++++-
5 files changed, 207 insertions(+), 97 deletions(-)
via e6aa5b48be1209d74c4c69be3eeca549f02418f3 (commit)
via 2e524eb008f8b70ca1263ccde460365109c66a90 (commit)
from b5f193aa657134822d4df67cabc25c631926395b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e6aa5b48be1209d74c4c69be3eeca549f02418f3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 6 11:43:31 2014 -0400
Added test for group mount. Passes.
diff --git a/sdk/python/arvados/fuse/__init__.py b/sdk/python/arvados/fuse/__init__.py
index 15fbbc9..8b734f2 100644
--- a/sdk/python/arvados/fuse/__init__.py
+++ b/sdk/python/arvados/fuse/__init__.py
@@ -345,6 +345,8 @@ class GroupDirectory(Directory):
def same(a, i):
if isinstance(a, CollectionDirectory):
return a.collection_locator == i['uuid']
+ elif isinstance(a, GroupDirectory):
+ return a.uuid == i['uuid']
elif isinstance(a, ObjectFile):
return a.uuid == i['uuid'] and not a.stale()
return False
diff --git a/sdk/python/test_mount.py b/sdk/python/test_mount.py
index 06252af..a462799 100644
--- a/sdk/python/test_mount.py
+++ b/sdk/python/test_mount.py
@@ -10,7 +10,7 @@ import shutil
import subprocess
import glob
import run_test_server
-
+import json
class MountTestBase(unittest.TestCase):
def setUp(self):
@@ -281,3 +281,51 @@ class FuseTagsUpdateTestPoll(FuseTagsUpdateTestBase):
def tearDown(self):
run_test_server.stop(False)
super(FuseTagsUpdateTestPoll, self).tearDown()
+
+
+class FuseGroupsTest(MountTestBase):
+ def setUp(self):
+ super(FuseGroupsTest, self).setUp()
+ run_test_server.run()
+
+ def runTest(self):
+ run_test_server.authorize_with("admin")
+ api = arvados.api('v1')
+
+ operations = fuse.Operations(os.getuid(), os.getgid())
+ e = operations.inodes.add_entry(fuse.GroupsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+
+ llfuse.init(operations, self.mounttmp, [])
+ t = threading.Thread(None, lambda: llfuse.main())
+ t.start()
+
+ # wait until the driver is finished initializing
+ operations.initlock.wait()
+
+ d1 = os.listdir(self.mounttmp)
+ d1.sort()
+ self.assertIn('zzzzz-j7d0g-v955i6s2oi1cbso', d1)
+
+ d2 = os.listdir(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso'))
+ d2.sort()
+ self.assertEqual(["I'm a job in a folder",
+ "I'm a template in a folder",
+ "zzzzz-j58dm-5gid26432uujf79",
+ "zzzzz-j58dm-7r18rnd5nzhg5yk",
+ "zzzzz-j7d0g-axqo7eu9pwvna1x"
+ ], d2)
+
+ d3 = os.listdir(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso', 'zzzzz-j7d0g-axqo7eu9pwvna1x'))
+ d3.sort()
+ self.assertEqual(["I'm in a subfolder, too",
+ "zzzzz-j58dm-c40lddwcqqr1ffs",
+ "zzzzz-o0j2j-ryhm1bn83ni03sn"
+ ], d3)
+
+ with open(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso', "I'm a template in a folder")) as f:
+ j = json.load(f)
+ self.assertEqual("Two Part Pipeline Template", j['name'])
+
+ def tearDown(self):
+ run_test_server.stop()
+ super(FuseGroupsTest, self).tearDown()
commit 2e524eb008f8b70ca1263ccde460365109c66a90
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 6 11:25:34 2014 -0400
Improved fresh/stale handling with base class, added proper fuse inode cache
invalidation. Added common merge function for handling directory updates.
arv-mount now daemonizes by default.
diff --git a/sdk/python/arvados/fuse/__init__.py b/sdk/python/arvados/fuse/__init__.py
index da1582c..15fbbc9 100644
--- a/sdk/python/arvados/fuse/__init__.py
+++ b/sdk/python/arvados/fuse/__init__.py
@@ -13,21 +13,49 @@ import arvados
import pprint
import arvados.events
import re
+import apiclient
+import json
from time import time
from llfuse import FUSEError
-class File(object):
- '''Wraps a StreamFileReader for use by Directory.'''
+class FreshBase(object):
+ '''Base class for maintaining fresh/stale state to determine when to update.'''
+ def __init__(self):
+ self._stale = True
+ self._poll = False
+ self._last_update = time()
+ self._poll_time = 60
+
+ # Mark the value as stale
+ def invalidate(self):
+ self._stale = True
+
+ # Test if the entries dict is stale
+ def stale(self):
+ if self._stale:
+ return True
+ if self._poll:
+ return (self._last_update + self._poll_time) < time()
+ return False
+
+ def fresh(self):
+ self._stale = False
+ self._last_update = time()
+
+
+class File(FreshBase):
+ '''Base for file objects.'''
def __init__(self, parent_inode):
+ super(File, self).__init__()
self.inode = None
self.parent_inode = parent_inode
def size(self):
return 0
- def readfrom(self, off, size)):
+ def readfrom(self, off, size):
return ''
@@ -35,22 +63,27 @@ class StreamReaderFile(File):
'''Wraps a StreamFileReader as a file.'''
def __init__(self, parent_inode, reader):
- super(StreamReaderFile, self).__init__(parent_inode, reader)
+ super(StreamReaderFile, self).__init__(parent_inode)
self.reader = reader
def size(self):
return self.reader.size()
- def readfrom(self, off, size)):
+ def readfrom(self, off, size):
return self.reader.readfrom(off, size)
+ def stale(self):
+ return False
+
class ObjectFile(File):
- '''Wraps a serialized object as a file.'''
+ '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
- super(ObjectFile, self).__init__(parent_inode, reader)
- self.contents = contents
+ super(ObjectFile, self).__init__(parent_inode)
+ self.contentsdict = contents
+ self.uuid = self.contentsdict['uuid']
+ self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
def size(self):
return len(self.contents)
@@ -59,70 +92,83 @@ class ObjectFile(File):
return self.contents[off:(off+size)]
-class Directory(object):
+class Directory(FreshBase):
'''Generic directory object, backed by a dict.
Consists of a set of entries with the key representing the filename
and the value referencing a File or Directory object.
'''
def __init__(self, parent_inode):
+ super(Directory, self).__init__()
+
'''parent_inode is the integer inode number'''
self.inode = None
if not isinstance(parent_inode, int):
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
self._entries = {}
- self._stale = True
- self._poll = False
- self._last_update = time()
- self._poll_time = 60
# Overriden by subclasses to implement logic to update the entries dict
# when the directory is stale
def update(self):
pass
- # Mark the entries dict as stale
- def invalidate(self):
- self._stale = True
-
- # Test if the entries dict is stale
- def stale(self):
- if self._stale:
- return True
- if self._poll:
- return (self._last_update + self._poll_time) < time()
- return False
-
- def fresh(self):
- self._stale = False
- self._last_update = time()
-
# Only used when computing the size of the disk footprint of the directory
# (stub)
def size(self):
return 0
- def __getitem__(self, item):
+ def checkupdate(self):
if self.stale():
- self.update()
+ try:
+ self.update()
+ except apiclient.errors.HttpError as e:
+ print e
+
+ def __getitem__(self, item):
+ self.checkupdate()
return self._entries[item]
def items(self):
- if self.stale():
- self.update()
+ self.checkupdate()
return self._entries.items()
def __iter__(self):
- if self.stale():
- self.update()
+ self.checkupdate()
return self._entries.iterkeys()
def __contains__(self, k):
- if self.stale():
- self.update()
+ self.checkupdate()
return k in self._entries
+ def merge(self, items, fn, same, new_entry):
+ '''Helper method for updating the contents of the directory.
+
+ items: array with new directory contents
+
+ fn: function to take an entry in 'items' and return the desired file or
+ directory name
+
+ same: function to compare an existing entry with an entry in the items
+ list to determine whether to keep the existing entry.
+
+ new_entry: function to create a new directory entry from array entry.
+ '''
+
+ oldentries = self._entries
+ self._entries = {}
+ for i in items:
+ n = fn(i)
+ if n in oldentries and same(oldentries[n], i):
+ self._entries[n] = oldentries[n]
+ del oldentries[n]
+ else:
+ self._entries[n] = self.inodes.add_entry(new_entry(i))
+ for n in oldentries:
+ llfuse.invalidate_entry(self.inode, str(n))
+ self.inodes.del_entry(oldentries[n])
+ self.fresh()
+
class CollectionDirectory(Directory):
'''Represents the root of a directory tree holding a collection.'''
@@ -132,6 +178,9 @@ class CollectionDirectory(Directory):
self.inodes = inodes
self.collection_locator = collection_locator
+ def same(self, i):
+ return i['uuid'] == self.collection_locator
+
def update(self):
collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
for s in collection.all_streams():
@@ -199,16 +248,10 @@ class TagsDirectory(Directory):
def update(self):
tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
- oldentries = self._entries
- self._entries = {}
- for n in tags['items']:
- n = n['name']
- if n in oldentries:
- self._entries[n] = oldentries[n]
- else:
- self._entries[n] = self.inodes.add_entry(TagDirectory(self.inode, self.inodes, self.api, n, poll=self._poll, poll_time=self._poll_time))
- self.fresh()
-
+ self.merge(tags['items'],
+ lambda i: i['name'],
+ lambda a, i: a.tag == i,
+ lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
'''A special directory that contains as subdirectories all collections visible
@@ -224,19 +267,14 @@ class TagDirectory(Directory):
self._poll_time = poll_time
def update(self):
- collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
+ taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
['name', '=', self.tag],
['head_uuid', 'is_a', 'arvados#collection']],
select=['head_uuid']).execute()
- oldentries = self._entries
- self._entries = {}
- for c in collections['items']:
- n = c['head_uuid']
- if n in oldentries:
- self._entries[n] = oldentries[n]
- else:
- self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
- self.fresh()
+ self.merge(taggedcollections['items'],
+ lambda i: i['head_uuid'],
+ lambda a, i: a.collection_locator == i['head_uuid'],
+ lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
class GroupsDirectory(Directory):
@@ -260,16 +298,10 @@ class GroupsDirectory(Directory):
def update(self):
groups = self.api.groups().list().execute()
- oldentries = self._entries
- self._entries = {}
- for n in groups['items']:
- id = n['name']
- if id in oldentries and oldentries[id].uuid == n['uuid']:
- self._entries[id] = oldentries[id]
- else:
- self._entries[id] = self.inodes.add_entry(GroupDirectory(self.inode, self.inodes, self.api,
- n['uuid'], poll=self._poll, poll_time=self._poll_time))
- self.fresh()
+ self.merge(groups['items'],
+ lambda i: i['uuid'],
+ lambda a, i: a.uuid == i['uuid'],
+ lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
class GroupDirectory(Directory):
@@ -279,7 +311,7 @@ class GroupDirectory(Directory):
super(GroupDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
- self.uuid = uuid
+ self.uuid = uuid['uuid']
self._poll = poll
self._poll_time = poll_time
@@ -289,38 +321,38 @@ class GroupDirectory(Directory):
for a in self._entries:
self._entries[a].invalidate()
- def createDirectory(self, parent_inode, inodes, api, uuid, poll, poll_time):
- print uuid
+ def createDirectory(self, i):
if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
- return CollectionDirectory(parent_inode, inodes, i['uuid'])
- if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
- return ObjectFile(parent_inode, inodes, json.dumps(i))
+ return CollectionDirectory(self.inode, self.inodes, i['uuid'])
+ elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
+ return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
+ elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
+ return ObjectFile(self.parent_inode, i)
return None
def update(self):
- contents = self.api.groups().contents(uuid=self.uuid).execute()
+ contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
links = {}
for a in contents['links']:
links[a['head_uuid']] = a['name']
- oldentries = self._entries
- self._entries = {}
-
- for i in contents['items']:
+ def choose_name(i):
if i['uuid'] in links:
- n = links[i['uuid']]
- elif 'name' in i and len(i['name']) > 0:
- n = i['name']
+ return links[i['uuid']]
else:
- n = i['uuid']
+ return i['uuid']
- if n in oldentries and oldentries[n].uuid == i['uuid']:
- self._entries[n] = oldentries[n]
- else:
- d = self.createDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
- if d != None:
- self._entries[n] = self.inodes.add_entry(d)
- self.fresh()
+ def same(a, i):
+ if isinstance(a, CollectionDirectory):
+ return a.collection_locator == i['uuid']
+ elif isinstance(a, ObjectFile):
+ return a.uuid == i['uuid'] and not a.stale()
+ return False
+
+ self.merge(contents['items'],
+ choose_name,
+ same,
+ self.createDirectory)
class FileHandle(object):
@@ -361,6 +393,10 @@ class Inodes(object):
self._counter += 1
return entry
+ def del_entry(self, entry):
+ llfuse.invalidate_inode(entry.inode)
+ del self._entries[entry.inode]
+
class Operations(llfuse.Operations):
'''This is the main interface with llfuse. The methods on this object are
called by llfuse threads to service FUSE events to query and read from
@@ -394,6 +430,9 @@ class Operations(llfuse.Operations):
return True
def getattr(self, inode):
+ if inode not in self.inodes:
+ raise llfuse.FUSEError(errno.ENOENT)
+
e = self.inodes[inode]
entry = llfuse.EntryAttributes()
@@ -511,7 +550,8 @@ class Operations(llfuse.Operations):
e = off
while e < len(handle.entry):
- yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+ if handle.entry[e][1].inode in self.inodes:
+ yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
e += 1
def releasedir(self, fh):
diff --git a/sdk/python/bin/arv-mount b/sdk/python/bin/arv-mount
index 849af3b..fc5491f 100755
--- a/sdk/python/bin/arv-mount
+++ b/sdk/python/bin/arv-mount
@@ -4,6 +4,7 @@ from arvados.fuse import *
import arvados
import subprocess
import argparse
+import daemon
if __name__ == '__main__':
# Handle command line parameters
@@ -24,6 +25,7 @@ with "--".
collections on the server.""")
parser.add_argument('--groups', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing groups on the server.""")
parser.add_argument('--debug', action='store_true', help="""Debug mode""")
+ parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
help="""Mount, run a command, then unmount and exit""")
@@ -53,10 +55,10 @@ collections on the server.""")
if args.debug:
opts += ['debug']
- # Initialize the fuse connection
- llfuse.init(operations, args.mountpoint, opts)
-
if args.exec_args:
+ # Initialize the fuse connection
+ llfuse.init(operations, args.mountpoint, opts)
+
t = threading.Thread(None, lambda: llfuse.main())
t.start()
@@ -76,4 +78,12 @@ collections on the server.""")
exit(rc)
else:
- llfuse.main()
+ if args.foreground:
+ # Initialize the fuse connection
+ llfuse.init(operations, args.mountpoint, opts)
+ llfuse.main()
+ else:
+ with daemon.DaemonContext():
+ # Initialize the fuse connection
+ llfuse.init(operations, args.mountpoint, opts)
+ llfuse.main()
diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt
index 1a8219b..a6a7591 100644
--- a/sdk/python/requirements.txt
+++ b/sdk/python/requirements.txt
@@ -5,3 +5,4 @@ urllib3==1.7.1
llfuse==0.40
ws4py==0.3.4
PyYAML==3.11
+python-daemon==1.6
diff --git a/sdk/python/setup_fuse.py.src b/sdk/python/setup_fuse.py.src
index 9e191fb..9628712 100644
--- a/sdk/python/setup_fuse.py.src
+++ b/sdk/python/setup_fuse.py.src
@@ -22,6 +22,7 @@ setup(name='arvados-fuse-driver',
],
install_requires=[
'arvados-python-client',
- 'llfuse'
+ 'llfuse',
+ 'python-daemon'
],
zip_safe=False)
diff --git a/sdk/python/test_mount.py b/sdk/python/test_mount.py
index 9251f69..06252af 100644
--- a/sdk/python/test_mount.py
+++ b/sdk/python/test_mount.py
@@ -236,7 +236,7 @@ class FuseTagsUpdateTestBase(MountTestBase):
d3.sort()
self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d3)
- api.links().create(body={'link': {
+ l = api.links().create(body={'link': {
'head_uuid': 'ea10d51bcf88862dbcc36eb292017dfd+45',
'link_class': 'tag',
'name': 'bar_tag'
@@ -248,6 +248,14 @@ class FuseTagsUpdateTestBase(MountTestBase):
d4.sort()
self.assertEqual(['ea10d51bcf88862dbcc36eb292017dfd+45', 'fa7aeb5140e2848d39b416daeef4ffc5+45'], d4)
+ api.links().delete(uuid=l['uuid']).execute()
+
+ time.sleep(1)
+
+ d5 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
+ d5.sort()
+ self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d5)
+
class FuseTagsUpdateTestWebsockets(FuseTagsUpdateTestBase):
def setUp(self):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list