[ARVADOS] updated: b25cbdbd1b54a622f4428e29ccffeb165ef29b45

Git user git at public.curoverse.com
Mon Jul 3 16:28:33 EDT 2017


Summary of changes:
 sdk/python/arvados/keep.py            |  35 ++++++-----
 sdk/python/arvados/util.py            |   8 +--
 services/fuse/arvados_fuse/command.py |  16 +++--
 services/fuse/arvados_fuse/fusedir.py |  58 ++++++++++---------
 services/fuse/tests/test_mount.py     | 106 ++++++++++++++++------------------
 5 files changed, 117 insertions(+), 106 deletions(-)

       via  b25cbdbd1b54a622f4428e29ccffeb165ef29b45 (commit)
      from  62f1bc50cd623e1fb2e62b68db46aa40ede5520d (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b25cbdbd1b54a622f4428e29ccffeb165ef29b45
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 3 16:24:08 2017 -0400

    11158: Fixes & test updates for ProjectDirectory.
    
    * list_all() orders by uuid and uses it for paging.
    
    * Move dynamic item lookup from __contains__ to __getitem__ to fix some tests.
    
    * Remove tests that check for pipeline objects, because ProjectDirectory
    doesn't create them any more.
    
    * FuseSharedTest now runs in a multiprocess worker to avoid deadlock.
    
    * Tweak local_store_get (used in testing) to raise NotFoundError, for
    consistency with the real get().
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e6e93f0..4103b30 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ import sys
 import threading
 from . import timer
 import urllib.parse
+import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -540,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -551,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -612,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1130,7 +1131,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1187,8 +1188,16 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-            return f.read()
+
+        try:
+            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+                return f.read()
+        except IOError as e:
+            if e.errno == errno.ENOENT:
+                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
+            else:
+                raise
+
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 779b416..1ab9996 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,17 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
-    kwargs.setdefault('order', 'created_at asc')
+    kwargs.setdefault('order', 'uuid asc')
     kwargs.setdefault('count', 'none')
+    addfilters = kwargs.get("filters", [])
     items = []
-    offset = 0
     while True:
-        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        c = fn(**kwargs).execute(num_retries=num_retries)
         items.extend(c['items'])
         if len(c['items']) < c['limit']:
             # Didn't return a full page, so we're done.
             break
-        offset = c['offset'] + len(c['items'])
+        kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
     return items
 
 def ca_certs_path(fallback=httplib2.CA_CERTS):
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index b3717ff..4dad90c 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -205,12 +205,16 @@ class Mount(object):
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
-        self.api = arvados.safeapi.ThreadSafeApiCache(
-            apiconfig=arvados.config.settings(),
-            keep_params={
-                'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
-                'num_retries': self.args.retries,
-            })
+        try:
+            self.api = arvados.safeapi.ThreadSafeApiCache(
+                apiconfig=arvados.config.settings(),
+                keep_params={
+                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+                    'num_retries': self.args.retries,
+                })
+        except KeyError as e:
+            self.logger.error("Missing environment: %s", e)
+            exit(1)
         # Do a sanity check that we have a working arvados host + token.
         self.api.users().current().execute()
 
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index a7be437..2d032bd 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -818,12 +818,10 @@ class ProjectDirectory(Directory):
                 contents = arvados.util.list_all(self.api.groups().list,
                                                  self.num_retries,
                                                  filters=[["owner_uuid", "=", self.project_uuid],
-                                                          ["group_class", "=", "project"]],
-                                                 limit=1000)
+                                                          ["group_class", "=", "project"]])
                 contents.extend(arvados.util.list_all(self.api.collections().list,
                                                       self.num_retries,
-                                                      filters=[["owner_uuid", "=", self.project_uuid]],
-                                                      limit=1000))
+                                                      filters=[["owner_uuid", "=", self.project_uuid]]))
 
             # end with llfuse.lock_released, re-acquire lock
 
@@ -832,39 +830,43 @@ class ProjectDirectory(Directory):
                        samefn,
                        self.createDirectory)
         finally:
-            self._full_listing = False
             self._updating_lock.release()
 
     @use_counter
     @check_update
-    def __getitem__(self, item):
-        if item == '.arvados#project':
+    def __getitem__(self, k):
+        if k == '.arvados#project':
             return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
+        elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+            return super(ProjectDirectory, self).__getitem__(k)
+        with llfuse.lock_released:
+            contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                       ["group_class", "=", "project"],
+                                                       ["name", "=", k]],
+                                              limit=1).execute(num_retries=self.num_retries)["items"]
+            if not contents:
+                contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+                                                                ["name", "=", k]],
+                                                       limit=1).execute(num_retries=self.num_retries)["items"]
+        if contents:
+            i = contents[0]
+            name = sanitize_filename(self.namefn(i))
+            if name != k:
+                raise KeyError(k)
+            ent = self.createDirectory(i)
+            self._entries[name] = self.inodes.add_entry(ent)
+            return self._entries[name]
+        # Didn't find item
+        raise KeyError(k)
 
     def __contains__(self, k):
         if k == '.arvados#project':
             return True
-        else:
-            if super(ProjectDirectory, self).__contains__(k):
-                return True
-            elif not self._full_listing:
-                with llfuse.lock_released:
-                    contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                               ["group_class", "=", "project"],
-                                                               ["name", "=", k]],
-                                                      limit=1).execute(num_retries=self.num_retries)["items"]
-                    if not contents:
-                        contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
-                                                                        ["name", "=", k]],
-                                                               limit=1).execute(num_retries=self.num_retries)["items"]
-                if contents:
-                    i = contents[0]
-                    name = sanitize_filename(self.namefn(i))
-                    ent = self.createDirectory(i)
-                    self._entries[name] = self.inodes.add_entry(ent)
-                    return True
+        try:
+            self[k]
+            return True
+        except KeyError:
+            pass
         return False
 
     @use_counter
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 225e4b2..ec8868a 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
             attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
 
 
+def fuseSharedTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Double check that we can open and read objects in this folder as a file,
+            # and that its contents are what we expect.
+            baz_path = os.path.join(
+                mounttmp,
+                'FUSE User',
+                'FUSE Test Project',
+                'collection in FUSE project',
+                'baz')
+            with open(baz_path) as f:
+                self.assertEqual("baz", f.read())
+
+            # check mtime on collection
+            st = os.stat(baz_path)
+            try:
+                mtime = st.st_mtime_ns / 1000000000
+            except AttributeError:
+                mtime = st.st_mtime
+            self.assertEqual(mtime, 1391448174)
+
+            # shared_dirs is a list of the directories exposed
+            # by fuse.SharedDirectory (i.e. any object visible
+            # to the current user)
+            shared_dirs = llfuse.listdir(mounttmp)
+            shared_dirs.sort()
+            self.assertIn('FUSE User', shared_dirs)
+
+            # fuse_user_objs is a list of the objects owned by the FUSE
+            # test user (which present as files in the 'FUSE User'
+            # directory)
+            fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+            fuse_user_objs.sort()
+            self.assertEqual(['FUSE Test Project',                    # project owned by user
+                              'collection #1 owned by FUSE',          # collection owned by user
+                              'collection #2 owned by FUSE'          # collection owned by user
+                          ], fuse_user_objs)
+
+            # test_proj_files is a list of the files in the FUSE Test Project.
+            test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+            test_proj_files.sort()
+            self.assertEqual(['collection in FUSE project'
+                          ], test_proj_files)
+
+
+    Test().runTest()
+
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.SharedDirectory,
                         exclude=self.api.users().current().execute()['uuid'])
+        keep = arvados.keep.KeepClient()
+        keep.put("baz")
 
-        # shared_dirs is a list of the directories exposed
-        # by fuse.SharedDirectory (i.e. any object visible
-        # to the current user)
-        shared_dirs = llfuse.listdir(self.mounttmp)
-        shared_dirs.sort()
-        self.assertIn('FUSE User', shared_dirs)
-
-        # fuse_user_objs is a list of the objects owned by the FUSE
-        # test user (which present as files in the 'FUSE User'
-        # directory)
-        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
-        fuse_user_objs.sort()
-        self.assertEqual(['FUSE Test Project',                    # project owned by user
-                          'collection #1 owned by FUSE',          # collection owned by user
-                          'collection #2 owned by FUSE',          # collection owned by user
-                          'pipeline instance owned by FUSE.pipelineInstance',  # pipeline instance owned by user
-                      ], fuse_user_objs)
-
-        # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
-        test_proj_files.sort()
-        self.assertEqual(['collection in FUSE project',
-                          'pipeline instance in FUSE project.pipelineInstance',
-                          'pipeline template in FUSE project.pipelineTemplate'
-                      ], test_proj_files)
-
-        # Double check that we can open and read objects in this folder as a file,
-        # and that its contents are what we expect.
-        pipeline_template_path = os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'FUSE Test Project',
-                'pipeline template in FUSE project.pipelineTemplate')
-        with open(pipeline_template_path) as f:
-            j = json.load(f)
-            self.assertEqual("pipeline template in FUSE project", j['name'])
-
-        # check mtime on template
-        st = os.stat(pipeline_template_path)
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1397493304)
-
-        # check mtime on collection
-        st = os.stat(os.path.join(
-                self.mounttmp,
-                'FUSE User',
-                'collection #1 owned by FUSE'))
-        try:
-            mtime = st.st_mtime_ns / 1000000000
-        except AttributeError:
-            mtime = st.st_mtime
-        self.assertEqual(mtime, 1391448174)
+        self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
 
 
 class FuseHomeTest(MountTestBase):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list