[ARVADOS] updated: e6aa5b48be1209d74c4c69be3eeca549f02418f3

git at public.curoverse.com git at public.curoverse.com
Tue May 6 11:43:36 EDT 2014


Summary of changes:
 sdk/python/arvados/fuse/__init__.py |  222 +++++++++++++++++++++--------------
 sdk/python/bin/arv-mount            |   18 +++-
 sdk/python/requirements.txt         |    1 +
 sdk/python/setup_fuse.py.src        |    3 +-
 sdk/python/test_mount.py            |   60 +++++++++-
 5 files changed, 207 insertions(+), 97 deletions(-)

       via  e6aa5b48be1209d74c4c69be3eeca549f02418f3 (commit)
       via  2e524eb008f8b70ca1263ccde460365109c66a90 (commit)
      from  b5f193aa657134822d4df67cabc25c631926395b (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit e6aa5b48be1209d74c4c69be3eeca549f02418f3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 6 11:43:31 2014 -0400

    Added test for group mount.  Passes.

diff --git a/sdk/python/arvados/fuse/__init__.py b/sdk/python/arvados/fuse/__init__.py
index 15fbbc9..8b734f2 100644
--- a/sdk/python/arvados/fuse/__init__.py
+++ b/sdk/python/arvados/fuse/__init__.py
@@ -345,6 +345,8 @@ class GroupDirectory(Directory):
         def same(a, i):
             if isinstance(a, CollectionDirectory):
                 return a.collection_locator == i['uuid']
+            elif isinstance(a, GroupDirectory):
+                return a.uuid == i['uuid']
             elif isinstance(a, ObjectFile):
                 return a.uuid == i['uuid'] and not a.stale()
             return False
diff --git a/sdk/python/test_mount.py b/sdk/python/test_mount.py
index 06252af..a462799 100644
--- a/sdk/python/test_mount.py
+++ b/sdk/python/test_mount.py
@@ -10,7 +10,7 @@ import shutil
 import subprocess
 import glob
 import run_test_server
-
+import json
 
 class MountTestBase(unittest.TestCase):
     def setUp(self):
@@ -281,3 +281,51 @@ class FuseTagsUpdateTestPoll(FuseTagsUpdateTestBase):
     def tearDown(self):
         run_test_server.stop(False)
         super(FuseTagsUpdateTestPoll, self).tearDown()
+
+
+class FuseGroupsTest(MountTestBase):
+    def setUp(self):
+        super(FuseGroupsTest, self).setUp()
+        run_test_server.run()
+
+    def runTest(self):
+        run_test_server.authorize_with("admin")
+        api = arvados.api('v1')
+
+        operations = fuse.Operations(os.getuid(), os.getgid())
+        e = operations.inodes.add_entry(fuse.GroupsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+
+        llfuse.init(operations, self.mounttmp, [])
+        t = threading.Thread(None, lambda: llfuse.main())
+        t.start()
+
+        # wait until the driver is finished initializing
+        operations.initlock.wait()
+
+        d1 = os.listdir(self.mounttmp)
+        d1.sort()
+        self.assertIn('zzzzz-j7d0g-v955i6s2oi1cbso', d1)
+
+        d2 = os.listdir(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso'))
+        d2.sort()
+        self.assertEqual(["I'm a job in a folder",
+                          "I'm a template in a folder",
+                          "zzzzz-j58dm-5gid26432uujf79",
+                          "zzzzz-j58dm-7r18rnd5nzhg5yk",
+                          "zzzzz-j7d0g-axqo7eu9pwvna1x"
+                      ], d2)
+
+        d3 = os.listdir(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso', 'zzzzz-j7d0g-axqo7eu9pwvna1x'))
+        d3.sort()
+        self.assertEqual(["I'm in a subfolder, too",
+                          "zzzzz-j58dm-c40lddwcqqr1ffs",
+                          "zzzzz-o0j2j-ryhm1bn83ni03sn"
+                      ], d3)
+
+        with open(os.path.join(self.mounttmp, 'zzzzz-j7d0g-v955i6s2oi1cbso', "I'm a template in a folder")) as f:
+            j = json.load(f)
+            self.assertEqual("Two Part Pipeline Template", j['name'])
+
+    def tearDown(self):
+        run_test_server.stop()
+        super(FuseGroupsTest, self).tearDown()

commit 2e524eb008f8b70ca1263ccde460365109c66a90
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue May 6 11:25:34 2014 -0400

    Improved fresh/stale handling with base class, added proper fuse inode cache
    invalidation.  Added common merge function for handling directory updates.
    arv-mount now daemonizes by default.

diff --git a/sdk/python/arvados/fuse/__init__.py b/sdk/python/arvados/fuse/__init__.py
index da1582c..15fbbc9 100644
--- a/sdk/python/arvados/fuse/__init__.py
+++ b/sdk/python/arvados/fuse/__init__.py
@@ -13,21 +13,49 @@ import arvados
 import pprint
 import arvados.events
 import re
+import apiclient
+import json
 
 from time import time
 from llfuse import FUSEError
 
-class File(object):
-    '''Wraps a StreamFileReader for use by Directory.'''
+class FreshBase(object):
+    '''Base class for maintaining fresh/stale state to determine when to update.'''
+    def __init__(self):
+        self._stale = True
+        self._poll = False
+        self._last_update = time()
+        self._poll_time = 60
+
+    # Mark the value as stale
+    def invalidate(self):
+        self._stale = True
+
+    # Test if the entries dict is stale
+    def stale(self):
+        if self._stale:
+            return True
+        if self._poll:
+            return (self._last_update + self._poll_time) < time()
+        return False
+
+    def fresh(self):
+        self._stale = False
+        self._last_update = time()
+
+
+class File(FreshBase):
+    '''Base for file objects.'''
 
     def __init__(self, parent_inode):
+        super(File, self).__init__()
         self.inode = None
         self.parent_inode = parent_inode
 
     def size(self):
         return 0
 
-    def readfrom(self, off, size)):
+    def readfrom(self, off, size):
         return ''
 
 
@@ -35,22 +63,27 @@ class StreamReaderFile(File):
     '''Wraps a StreamFileReader as a file.'''
 
     def __init__(self, parent_inode, reader):
-        super(StreamReaderFile, self).__init__(parent_inode, reader)
+        super(StreamReaderFile, self).__init__(parent_inode)
         self.reader = reader
 
     def size(self):
         return self.reader.size()
 
-    def readfrom(self, off, size)):
+    def readfrom(self, off, size):
         return self.reader.readfrom(off, size)
 
+    def stale(self):
+        return False
+
 
 class ObjectFile(File):
-    '''Wraps a serialized object as a file.'''
+    '''Wraps a dict as a serialized json object.'''
 
     def __init__(self, parent_inode, contents):
-        super(ObjectFile, self).__init__(parent_inode, reader)
-        self.contents = contents
+        super(ObjectFile, self).__init__(parent_inode)
+        self.contentsdict = contents
+        self.uuid = self.contentsdict['uuid']
+        self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
 
     def size(self):
         return len(self.contents)
@@ -59,70 +92,83 @@ class ObjectFile(File):
         return self.contents[off:(off+size)]
 
 
-class Directory(object):
+class Directory(FreshBase):
     '''Generic directory object, backed by a dict.
     Consists of a set of entries with the key representing the filename
     and the value referencing a File or Directory object.
     '''
 
     def __init__(self, parent_inode):
+        super(Directory, self).__init__()
+
         '''parent_inode is the integer inode number'''
         self.inode = None
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
         self._entries = {}
-        self._stale = True
-        self._poll = False
-        self._last_update = time()
-        self._poll_time = 60
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
     def update(self):
         pass
 
-    # Mark the entries dict as stale
-    def invalidate(self):
-        self._stale = True
-
-    # Test if the entries dict is stale
-    def stale(self):
-        if self._stale:
-            return True
-        if self._poll:
-            return (self._last_update + self._poll_time) < time()
-        return False
-
-    def fresh(self):
-        self._stale = False
-        self._last_update = time()
-
     # Only used when computing the size of the disk footprint of the directory
     # (stub)
     def size(self):
         return 0
 
-    def __getitem__(self, item):
+    def checkupdate(self):
         if self.stale():
-            self.update()
+            try:
+                self.update()
+            except apiclient.errors.HttpError as e:
+                print e
+
+    def __getitem__(self, item):
+        self.checkupdate()
         return self._entries[item]
 
     def items(self):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return self._entries.items()
 
     def __iter__(self):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return self._entries.iterkeys()
 
     def __contains__(self, k):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return k in self._entries
 
+    def merge(self, items, fn, same, new_entry):
+        '''Helper method for updating the contents of the directory.
+
+        items: array with new directory contents
+
+        fn: function to take an entry in 'items' and return the desired file or
+        directory name
+
+        same: function to compare an existing entry with an entry in the items
+        list to determine whether to keep the existing entry.
+
+        new_entry: function to create a new directory entry from array entry.
+        '''
+
+        oldentries = self._entries
+        self._entries = {}
+        for i in items:
+            n = fn(i)
+            if n in oldentries and same(oldentries[n], i):
+                self._entries[n] = oldentries[n]
+                del oldentries[n]
+            else:
+                self._entries[n] = self.inodes.add_entry(new_entry(i))
+        for n in oldentries:
+            llfuse.invalidate_entry(self.inode, str(n))
+            self.inodes.del_entry(oldentries[n])
+        self.fresh()
+
 
 class CollectionDirectory(Directory):
     '''Represents the root of a directory tree holding a collection.'''
@@ -132,6 +178,9 @@ class CollectionDirectory(Directory):
         self.inodes = inodes
         self.collection_locator = collection_locator
 
+    def same(self, i):
+        return i['uuid'] == self.collection_locator
+
     def update(self):
         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
         for s in collection.all_streams():
@@ -199,16 +248,10 @@ class TagsDirectory(Directory):
 
     def update(self):
         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
-        oldentries = self._entries
-        self._entries = {}
-        for n in tags['items']:
-            n = n['name']
-            if n in oldentries:
-                self._entries[n] = oldentries[n]
-            else:
-                self._entries[n] = self.inodes.add_entry(TagDirectory(self.inode, self.inodes, self.api, n, poll=self._poll, poll_time=self._poll_time))
-        self.fresh()
-
+        self.merge(tags['items'],
+                   lambda i: i['name'],
+                   lambda a, i: a.tag == i,
+                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 class TagDirectory(Directory):
     '''A special directory that contains as subdirectories all collections visible
@@ -224,19 +267,14 @@ class TagDirectory(Directory):
         self._poll_time = poll_time
 
     def update(self):
-        collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
+        taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
                                                ['name', '=', self.tag],
                                                ['head_uuid', 'is_a', 'arvados#collection']],
                                       select=['head_uuid']).execute()
-        oldentries = self._entries
-        self._entries = {}
-        for c in collections['items']:
-            n = c['head_uuid']
-            if n in oldentries:
-                self._entries[n] = oldentries[n]
-            else:
-                self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
-        self.fresh()
+        self.merge(taggedcollections['items'],
+                   lambda i: i['head_uuid'],
+                   lambda a, i: a.collection_locator == i['head_uuid'],
+                   lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
 
 
 class GroupsDirectory(Directory):
@@ -260,16 +298,10 @@ class GroupsDirectory(Directory):
 
     def update(self):
         groups = self.api.groups().list().execute()
-        oldentries = self._entries
-        self._entries = {}
-        for n in groups['items']:
-            id = n['name']
-            if id in oldentries and oldentries[id].uuid == n['uuid']:
-                self._entries[id] = oldentries[id]
-            else:
-                self._entries[id] = self.inodes.add_entry(GroupDirectory(self.inode, self.inodes, self.api,
-                                                                         n['uuid'], poll=self._poll, poll_time=self._poll_time))
-        self.fresh()
+        self.merge(groups['items'],
+                   lambda i: i['uuid'],
+                   lambda a, i: a.uuid == i['uuid'],
+                   lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
 
 
 class GroupDirectory(Directory):
@@ -279,7 +311,7 @@ class GroupDirectory(Directory):
         super(GroupDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
-        self.uuid = uuid
+        self.uuid = uuid['uuid']
         self._poll = poll
         self._poll_time = poll_time
 
@@ -289,38 +321,38 @@ class GroupDirectory(Directory):
             for a in self._entries:
                 self._entries[a].invalidate()
 
-    def createDirectory(self, parent_inode, inodes, api, uuid, poll, poll_time):
-        print uuid
+    def createDirectory(self, i):
         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
-            return CollectionDirectory(parent_inode, inodes, i['uuid'])
-        if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
-            return ObjectFile(parent_inode, inodes, json.dumps(i))
+            return CollectionDirectory(self.inode, self.inodes, i['uuid'])
+        elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
+            return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
+        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
+            return ObjectFile(self.parent_inode, i)
         return None
 
     def update(self):
-        contents = self.api.groups().contents(uuid=self.uuid).execute()
+        contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
         links = {}
         for a in contents['links']:
             links[a['head_uuid']] = a['name']
 
-        oldentries = self._entries
-        self._entries = {}
-
-        for i in contents['items']:
+        def choose_name(i):
             if i['uuid'] in links:
-                n = links[i['uuid']]
-            elif 'name' in i and len(i['name']) > 0:
-                n = i['name']
+                return links[i['uuid']]
             else:
-                n = i['uuid']
+                return i['uuid']
 
-            if n in oldentries and oldentries[n].uuid == i['uuid']:
-                self._entries[n] = oldentries[n]
-            else:
-                d = self.createDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
-                if d != None:
-                    self._entries[n] = self.inodes.add_entry(d)
-        self.fresh()
+        def same(a, i):
+            if isinstance(a, CollectionDirectory):
+                return a.collection_locator == i['uuid']
+            elif isinstance(a, ObjectFile):
+                return a.uuid == i['uuid'] and not a.stale()
+            return False
+
+        self.merge(contents['items'],
+                   choose_name,
+                   same,
+                   self.createDirectory)
 
 
 class FileHandle(object):
@@ -361,6 +393,10 @@ class Inodes(object):
         self._counter += 1
         return entry
 
+    def del_entry(self, entry):
+        llfuse.invalidate_inode(entry.inode)
+        del self._entries[entry.inode]
+
 class Operations(llfuse.Operations):
     '''This is the main interface with llfuse.  The methods on this object are
     called by llfuse threads to service FUSE events to query and read from
@@ -394,6 +430,9 @@ class Operations(llfuse.Operations):
         return True
 
     def getattr(self, inode):
+        if inode not in self.inodes:
+            raise llfuse.FUSEError(errno.ENOENT)
+
         e = self.inodes[inode]
 
         entry = llfuse.EntryAttributes()
@@ -511,7 +550,8 @@ class Operations(llfuse.Operations):
 
         e = off
         while e < len(handle.entry):
-            yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+            if handle.entry[e][1].inode in self.inodes:
+                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
             e += 1
 
     def releasedir(self, fh):
diff --git a/sdk/python/bin/arv-mount b/sdk/python/bin/arv-mount
index 849af3b..fc5491f 100755
--- a/sdk/python/bin/arv-mount
+++ b/sdk/python/bin/arv-mount
@@ -4,6 +4,7 @@ from arvados.fuse import *
 import arvados
 import subprocess
 import argparse
+import daemon
 
 if __name__ == '__main__':
     # Handle command line parameters
@@ -24,6 +25,7 @@ with "--".
 collections on the server.""")
     parser.add_argument('--groups', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing groups on the server.""")
     parser.add_argument('--debug', action='store_true', help="""Debug mode""")
+    parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),
                         help="""Mount, run a command, then unmount and exit""")
@@ -53,10 +55,10 @@ collections on the server.""")
     if args.debug:
         opts += ['debug']
 
-    # Initialize the fuse connection
-    llfuse.init(operations, args.mountpoint, opts)
-
     if args.exec_args:
+        # Initialize the fuse connection
+        llfuse.init(operations, args.mountpoint, opts)
+
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
 
@@ -76,4 +78,12 @@ collections on the server.""")
 
         exit(rc)
     else:
-        llfuse.main()
+        if args.foreground:
+            # Initialize the fuse connection
+            llfuse.init(operations, args.mountpoint, opts)
+            llfuse.main()
+        else:
+            with daemon.DaemonContext():
+                # Initialize the fuse connection
+                llfuse.init(operations, args.mountpoint, opts)
+                llfuse.main()
diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt
index 1a8219b..a6a7591 100644
--- a/sdk/python/requirements.txt
+++ b/sdk/python/requirements.txt
@@ -5,3 +5,4 @@ urllib3==1.7.1
 llfuse==0.40
 ws4py==0.3.4
 PyYAML==3.11
+python-daemon==1.6
diff --git a/sdk/python/setup_fuse.py.src b/sdk/python/setup_fuse.py.src
index 9e191fb..9628712 100644
--- a/sdk/python/setup_fuse.py.src
+++ b/sdk/python/setup_fuse.py.src
@@ -22,6 +22,7 @@ setup(name='arvados-fuse-driver',
         ],
       install_requires=[
         'arvados-python-client',
-	'llfuse'
+	'llfuse',
+        'python-daemon'
         ],
       zip_safe=False)
diff --git a/sdk/python/test_mount.py b/sdk/python/test_mount.py
index 9251f69..06252af 100644
--- a/sdk/python/test_mount.py
+++ b/sdk/python/test_mount.py
@@ -236,7 +236,7 @@ class FuseTagsUpdateTestBase(MountTestBase):
         d3.sort()
         self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d3)
 
-        api.links().create(body={'link': {
+        l = api.links().create(body={'link': {
             'head_uuid': 'ea10d51bcf88862dbcc36eb292017dfd+45',
             'link_class': 'tag',
             'name': 'bar_tag'
@@ -248,6 +248,14 @@ class FuseTagsUpdateTestBase(MountTestBase):
         d4.sort()
         self.assertEqual(['ea10d51bcf88862dbcc36eb292017dfd+45', 'fa7aeb5140e2848d39b416daeef4ffc5+45'], d4)
 
+        api.links().delete(uuid=l['uuid']).execute()
+
+        time.sleep(1)
+
+        d5 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
+        d5.sort()
+        self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d5)
+
 
 class FuseTagsUpdateTestWebsockets(FuseTagsUpdateTestBase):
     def setUp(self):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list