[ARVADOS] updated: b25cbdbd1b54a622f4428e29ccffeb165ef29b45
Git user
git at public.curoverse.com
Mon Jul 3 16:28:33 EDT 2017
Summary of changes:
sdk/python/arvados/keep.py | 35 ++++++-----
sdk/python/arvados/util.py | 8 +--
services/fuse/arvados_fuse/command.py | 16 +++--
services/fuse/arvados_fuse/fusedir.py | 58 ++++++++++---------
services/fuse/tests/test_mount.py | 106 ++++++++++++++++------------------
5 files changed, 117 insertions(+), 106 deletions(-)
via b25cbdbd1b54a622f4428e29ccffeb165ef29b45 (commit)
from 62f1bc50cd623e1fb2e62b68db46aa40ede5520d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit b25cbdbd1b54a622f4428e29ccffeb165ef29b45
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jul 3 16:24:08 2017 -0400
11158: Fixes & test updates for ProjectDirectory.
* list_all() now orders results by uuid and pages by filtering on uuid
greater than the last item seen, instead of using offsets.
* Move dynamic item lookup (querying the API for a single name) from
__contains__ to __getitem__; __contains__ now delegates to __getitem__.
This fixes some tests.
* Remove tests that check for pipeline objects, because ProjectDirectory no
longer creates them.
* FuseSharedTest now runs its checks in a separate worker process to avoid
deadlock.
* Tweak local_store_get(), used in testing, to raise NotFoundError when the
block does not exist, for consistency with the real get().
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e6e93f0..4103b30 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ import sys
import threading
from . import timer
import urllib.parse
+import errno
if sys.version_info >= (3, 0):
from io import BytesIO
@@ -540,7 +541,7 @@ class KeepClient(object):
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
@@ -551,19 +552,19 @@ class KeepClient(object):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
@@ -612,25 +613,25 @@ class KeepClient(object):
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
@@ -1130,7 +1131,7 @@ class KeepClient(object):
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
@@ -1187,8 +1188,16 @@ class KeepClient(object):
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
return b''
- with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
- return f.read()
+
+ try:
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+ return f.read()
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
+ else:
+ raise
+
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 779b416..1ab9996 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,17 @@ def is_hex(s, *length_args):
def list_all(fn, num_retries=0, **kwargs):
# Default limit to (effectively) api server's MAX_LIMIT
kwargs.setdefault('limit', sys.maxsize)
- kwargs.setdefault('order', 'created_at asc')
+ kwargs.setdefault('order', 'uuid asc')
kwargs.setdefault('count', 'none')
+ addfilters = kwargs.get("filters", [])
items = []
- offset = 0
while True:
- c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+ c = fn(**kwargs).execute(num_retries=num_retries)
items.extend(c['items'])
if len(c['items']) < c['limit']:
# Didn't return a full page, so we're done.
break
- offset = c['offset'] + len(c['items'])
+ kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
return items
def ca_certs_path(fallback=httplib2.CA_CERTS):
diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index b3717ff..4dad90c 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -205,12 +205,16 @@ class Mount(object):
self.logger.info("enable write is %s", self.args.enable_write)
def _setup_api(self):
- self.api = arvados.safeapi.ThreadSafeApiCache(
- apiconfig=arvados.config.settings(),
- keep_params={
- 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
- 'num_retries': self.args.retries,
- })
+ try:
+ self.api = arvados.safeapi.ThreadSafeApiCache(
+ apiconfig=arvados.config.settings(),
+ keep_params={
+ 'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
+ 'num_retries': self.args.retries,
+ })
+ except KeyError as e:
+ self.logger.error("Missing environment: %s", e)
+ exit(1)
# Do a sanity check that we have a working arvados host + token.
self.api.users().current().execute()
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index a7be437..2d032bd 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -818,12 +818,10 @@ class ProjectDirectory(Directory):
contents = arvados.util.list_all(self.api.groups().list,
self.num_retries,
filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "=", "project"]],
- limit=1000)
+ ["group_class", "=", "project"]])
contents.extend(arvados.util.list_all(self.api.collections().list,
self.num_retries,
- filters=[["owner_uuid", "=", self.project_uuid]],
- limit=1000))
+ filters=[["owner_uuid", "=", self.project_uuid]]))
# end with llfuse.lock_released, re-acquire lock
@@ -832,39 +830,43 @@ class ProjectDirectory(Directory):
samefn,
self.createDirectory)
finally:
- self._full_listing = False
self._updating_lock.release()
@use_counter
@check_update
- def __getitem__(self, item):
- if item == '.arvados#project':
+ def __getitem__(self, k):
+ if k == '.arvados#project':
return self.project_object_file
- else:
- return super(ProjectDirectory, self).__getitem__(item)
+ elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
+ return super(ProjectDirectory, self).__getitem__(k)
+ with llfuse.lock_released:
+ contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["group_class", "=", "project"],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if not contents:
+ contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
+ ["name", "=", k]],
+ limit=1).execute(num_retries=self.num_retries)["items"]
+ if contents:
+ i = contents[0]
+ name = sanitize_filename(self.namefn(i))
+ if name != k:
+ raise KeyError(k)
+ ent = self.createDirectory(i)
+ self._entries[name] = self.inodes.add_entry(ent)
+ return self._entries[name]
+ # Didn't find item
+ raise KeyError(k)
def __contains__(self, k):
if k == '.arvados#project':
return True
- else:
- if super(ProjectDirectory, self).__contains__(k):
- return True
- elif not self._full_listing:
- with llfuse.lock_released:
- contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "=", "project"],
- ["name", "=", k]],
- limit=1).execute(num_retries=self.num_retries)["items"]
- if not contents:
- contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["name", "=", k]],
- limit=1).execute(num_retries=self.num_retries)["items"]
- if contents:
- i = contents[0]
- name = sanitize_filename(self.namefn(i))
- ent = self.createDirectory(i)
- self._entries[name] = self.inodes.add_entry(ent)
- return True
+ try:
+ self[k]
+ return True
+ except KeyError:
+ pass
return False
@use_counter
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 225e4b2..ec8868a 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -220,66 +220,62 @@ class FuseTagsUpdateTest(MountTestBase):
attempt(self.assertDirContents, 'fuse_test_tag', [bar_uuid])
+def fuseSharedTestHelper(mounttmp):
+ class Test(unittest.TestCase):
+ def runTest(self):
+ # Double check that we can open and read objects in this folder as a file,
+ # and that its contents are what we expect.
+ baz_path = os.path.join(
+ mounttmp,
+ 'FUSE User',
+ 'FUSE Test Project',
+ 'collection in FUSE project',
+ 'baz')
+ with open(baz_path) as f:
+ self.assertEqual("baz", f.read())
+
+ # check mtime on collection
+ st = os.stat(baz_path)
+ try:
+ mtime = st.st_mtime_ns / 1000000000
+ except AttributeError:
+ mtime = st.st_mtime
+ self.assertEqual(mtime, 1391448174)
+
+ # shared_dirs is a list of the directories exposed
+ # by fuse.SharedDirectory (i.e. any object visible
+ # to the current user)
+ shared_dirs = llfuse.listdir(mounttmp)
+ shared_dirs.sort()
+ self.assertIn('FUSE User', shared_dirs)
+
+ # fuse_user_objs is a list of the objects owned by the FUSE
+ # test user (which present as files in the 'FUSE User'
+ # directory)
+ fuse_user_objs = llfuse.listdir(os.path.join(mounttmp, 'FUSE User'))
+ fuse_user_objs.sort()
+ self.assertEqual(['FUSE Test Project', # project owned by user
+ 'collection #1 owned by FUSE', # collection owned by user
+ 'collection #2 owned by FUSE' # collection owned by user
+ ], fuse_user_objs)
+
+ # test_proj_files is a list of the files in the FUSE Test Project.
+ test_proj_files = llfuse.listdir(os.path.join(mounttmp, 'FUSE User', 'FUSE Test Project'))
+ test_proj_files.sort()
+ self.assertEqual(['collection in FUSE project'
+ ], test_proj_files)
+
+
+ Test().runTest()
+
class FuseSharedTest(MountTestBase):
def runTest(self):
self.make_mount(fuse.SharedDirectory,
exclude=self.api.users().current().execute()['uuid'])
+ keep = arvados.keep.KeepClient()
+ keep.put("baz")
- # shared_dirs is a list of the directories exposed
- # by fuse.SharedDirectory (i.e. any object visible
- # to the current user)
- shared_dirs = llfuse.listdir(self.mounttmp)
- shared_dirs.sort()
- self.assertIn('FUSE User', shared_dirs)
-
- # fuse_user_objs is a list of the objects owned by the FUSE
- # test user (which present as files in the 'FUSE User'
- # directory)
- fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
- fuse_user_objs.sort()
- self.assertEqual(['FUSE Test Project', # project owned by user
- 'collection #1 owned by FUSE', # collection owned by user
- 'collection #2 owned by FUSE', # collection owned by user
- 'pipeline instance owned by FUSE.pipelineInstance', # pipeline instance owned by user
- ], fuse_user_objs)
-
- # test_proj_files is a list of the files in the FUSE Test Project.
- test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
- test_proj_files.sort()
- self.assertEqual(['collection in FUSE project',
- 'pipeline instance in FUSE project.pipelineInstance',
- 'pipeline template in FUSE project.pipelineTemplate'
- ], test_proj_files)
-
- # Double check that we can open and read objects in this folder as a file,
- # and that its contents are what we expect.
- pipeline_template_path = os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'FUSE Test Project',
- 'pipeline template in FUSE project.pipelineTemplate')
- with open(pipeline_template_path) as f:
- j = json.load(f)
- self.assertEqual("pipeline template in FUSE project", j['name'])
-
- # check mtime on template
- st = os.stat(pipeline_template_path)
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1397493304)
-
- # check mtime on collection
- st = os.stat(os.path.join(
- self.mounttmp,
- 'FUSE User',
- 'collection #1 owned by FUSE'))
- try:
- mtime = st.st_mtime_ns / 1000000000
- except AttributeError:
- mtime = st.st_mtime
- self.assertEqual(mtime, 1391448174)
+ self.pool.apply(fuseSharedTestHelper, (self.mounttmp,))
class FuseHomeTest(MountTestBase):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list