[ARVADOS] created: 408cbfd4f3737ef5bfbf908d96ac9c469fcf6cdd

Git user git at public.curoverse.com
Thu Jul 27 09:27:06 EDT 2017


        at  408cbfd4f3737ef5bfbf908d96ac9c469fcf6cdd (commit)


commit 408cbfd4f3737ef5bfbf908d96ac9c469fcf6cdd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 11 10:58:50 2017 -0400

    11158: Support incremental update of project contents based on websocket events.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index a43a556..175ec3b 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -394,30 +394,30 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def on_event(self, ev):
-        if 'event_type' not in ev:
+        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
             return
         with llfuse.lock:
-            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
-            pdh = new_attrs.get("portable_data_hash")
-            # new_attributes.modified_at currently lacks
-            # subsecond precision (see #6347) so use event_at
-            # which should always be the same.
-            stamp = ev.get("event_at")
+            properties = ev.get("properties") or {}
+            old_attrs = properties.get("old_attributes") or {}
+            new_attrs = properties.get("new_attributes") or {}
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
-                    item.update(to_record_version=(stamp, pdh))
-                else:
-                    item.update()
-
-            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+                if ev.get("object_kind") == "arvados#collection":
+                    pdh = new_attrs.get("portable_data_hash")
+                    # new_attributes.modified_at currently lacks
+                    # subsecond precision (see #6347) so use event_at
+                    # which should always be the same.
+                    stamp = ev.get("event_at")
+                    if stamp and pdh and item.writable() and item.collection is not None and item.collection.modified():
+                        item.update(to_record_version=(stamp, pdh))
+
+            oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.invalidate()
-                parent.update()
+                parent.child_event(ev)
 
     @catch_exceptions
     def getattr(self, inode, ctx=None):
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 34295ef..a51dd90 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -139,3 +139,6 @@ class FreshBase(object):
 
     def finalize(self):
         pass
+
+    def child_event(self, ev):
+        pass
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 70bc982..9869b9f 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -802,7 +802,7 @@ class ProjectDirectory(Directory):
         if 'name' in i:
             if i['name'] is None or len(i['name']) == 0:
                 return None
-            elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
+            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                 # collection or subproject
                 return i['name']
             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
@@ -861,6 +861,11 @@ class ProjectDirectory(Directory):
         finally:
             self._updating_lock.release()
 
+    def _add_entry(self, i, name):
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
     @use_counter
     @check_update
     def __getitem__(self, k):
@@ -878,13 +883,11 @@ class ProjectDirectory(Directory):
                                                                 ["name", "=", k]],
                                                        limit=1).execute(num_retries=self.num_retries)["items"]
         if contents:
-            i = contents[0]
-            name = sanitize_filename(self.namefn(i))
+            name = sanitize_filename(self.namefn(contents[0]))
             if name != k:
                 raise KeyError(k)
-            ent = self.createDirectory(i)
-            self._entries[name] = self.inodes.add_entry(ent)
-            return self._entries[name]
+            return self._add_entry(contents[0], name)
+
         # Didn't find item
         raise KeyError(k)
 
@@ -962,6 +965,49 @@ class ProjectDirectory(Directory):
         self._entries[name_new] = ent
         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
+    @use_counter
+    def child_event(self, ev):
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if ev.get("object_kind") == "arvados#collection" and old_attrs == new_attrs:
+            with llfuse.lock_released:
+                cr = self.api.collections().list(filters=[["uuid", "=", ev["object_uuid"]]], include_trash=True).execute(num_retries=self.num_retries)
+                if cr['items'] and cr['items'][0]['is_trashed']:
+                    new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+            self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+            if new_name:
+                if ent:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent:
+                self.inodes.del_entry(ent)
+
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""

commit 5d0cd36a015e2479cc18a68c6484857cdaf3737a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 10 16:52:41 2017 -0400

    Revert Python SDK changes.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4103b30..e6e93f0 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,7 +27,6 @@ import sys
 import threading
 from . import timer
 import urllib.parse
-import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -541,7 +540,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-
+    
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +551,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-
+        
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-
+        
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-
+        
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +612,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-
+        
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-
+        
         def done(self):
             return self.queue.successful_copies
-
+        
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-
+        
         def response(self):
             return self.queue.response
-
-
+    
+    
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1131,7 +1130,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1188,16 +1187,8 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-
-        try:
-            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-                return f.read()
-        except IOError as e:
-            if e.errno == errno.ENOENT:
-                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
-            else:
-                raise
-
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+            return f.read()
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 1ab9996..97e1d26 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,14 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
-    kwargs.setdefault('order', 'uuid asc')
-    kwargs.setdefault('count', 'none')
-    addfilters = kwargs.get("filters", [])
     items = []
-    while True:
-        c = fn(**kwargs).execute(num_retries=num_retries)
-        items.extend(c['items'])
-        if len(c['items']) < c['limit']:
-            # Didn't return a full page, so we're done.
-            break
-        kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
+    offset = 0
+    items_available = sys.maxsize
+    while len(items) < items_available:
+        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        items += c['items']
+        items_available = c['items_available']
+        offset = c['offset'] + len(c['items'])
     return items
 
 def ca_certs_path(fallback=httplib2.CA_CERTS):

commit bffd1e4ee8532992b3790e4f232804a6731a9685
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 3 16:24:08 2017 -0400

    11158: Fixes & test updates for ProjectDirectory.
    
    * list_all() orders by uuid and uses it for paging.
    
    * Move dynamic item lookup from __contains__ to __getitem__ to fix some tests.
    
    * Remove test that checks for pipeline objects because ProjectDirectory doesn't
    create them any more.
    
    * FuseSharedTest now runs in a multiprocess worker to avoid deadlock.
    
    * Tweak local_store_get used in testing to raise NotFoundError for consistency
    with the real get().
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e6e93f0..4103b30 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ import sys
 import threading
 from . import timer
 import urllib.parse
+import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -540,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -551,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -612,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1130,7 +1131,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1187,8 +1188,16 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-            return f.read()
+
+        try:
+            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+                return f.read()
+        except IOError as e:
+            if e.errno == errno.ENOENT:
+                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
+            else:
+                raise
+
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 779b416..1ab9996 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,17 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
-    kwargs.setdefault('order', 'created_at asc')
+    kwargs.setdefault('order', 'uuid asc')
     kwargs.setdefault('count', 'none')
+    addfilters = kwargs.get("filters", [])
     items = []
-    offset = 0
     while True:
-        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        c = fn(**kwargs).execute(num_retries=num_retries)
         items.extend(c['items'])
         if len(c['items']) < c['limit']:
             # Didn't return a full page, so we're done.
             break
-        offset = c['offset'] + len(c['items'])
+        kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
     return items
 
 def ca_certs_path(fallback=httplib2.CA_CERTS):
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index b3717ff..4dad90c 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -205,12 +205,16 @@ class Mount(object):
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
-        self.api = arvados.safeapi.ThreadSafeApiCache(
-            apiconfig=arvados.config.settings(),
-            keep_params={
-                'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
-                'num_retries': self.args.retries,
-            })
+        try:
+            self.api = arvados.safeapi.ThreadSafeApiCache(
+                apiconfig=arvados.config.settings(),
+                keep_params={
+                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'num_retries': self.args.retries,
+                })
+        except KeyError as e:
+            self.logger.error("Missing environment: %s", e)
+            exit(1)
         # Do a sanity check that we have a working arvados host + token.
         self.api.users().current().execute()
 
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 49d9711..70bc982 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -847,12 +847,10 @@ class ProjectDirectory(Directory):
                 contents = arvados.util.list_all(self.api.groups().list,
                                                  self.num_retries,
                                                  filters=[["owner_uuid", "=", self.project_uuid],
-                                                          ["group_class", "=", "project"]],
-                                                 limit=1000)
+                                                          ["group_class", "=", "project"]])
                 contents.extend(arvados.util.list_all(self.api.collections().list,
                                                       self.num_retries,
-                                                      filters=[["owner_uuid", "=", self.project_uuid]],
-                                                      limit=1000))
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
@@ -861,39 +859,43 @@ class ProjectDirectory(Directory):
                        samefn,
                        self.createDirectory)
         finally:
-            self._full_listing = False
             self._updating_lock.release()
 
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            i = contents[0]
+            name = sanitize_filename(self.namefn(i))
+            if name != k:
+                raise KeyError(k)
+            ent = self.createDirectory(i)
+            self._entries[name] = self.inodes.add_entry(ent)
+            return self._entries[name]
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            if super(ProjectDirectory, self).__contains__(k):
-                return True
-            elif not self._full_listing:
-                with llfuse.lock_released:
-                    contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                               ["group_class", "=", "project"],
-                                                               ["name", "=", k]],
-                                                      limit=1).execute(num_retries=self.num_retries)["items"]
-                    if not contents:
-                        contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                        ["name", "=", k]],
-                                                               limit=1).execute(num_retries=self.num_retries)["items"]
-                if contents:
-                    i = contents[0]
-                    name = sanitize_filename(self.namefn(i))
-                    ent = self.createDirectory(i)
-                    self._entries[name] = self.inodes.add_entry(ent)
-                    return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
         return False
 
     @use_counter
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 225e4b2..ec8868a 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
             attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
 
 
+def fuseSharedTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Double check that we can open and read objects in this folder as a file,
+            # and that its contents are what we expect.
+            baz_path = os.path.join(
+                mounttmp,
+                'FUSE User',
+                'FUSE Test Project',
+                'collection in FUSE project',
+                'baz')
+            with open(baz_path) as f:
+                self.assertEqual("baz", f.read())
+
+            # check mtime on collection
+            st = os.stat(baz_path)
+            try:
+                mtime = st.st_mtime_ns / 1000000000
+            except AttributeError:
+                mtime = st.st_mtime
+            self.assertEqual(mtime, 1391448174)
+
+            # shared_dirs is a list of the directories exposed
+            # by fuse.SharedDirectory (i.e. any object visible
+            # to the current user)
+            shared_dirs = llfuse.listdir(mounttmp)
+            shared_dirs.sort()
+            self.assertIn('FUSE User', shared_dirs)
+
+            # fuse_user_objs is a list of the objects owned by the FUSE
+            # test user (which present as files in the 'FUSE User'
+            # directory)
+            fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+            fuse_user_objs.sort()
+            self.assertEqual(['FUSE Test Project',                    # project owned by user
+                              'collection #1 owned by FUSE',          # collection owned by user
+                              'collection #2 owned by FUSE'          # collection owned by user
+                          ], fuse_user_objs)
+
+            # test_proj_files is a list of the files in the FUSE Test Project.
+            test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+            test_proj_files.sort()
+            self.assertEqual(['collection in FUSE project'
+                          ], test_proj_files)
+
+
+    Test().runTest()
+
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
                         exclude=self.api.users().current().execute()['uuid'])
+        keep = arvados.keep.KeepClient()
+        keep.put("baz")
 
-        # shared_dirs is a list of the directories exposed
-        # by fuse.SharedDirectory (i.e. any object visible
-        # to the current user)
-        shared_dirs = llfuse.listdir(self.mounttmp)
-        shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
-
-        # fuse_user_objs is a list of the objects owned by the FUSE
-        # test user (which present as files in the 'FUSE User'
-        # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
-        fuse_user_objs.sort()
-        self.assertEqual(['FUSE Test Project',                    # project owned by user
-                          'collection #1 owned by FUSE',          # collection owned by user
-                          'collection #2 owned by FUSE',          # collection owned by user
-                          'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user
-                      ], fuse_user_objs)
-
-        # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
-        test_proj_files.sort()
-        self.assertEqual(['collection in FUSE project',
-                          'pipeline instance in FUSE project.pipelineInstance',
-                          'pipeline template in FUSE project.pipelineTemplate'
-                      ], test_proj_files)
-
-        # Double check that we can open and read objects in this folder as a file,
-        # and that its contents are what we expect.
-        pipeline_template_path = os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'FUSE Test Project',
-                'pipeline template in FUSE project.pipelineTemplate')
-        with open(pipeline_template_path) as f:
-            j = json.load(f)
-            self.assertEqual("pipeline template in FUSE project", j['name'])
-
-        # check mtime on template
-        st = os.stat(pipeline_template_path)
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1397493304)
-
-        # check mtime on collection
-        st = os.stat(os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'collection #1 owned by FUSE'))
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1391448174)
+        self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
 
 
 class FuseHomeTest(MountTestBase):

commit 007d544703715904e492c584412233fd37873f99
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 30 16:27:43 2017 -0400

    11158: Improve arvados.util.list_all() implementation to avoid relying on
    items_available.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 97e1d26..779b416 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,13 +369,16 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
+    kwargs.setdefault('order', 'created_at asc')
+    kwargs.setdefault('count', 'none')
     items = []
     offset = 0
-    items_available = sys.maxsize
-    while len(items) < items_available:
+    while True:
         c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
-        items += c['items']
-        items_available = c['items_available']
+        items.extend(c['items'])
+        if len(c['items']) < c['limit']:
+            # Didn't return a full page, so we're done.
+            break
         offset = c['offset'] + len(c['items'])
     return items
 
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index b951755..49d9711 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -848,11 +848,11 @@ class ProjectDirectory(Directory):
                                                  self.num_retries,
                                                  filters=[["owner_uuid", "=", self.project_uuid],
                                                           ["group_class", "=", "project"]],
-                                                 limit=1000, order="modified_at desc")
+                                                 limit=1000)
                 contents.extend(arvados.util.list_all(self.api.collections().list,
                                                       self.num_retries,
                                                       filters=[["owner_uuid", "=", self.project_uuid]],
-                                                      limit=1000, order="modified_at desc"))
+                                                      limit=1000))
 
             # end with llfuse.lock_released, re-acquire lock
 

commit d0379cea8627ec6a2f507def8b5587bbd6edd72b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 30 14:52:10 2017 -0400

    11158: Restore paging to get the full list of collections and projects (but still
    don't load other object types).  Defer loading the full listing until the first call to
    items() to support efficient access to individual collections/subprojects by
    name.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index ca18009..b951755 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -771,7 +771,7 @@ class ProjectDirectory(Directory):
         self._poll_time = poll_time
         self._updating_lock = threading.Lock()
         self._current_user = None
-        self._extra = set()
+        self._full_listing = False
 
     def want_event_subscribe(self):
         return True
@@ -794,27 +794,35 @@ class ProjectDirectory(Directory):
     def uuid(self):
         return self.project_uuid
 
+    def items(self):
+        self._full_listing = True
+        return super(ProjectDirectory, self).items()
+
+    def namefn(self, i):
+        if 'name' in i:
+            if i['name'] is None or len(i['name']) == 0:
+                return None
+            elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
+                # collection or subproject
+                return i['name']
+            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
+                # name link
+                return i['name']
+            elif 'kind' in i and i['kind'].startswith('arvados#'):
+                # something else
+                return "{}.{}".format(i['name'], i['kind'][8:])
+        else:
+            return None
+
+
     @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
             self.inodes.add_entry(self.project_object_file)
 
-        def namefn(i):
-            if 'name' in i:
-                if i['name'] is None or len(i['name']) == 0:
-                    return None
-                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
-                    # collection or subproject
-                    return i['name']
-                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
-                    # name link
-                    return i['name']
-                elif 'kind' in i and i['kind'].startswith('arvados#'):
-                    # something else
-                    return "{}.{}".format(i['name'], i['kind'][8:])
-            else:
-                return None
+        if not self._full_listing:
+            return
 
         def samefn(a, i):
             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
@@ -836,27 +844,24 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-                contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                           ["group_class", "=", "project"]],
-                                                  limit=1000, order="modified_at desc").execute(num_retries=self.num_retries)["items"]
-                contents.extend(self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid]],
-                                                            limit=1000, order="modified_at desc").execute(num_retries=self.num_retries)["items"])
-                if self._extra:
-                    contents.extend(self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                    ["group_class", "=", "project"],
-                                                                    ["uuid", "in", list(self._extra)]],
-                                                      limit=1000).execute(num_retries=self.num_retries)["items"])
-                    contents.extend(self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                         ["uuid", "in", list(self._extra)]],
-                                                                limit=1000).execute(num_retries=self.num_retries)["items"])
+                contents = arvados.util.list_all(self.api.groups().list,
+                                                 self.num_retries,
+                                                 filters=[["owner_uuid", "=", self.project_uuid],
+                                                          ["group_class", "=", "project"]],
+                                                 limit=1000, order="modified_at desc")
+                contents.extend(arvados.util.list_all(self.api.collections().list,
+                                                      self.num_retries,
+                                                      filters=[["owner_uuid", "=", self.project_uuid]],
+                                                      limit=1000, order="modified_at desc"))
 
             # end with llfuse.lock_released, re-acquire lock
 
             self.merge(contents,
-                       namefn,
+                       self.namefn,
                        samefn,
                        self.createDirectory)
         finally:
+            self._full_listing = False
             self._updating_lock.release()
 
     @use_counter
@@ -873,22 +878,23 @@ class ProjectDirectory(Directory):
         else:
             if super(ProjectDirectory, self).__contains__(k):
                 return True
-            else:
+            elif not self._full_listing:
                 with llfuse.lock_released:
-                    contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                    ["name", "=", k]],
-                                                           limit=1).execute(num_retries=self.num_retries)["items"]
+                    contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                               ["group_class", "=", "project"],
+                                                               ["name", "=", k]],
+                                                      limit=1).execute(num_retries=self.num_retries)["items"]
                     if not contents:
-                        contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                   ["group_class", "=", "project"],
-                                                                   ["name", "=", k]],
-                                                          limit=1).execute(num_retries=self.num_retries)["items"]
+                        contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                        ["name", "=", k]],
+                                                               limit=1).execute(num_retries=self.num_retries)["items"]
                 if contents:
-                    self._extra.add(contents[0]["uuid"])
-                    self.invalidate()
+                    i = contents[0]
+                    name = sanitize_filename(self.namefn(i))
+                    ent = self.createDirectory(i)
+                    self._entries[name] = self.inodes.add_entry(ent)
                     return True
-                else:
-                    return False
+        return False
 
     @use_counter
     @check_update

commit 14032da4e6df97baf353f2783c7e6be2efc907c1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 30 13:13:12 2017 -0400

    11158: Fix order="modified_at".
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 34df3ad..ca18009 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -838,9 +838,9 @@ class ProjectDirectory(Directory):
 
                 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                            ["group_class", "=", "project"]],
-                                                  limit=1000, order=["modified_at desc"]).execute(num_retries=self.num_retries)["items"]
+                                                  limit=1000, order="modified_at desc").execute(num_retries=self.num_retries)["items"]
                 contents.extend(self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid]],
-                                                            limit=1000, order=["modified_at desc"]).execute(num_retries=self.num_retries)["items"])
+                                                            limit=1000, order="modified_at desc").execute(num_retries=self.num_retries)["items"])
                 if self._extra:
                     contents.extend(self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
                                                                     ["group_class", "=", "project"],

commit e2ec2f6f9930ad850b22338866b3f59df006f012
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 29 18:30:33 2017 -0400

    11158: New behavior for project directories.  Allow 'cd' into any collection or
    subproject that exists on the server, even if it isn't returned in the listing.
    Now applies listing limit, ordered by most recent.  No longer lists other
    object types (pipelines, links, etc.)
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 30ae6b4..34df3ad 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -771,6 +771,7 @@ class ProjectDirectory(Directory):
         self._poll_time = poll_time
         self._updating_lock = threading.Lock()
         self._current_user = None
+        self._extra = set()
 
     def want_event_subscribe(self):
         return True
@@ -835,8 +836,19 @@ class ProjectDirectory(Directory):
                     self.project_object = self.api.users().get(
                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-                contents = arvados.util.list_all(self.api.groups().contents,
-                                                 self.num_retries, uuid=self.project_uuid)
+                contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                           ["group_class", "=", "project"]],
+                                                  limit=1000, order=["modified_at desc"]).execute(num_retries=self.num_retries)["items"]
+                contents.extend(self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid]],
+                                                            limit=1000, order=["modified_at desc"]).execute(num_retries=self.num_retries)["items"])
+                if self._extra:
+                    contents.extend(self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                    ["group_class", "=", "project"],
+                                                                    ["uuid", "in", list(self._extra)]],
+                                                      limit=1000).execute(num_retries=self.num_retries)["items"])
+                    contents.extend(self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                         ["uuid", "in", list(self._extra)]],
+                                                                limit=1000).execute(num_retries=self.num_retries)["items"])
 
             # end with llfuse.lock_released, re-acquire lock
 
@@ -859,7 +871,24 @@ class ProjectDirectory(Directory):
         if k == '.arvados#project':
             return True
         else:
-            return super(ProjectDirectory, self).__contains__(k)
+            if super(ProjectDirectory, self).__contains__(k):
+                return True
+            else:
+                with llfuse.lock_released:
+                    contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                    ["name", "=", k]],
+                                                           limit=1).execute(num_retries=self.num_retries)["items"]
+                    if not contents:
+                        contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                   ["group_class", "=", "project"],
+                                                                   ["name", "=", k]],
+                                                          limit=1).execute(num_retries=self.num_retries)["items"]
+                if contents:
+                    self._extra.add(contents[0]["uuid"])
+                    self.invalidate()
+                    return True
+                else:
+                    return False
 
     @use_counter
     @check_update

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list