[ARVADOS] updated: 6b4405e36bb59ebd4714690eeb8518c4a2fa019b
git at public.curoverse.com
git at public.curoverse.com
Thu Sep 4 10:59:06 EDT 2014
Summary of changes:
sdk/python/arvados/collection.py | 2 +-
services/fuse/arvados_fuse/__init__.py | 172 +++++++++++++++++++++------------
services/fuse/bin/arv-mount | 2 +-
3 files changed, 112 insertions(+), 64 deletions(-)
via 6b4405e36bb59ebd4714690eeb8518c4a2fa019b (commit)
via 0abff82784887a0d8d8f6f4b67972296a2aa7728 (commit)
from b366f855557333cd99fce42ab56af1c66388b18e (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 6b4405e36bb59ebd4714690eeb8518c4a2fa019b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Sep 4 10:59:03 2014 -0400
3644: Add thread-safe Arvados API access. arv-mount now releases the llfuse
global lock when performing network requests.
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 6d55b34..adf6186 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -18,13 +18,45 @@ import json
import logging
import time
import calendar
+import threading
_logger = logging.getLogger('arvados.arvados_fuse')
+class SafeApi(object):
+ '''Threadsafe wrapper for API object. This stores and returns a different api
+ object per thread, because httplib2 which underlies apiclient is not
+ threadsafe.
+ '''
+
+ def __init__(self, config):
+ self.host = config.get('ARVADOS_API_HOST')
+ self.token = config.get('ARVADOS_API_TOKEN')
+ self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+ self.local = threading.local()
+
+ def localapi(self):
+ if 'api' not in self.local.__dict__:
+ self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
+ return self.local.api
+
+ def collections(self):
+ return self.localapi().collections()
+
+ def links(self):
+ return self.localapi().links()
+
+ def groups(self):
+ return self.localapi().groups()
+
+ def users(self):
+ return self.localapi().users()
+
def convertTime(t):
+ '''Parse Arvados timestamp to unix time.'''
return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
def sanitize_filename(dirty):
+ '''Remove troublesome characters from filenames.'''
# http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
if dirty is None:
return None
@@ -259,10 +291,12 @@ class CollectionDirectory(Directory):
try:
if self.collection_object is not None and re.match(r'^[a-f0-9]{32}', self.collection_locator):
return True
- #with llfuse.lock_released:
- new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
- if "portable_data_hash" not in new_collection_object:
- new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
+
+ with llfuse.lock_released:
+ new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+ if "portable_data_hash" not in new_collection_object:
+ new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
+ # end with llfuse.lock_released, re-acquire lock
if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
self.collection_object = new_collection_object
@@ -290,19 +324,30 @@ class CollectionDirectory(Directory):
cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime()))
self.fresh()
return True
+ except apiclient.errors.HttpError as e:
+ import pprint
+ pprint.pprint(self.resp.status)
+ if self.resp.status == 404:
+ _logger.warn("arv-mount %s: not found", self.collection_locator)
+ else:
+ _logger.error("arv-mount %s: error", self.collection_locator)
+ _logger.exception(detail)
+ return False
except Exception as detail:
_logger.error("arv-mount %s: error", self.collection_locator)
- _logger.exception(detail)
+ if "manifest_text" in self.collection_object:
+ _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+ _logger.exception(detail)
return False
def __getitem__(self, item):
self.checkupdate()
- if item == '.manifest_text':
+ if item == '.arvados#collection.manifest_text':
if self.manifest_text_file is None:
self.manifest_text_file = StringFile(self.inode, self.collection_object["manifest_text"], self.ctime(), self.mtime())
self.inodes.add_entry(self.manifest_text_file)
return self.manifest_text_file
- elif item == '.portable_data_hash':
+ elif item == '.arvados#collection.portable_data_hash':
if self.pdh_file is None:
self.pdh_file = StringFile(self.inode, self.collection_object["portable_data_hash"], self.ctime(), self.mtime())
self.inodes.add_entry(self.pdh_file)
@@ -311,7 +356,7 @@ class CollectionDirectory(Directory):
return super(CollectionDirectory, self).__getitem__(item)
def __contains__(self, k):
- if k in ('.manifest_text', '.portable_data_hash'):
+ if k in ('.arvados#collection.manifest_text', '.arvados#collection.portable_data_hash'):
return True
else:
return super(CollectionDirectory, self).__contains__(k)
@@ -387,7 +432,8 @@ class TagsDirectory(RecursiveInvalidateDirectory):
self._poll_time = poll_time
def update(self):
- tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+ with llfuse.lock_released:
+ tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
if "items" in tags:
self.merge(tags['items'],
lambda i: i['name'] if 'name' in i else i['uuid'],
@@ -408,10 +454,11 @@ class TagDirectory(Directory):
self._poll_time = poll_time
def update(self):
- taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']).execute()
+ with llfuse.lock_released:
+ taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection']],
+ select=['head_uuid']).execute()
self.merge(taggedcollections['items'],
lambda i: i['head_uuid'],
lambda a, i: a.collection_locator == i['head_uuid'],
@@ -468,17 +515,17 @@ class ProjectDirectory(RecursiveInvalidateDirectory):
return a.uuid == i['uuid'] and not a.stale()
return False
- #with llfuse.lock_released:
- if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
- self.project_object = self.api.groups().get(uuid=self.uuid).execute()
- elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
- self.project_object = self.api.users().get(uuid=self.uuid).execute()
+ with llfuse.lock_released:
+ if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
+ self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+ elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
+ self.project_object = self.api.users().get(uuid=self.uuid).execute()
- contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
- # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
-
- #print contents
+ contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+ # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
+ contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+
+ # end with llfuse.lock_released, re-acquire lock
self.merge(contents,
namefn,
@@ -509,44 +556,45 @@ class SharedDirectory(RecursiveInvalidateDirectory):
self._poll_time = poll_time
def update(self):
- #with llfuse.lock_released:
- all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
- objects = {}
- for ob in all_projects:
- objects[ob['uuid']] = ob
-
- roots = []
- root_owners = {}
- for ob in all_projects:
- if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
- roots.append(ob)
- root_owners[ob['owner_uuid']] = True
-
- #with llfuse.lock_released:
- lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
-
- users = {}
- groups = {}
-
- for l in lusers:
- objects[l["uuid"]] = l
- for l in lgroups:
- objects[l["uuid"]] = l
-
- contents = {}
- for r in root_owners:
- if r in objects:
- obr = objects[r]
- if "name" in obr:
- contents[obr["name"]] = obr
- if "first_name" in obr:
- contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
-
- for r in roots:
- if r['owner_uuid'] not in objects:
- contents[r['name']] = r
-
+ with llfuse.lock_released:
+ all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+ objects = {}
+ for ob in all_projects:
+ objects[ob['uuid']] = ob
+
+ roots = []
+ root_owners = {}
+ for ob in all_projects:
+ if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
+ roots.append(ob)
+ root_owners[ob['owner_uuid']] = True
+
+ lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+
+ users = {}
+ groups = {}
+
+ for l in lusers:
+ objects[l["uuid"]] = l
+ for l in lgroups:
+ objects[l["uuid"]] = l
+
+ contents = {}
+ for r in root_owners:
+ if r in objects:
+ obr = objects[r]
+ if "name" in obr:
+ contents[obr["name"]] = obr
+ if "first_name" in obr:
+ contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
+
+ for r in roots:
+ if r['owner_uuid'] not in objects:
+ contents[r['name']] = r
+
+ # end with llfuse.lock_released, re-acquire lock
+
try:
self.merge(contents.items(),
lambda i: i[0],
@@ -605,7 +653,7 @@ class Operations(llfuse.Operations):
llfuse has its own global lock which is acquired before calling a request handler,
so request handlers do not run concurrently unless the lock is explicitly released
- with llfuse.lock_released.'''
+ using "with llfuse.lock_released:"'''
def __init__(self, uid, gid):
super(Operations, self).__init__()
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 89acdd6..8fbb45e 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -86,7 +86,7 @@ with "--".
try:
# Create the request handler
operations = Operations(os.getuid(), os.getgid())
- api = arvados.api('v1')
+ api = SafeApi(arvados.config)
usr = api.users().current().execute()
if args.by_hash:
commit 0abff82784887a0d8d8f6f4b67972296a2aa7728
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Sep 4 10:51:04 2014 -0400
3644: Tweak the CollectionReader manifest text regular expression to use
multiline matching.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a86a1f5..a06a17f 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -101,7 +101,7 @@ class CollectionReader(object):
elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
- elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+ elif re.match(r'((\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
self._manifest_text = manifest_locator_or_text
self._manifest_locator = None
else:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list