[ARVADOS] updated: 951149b9258803cb46162ceb78bbec0afae50a04
git at public.curoverse.com
git at public.curoverse.com
Mon Feb 2 15:14:37 EST 2015
Summary of changes:
sdk/python/arvados/api.py | 28 ++
sdk/python/arvados/arvfile.py | 38 ++-
sdk/python/arvados/collection.py | 464 +++++++++++++++++++++------------
services/fuse/arvados_fuse/__init__.py | 32 ---
4 files changed, 359 insertions(+), 203 deletions(-)
via 951149b9258803cb46162ceb78bbec0afae50a04 (commit)
from 438752b8a79d58a615e51bd5ddcbea74b1452a63 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 951149b9258803cb46162ceb78bbec0afae50a04
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 2 15:16:47 2015 -0500
4823: Refactored Collection into SynchronizedCollectionBase, Subcollection, and
Collection. Added clone(). Added method decorators for locking.
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 2f1f740..0803db8 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -147,3 +147,31 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, **kwarg
svc.api_token = token
kwargs['http'].cache = None
return svc
+
+class SafeApi(object):
+ """Threadsafe wrapper for API object. This stores and returns a different api
+ object per thread, because httplib2 which underlies apiclient is not
+ threadsafe.
+ """
+
+ def __init__(self, config=None, keep_params={}):
+ if not config:
+ config = arvados.config.settings()
+ self.host = config.get('ARVADOS_API_HOST')
+ self.api_token = config.get('ARVADOS_API_TOKEN')
+ self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+ self.local = threading.local()
+ self.keep = arvados.KeepClient(api_client=self, **keep_params)
+
+ def localapi(self):
+ if 'api' not in self.local.__dict__:
+ self.local.api = arvados.api('v1', False, self.host,
+ self.api_token, self.insecure)
+ return self.local.api
+
+ def __getattr__(self, name):
+ # Proxy nonexistent attributes to the thread-local API client.
+ try:
+ return getattr(self.localapi(), name)
+ except AttributeError:
+ return super(SafeApi, self).__getattr__(name)
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 29db2cb..0e37f3d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -306,6 +306,29 @@ def _synchronized(orig_func):
return orig_func(self, *args, **kwargs)
return wrapper
+class NoopLock(object):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ def acquire(self, blocking=False):
+ pass
+
+ def release(self):
+ pass
+
+def _must_be_writable(orig_func):
+ # Decorator for methods that read actual Collection data.
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ if self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+ raise IOError((errno.EROFS, "Collection is read only"))
+ return orig_func(self, *args, **kwargs)
+ return wrapper
+
+
class BlockManager(object):
"""
BlockManager handles buffer blocks, background block uploads, and
@@ -528,17 +551,23 @@ class ArvadosFile(object):
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- self.lock = threading.Lock()
+ if parent.sync_mode() == SYNC_READONLY:
+ self.lock = NoopLock()
+ else:
+ self.lock = threading.Lock()
+
+ def sync_mode(self):
+ return self.parent.sync_mode()
@_synchronized
def segments(self):
return copy.copy(self._segments)
@_synchronized
- def clone(self, num_retries):
+ def clone(self, new_parent):
"""Make a copy of this file."""
cp = ArvadosFile()
- cp.parent = self.parent
+ cp.parent = new_parent
cp._modified = False
map_loc = {}
@@ -563,6 +592,7 @@ class ArvadosFile(object):
"""Test the modified flag"""
return self._modified
+ @_must_be_writable
@_synchronized
def truncate(self, size):
"""
@@ -634,6 +664,7 @@ class ArvadosFile(object):
self._current_bblock = new_bb
+ @_must_be_writable
@_synchronized
def writeto(self, offset, data, num_retries):
"""
@@ -663,6 +694,7 @@ class ArvadosFile(object):
self._current_bblock.append(data)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ @_must_be_writable
@_synchronized
def add_segment(self, blocks, pos, size):
# Synchronized public api, see _add_segment
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index c09520e..24362cd 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -152,6 +152,7 @@ class CollectionReader(CollectionBase):
for sline in self._manifest_text.split("\n")
if sline]
+ @staticmethod
def _populate_first(orig_func):
# Decorator for methods that read actual Collection data.
@functools.wraps(orig_func)
@@ -640,154 +641,34 @@ class ResumableCollectionWriter(CollectionWriter):
return super(ResumableCollectionWriter, self).write(data)
-class Collection(CollectionBase):
- """An abstract Arvados collection, consisting of a set of files and
- sub-collections.
- """
-
+class SynchronizedCollectionBase(CollectionBase):
SYNC_READONLY = 1
SYNC_EXPLICIT = 2
SYNC_LIVE = 3
- def __init__(self, manifest_locator_or_text=None,
- parent=None,
- api_client=None,
- keep_client=None,
- num_retries=0,
- block_manager=None,
- sync=Collection.SYNC_READONLY):
- """:manifest_locator_or_text:
- One of Arvados collection UUID, block locator of
- a manifest, raw manifest text, or None (to create an empty collection).
- :parent:
- the parent Collection, may be None.
- :api_client:
- The API client object to use for requests. If None, use default.
- :keep_client:
- the Keep client to use for requests. If None, use default.
- :num_retries:
- the number of retries for API and Keep requests.
- :block_manager:
- the block manager to use. If None, use parent's block
- manager or create one.
- :sync:
- Desired synchronization policy with API server collection record.
- :SYNC_READONLY:
- Collection is read only. No synchronization. This mode will
- also forego locking, which gives better performance.
- :SYNC_EXPLICIT:
- Synchronize on explicit request via `merge()` or `save()`
- :SYNC_LIVE:
- Synchronize with server in response to background websocket events,
- on block write, or on file close.
- """
-
+ def __init__(self, parent=None):
self.parent = parent
self._items = None
- self._api_client = api_client
- self._keep_client = keep_client
- self._block_manager = block_manager
-
- self.num_retries = num_retries
- self._manifest_locator = None
- self._manifest_text = None
- self._api_response = None
-
- if manifest_locator_or_text:
- if re.match(util.keep_locator_pattern, manifest_locator_or_text):
- self._manifest_locator = manifest_locator_or_text
- elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
- self._manifest_locator = manifest_locator_or_text
- elif re.match(util.manifest_pattern, manifest_locator_or_text):
- self._manifest_text = manifest_locator_or_text
- else:
- raise errors.ArgumentError(
- "Argument to CollectionReader must be a manifest or a collection UUID")
def _my_api(self):
- if self._api_client is None:
- if self.parent is not None:
- return self.parent._my_api()
- self._api_client = arvados.api('v1')
- self._keep_client = None # Make a new one with the new api.
- return self._api_client
+ raise NotImplementedError()
def _my_keep(self):
- if self._keep_client is None:
- if self.parent is not None:
- return self.parent._my_keep()
- self._keep_client = KeepClient(api_client=self._my_api(),
- num_retries=self.num_retries)
- return self._keep_client
+ raise NotImplementedError()
def _my_block_manager(self):
- if self._block_manager is None:
- if self.parent is not None:
- return self.parent._my_block_manager()
- self._block_manager = BlockManager(self._my_keep())
- return self._block_manager
+ raise NotImplementedError()
- def _populate_from_api_server(self):
- # As in KeepClient itself, we must wait until the last
- # possible moment to instantiate an API client, in order to
- # avoid tripping up clients that don't have access to an API
- # server. If we do build one, make sure our Keep client uses
- # it. If instantiation fails, we'll fall back to the except
- # clause, just like any other Collection lookup
- # failure. Return an exception, or None if successful.
- try:
- self._api_response = self._my_api().collections().get(
- uuid=self._manifest_locator).execute(
- num_retries=self.num_retries)
- self._manifest_text = self._api_response['manifest_text']
- return None
- except Exception as e:
- return e
-
- def _populate_from_keep(self):
- # Retrieve a manifest directly from Keep. This has a chance of
- # working if [a] the locator includes a permission signature
- # or [b] the Keep services are operating in world-readable
- # mode. Return an exception, or None if successful.
- try:
- self._manifest_text = self._my_keep().get(
- self._manifest_locator, num_retries=self.num_retries)
- except Exception as e:
- return e
+ def _root_lock(self):
+ raise NotImplementedError()
def _populate(self):
- self._items = {}
- if self._manifest_locator is None and self._manifest_text is None:
- return
- error_via_api = None
- error_via_keep = None
- should_try_keep = ((self._manifest_text is None) and
- util.keep_locator_pattern.match(
- self._manifest_locator))
- if ((self._manifest_text is None) and
- util.signed_locator_pattern.match(self._manifest_locator)):
- error_via_keep = self._populate_from_keep()
- if self._manifest_text is None:
- error_via_api = self._populate_from_api_server()
- if error_via_api is not None and not should_try_keep:
- raise error_via_api
- if ((self._manifest_text is None) and
- not error_via_keep and
- should_try_keep):
- # Looks like a keep locator, and we didn't already try keep above
- error_via_keep = self._populate_from_keep()
- if self._manifest_text is None:
- # Nothing worked!
- raise arvados.errors.NotFoundError(
- ("Failed to retrieve collection '{}' " +
- "from either API server ({}) or Keep ({})."
- ).format(
- self._manifest_locator,
- error_via_api,
- error_via_keep))
- # populate
- import_manifest(self._manifest_text, self)
+ raise NotImplementedError()
+ def _sync_mode(self):
+ raise NotImplementedError()
+
+ @staticmethod
def _populate_first(orig_func):
# Decorator for methods that read actual Collection data.
@functools.wraps(orig_func)
@@ -797,15 +678,7 @@ class Collection(CollectionBase):
return orig_func(self, *args, **kwargs)
return wrapper
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- """Support scoped auto-commit in a with: block"""
- self.save(no_locator=True)
- if self._block_manager is not None:
- self._block_manager.stop_threads()
-
+ @arvfile._synchronized
@_populate_first
def find(self, path, create=False, create_collection=False):
"""Recursively search the specified file path. May return either a Collection
@@ -824,6 +697,9 @@ class Collection(CollectionBase):
component.
"""
+ if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+ raise IOError((errno.EROFS, "Collection is read only"))
+
p = path.split("/")
if p[0] == '.':
del p[0]
@@ -835,7 +711,7 @@ class Collection(CollectionBase):
if item is None and create:
# create new file
if create_collection:
- item = Collection(parent=self, num_retries=self.num_retries)
+ item = Subcollection(self)
else:
item = ArvadosFile(self)
self._items[p[0]] = item
@@ -843,24 +719,13 @@ class Collection(CollectionBase):
else:
if item is None and create:
# create new collection
- item = Collection(parent=self, num_retries=self.num_retries)
+ item = Subcollection(self)
self._items[p[0]] = item
del p[0]
return item.find("/".join(p), create=create)
else:
return self
- @_populate_first
- def api_response(self):
- """
- api_response() -> dict or None
-
- Returns information about this Collection fetched from the API server.
- If the Collection exists in Keep but not the API server, currently
- returns None. Future versions may provide a synthetic response.
- """
- return self._api_response
-
def open(self, path, mode):
"""Open a file-like object for access.
@@ -884,6 +749,9 @@ class Collection(CollectionBase):
raise ArgumentError("Bad mode '%s'" % mode)
create = (mode != "r")
+ if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+ raise IOError((errno.EROFS, "Collection is read only"))
+
f = self.find(path, create=create)
if f is None:
raise IOError((errno.ENOENT, "File not found"))
@@ -898,6 +766,7 @@ class Collection(CollectionBase):
else:
return ArvadosFileWriter(f, path, mode)
+ @arvfile._synchronized
@_populate_first
def modified(self):
"""Test if the collection (or any subcollection or file) has been modified
@@ -907,69 +776,86 @@ class Collection(CollectionBase):
return True
return False
+ @arvfile._synchronized
@_populate_first
def set_unmodified(self):
"""Recursively clear modified flag"""
for k,v in self._items.items():
v.set_unmodified()
+ @arvfile._synchronized
@_populate_first
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return self._items.iterkeys()
+ return self._items.keys()
+ @arvfile._synchronized
@_populate_first
def iterkeys(self):
"""Iterate over names of files and collections directly contained in this collection."""
- return self._items.iterkeys()
+ return self._items.keys()
+ @arvfile._synchronized
@_populate_first
def __getitem__(self, k):
- """Get a file or collection that is directly contained by this collection. Use
- find() for path serach."""
+ """Get a file or collection that is directly contained by this collection. If
+ you want to search a path, use `find()` instead.
+ """
return self._items[k]
+ @arvfile._synchronized
@_populate_first
def __contains__(self, k):
"""If there is a file or collection a directly contained by this collection
with name "k"."""
return k in self._items
+ @arvfile._synchronized
@_populate_first
def __len__(self):
"""Get the number of items directly contained in this collection"""
return len(self._items)
+ @_must_be_writable
+ @arvfile._synchronized
@_populate_first
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
del self._items[p]
+ @arvfile._synchronized
@_populate_first
def keys(self):
"""Get a list of names of files and collections directly contained in this collection."""
return self._items.keys()
+ @arvfile._synchronized
@_populate_first
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
return self._items.values()
+ @arvfile._synchronized
@_populate_first
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
return self._items.items()
- @_populate_first
def exists(self, path):
"""Test if there is a file or collection at "path" """
return self.find(path) != None
+ @_must_be_writable
+ @arvfile._synchronized
@_populate_first
- def remove(self, path):
- """Test if there is a file or collection at "path" """
+ def remove(self, path, rm_r=False):
+ """Remove the file or subcollection (directory) at `path`.
+ :rm_r:
+ Specify whether to remove non-empty subcollections (True), or raise an error (False).
+ """
p = path.split("/")
if p[0] == '.':
+ # Remove '.' from the front of the path
del p[0]
if len(p) > 0:
@@ -977,6 +863,8 @@ class Collection(CollectionBase):
if item is None:
raise IOError((errno.ENOENT, "File not found"))
if len(p) == 1:
+ if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
+ raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
del self._items[p[0]]
else:
del p[0]
@@ -984,6 +872,14 @@ class Collection(CollectionBase):
else:
raise IOError((errno.ENOENT, "File not found"))
+ def _cloneinto(self, target):
+ for k,v in self._items:
+ target._items[k] = v.clone(new_parent=target)
+
+ def clone(self):
+ raise NotImplementedError()
+
+ @arvfile._synchronized
@_populate_first
def manifest_text(self, strip=False, normalize=False):
"""Get the manifest text for this collection, sub collections and files.
@@ -1012,14 +908,210 @@ class Collection(CollectionBase):
stripped = self.manifest_text(strip=True)
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
+class Collection(SynchronizedCollectionBase):
+ """Store an Arvados collection, consisting of a set of files and
+ sub-collections.
+ """
+
+ def __init__(self, manifest_locator_or_text=None,
+ parent=None,
+ config=None,
+ api_client=None,
+ keep_client=None,
+ num_retries=0,
+ block_manager=None,
+ sync=Collection.SYNC_READONLY):
+ """:manifest_locator_or_text:
+ One of Arvados collection UUID, block locator of
+ a manifest, raw manifest text, or None (to create an empty collection).
+ :parent:
+ the parent Collection, may be None.
+ :config:
+ the arvados configuration to get the hostname and api token.
+ Prefer this over supplying your own api_client and keep_client (except in testing).
+ Will use default config settings if not specified.
+ :api_client:
+ The API client object to use for requests. If not specified, create one using `config`.
+ :keep_client:
+ the Keep client to use for requests. If not specified, create one using `config`.
+ :num_retries:
+ the number of retries for API and Keep requests.
+ :block_manager:
+ the block manager to use. If not specified, create one.
+ :sync:
+ Set synchronization policy with API server collection record.
+ :SYNC_READONLY:
+ Collection is read only. No synchronization. This mode will
+ also forego locking, which gives better performance.
+ :SYNC_EXPLICIT:
+ Synchronize on explicit request via `merge()` or `save()`
+ :SYNC_LIVE:
+ Synchronize with server in response to background websocket events,
+ on block write, or on file close.
+
+ """
+
+ self.parent = parent
+ self._items = None
+ self._api_client = api_client
+ self._keep_client = keep_client
+ self._block_manager = block_manager
+ self._config = config
+ self.num_retries = num_retries
+ self._manifest_locator = None
+ self._manifest_text = None
+ self._api_response = None
+ self._sync = sync
+ self.lock = threading.RLock()
+
+ if manifest_locator_or_text:
+ if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+ self._manifest_locator = manifest_locator_or_text
+ elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+ self._manifest_locator = manifest_locator_or_text
+ elif re.match(util.manifest_pattern, manifest_locator_or_text):
+ self._manifest_text = manifest_locator_or_text
+ else:
+ raise errors.ArgumentError(
+ "Argument to CollectionReader must be a manifest or a collection UUID")
+
+ def _root_lock(self):
+ return self.lock
+
+ def sync_mode(self):
+ return self._sync
+
+ @arvfile._synchronized
+ def _my_api(self):
+ if self._api_client is None:
+ self._api_client = arvados.api.SafeApi(self._config)
+ self._keep_client = self._api_client.keep
+ return self._api_client
+
+ @arvfile._synchronized
+ def _my_keep(self):
+ if self._keep_client is None:
+ if self._api_client is None:
+ self._my_api()
+ else:
+ self._keep_client = KeepClient(api=self._api_client)
+ return self._keep_client
+
+ @arvfile._synchronized
+ def _my_block_manager(self):
+ if self._block_manager is None:
+ self._block_manager = BlockManager(self._my_keep())
+ return self._block_manager
+
+ def _populate_from_api_server(self):
+ # As in KeepClient itself, we must wait until the last
+ # possible moment to instantiate an API client, in order to
+ # avoid tripping up clients that don't have access to an API
+ # server. If we do build one, make sure our Keep client uses
+ # it. If instantiation fails, we'll fall back to the except
+ # clause, just like any other Collection lookup
+ # failure. Return an exception, or None if successful.
+ try:
+ self._api_response = self._my_api().collections().get(
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries)
+ self._manifest_text = self._api_response['manifest_text']
+ return None
+ except Exception as e:
+ return e
+
+ def _populate_from_keep(self):
+ # Retrieve a manifest directly from Keep. This has a chance of
+ # working if [a] the locator includes a permission signature
+ # or [b] the Keep services are operating in world-readable
+ # mode. Return an exception, or None if successful.
+ try:
+ self._manifest_text = self._my_keep().get(
+ self._manifest_locator, num_retries=self.num_retries)
+ except Exception as e:
+ return e
+
+ def _populate(self):
+ self._items = {}
+ if self._manifest_locator is None and self._manifest_text is None:
+ return
+ error_via_api = None
+ error_via_keep = None
+ should_try_keep = ((self._manifest_text is None) and
+ util.keep_locator_pattern.match(
+ self._manifest_locator))
+ if ((self._manifest_text is None) and
+ util.signed_locator_pattern.match(self._manifest_locator)):
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ error_via_api = self._populate_from_api_server()
+ if error_via_api is not None and not should_try_keep:
+ raise error_via_api
+ if ((self._manifest_text is None) and
+ not error_via_keep and
+ should_try_keep):
+ # Looks like a keep locator, and we didn't already try keep above
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ # Nothing worked!
+ raise arvados.errors.NotFoundError(
+ ("Failed to retrieve collection '{}' " +
+ "from either API server ({}) or Keep ({})."
+ ).format(
+ self._manifest_locator,
+ error_via_api,
+ error_via_keep))
+ # populate
+ import_manifest(self._manifest_text, self)
+
+ if self._sync == SYNC_READONLY:
+ # Now that we're populated, knowing that this will be readonly,
+ # forego any further locking.
+ self.lock = NoopLock()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Support scoped auto-commit in a with: block"""
+ self.save(allow_no_locator=True)
+ if self._block_manager is not None:
+ self._block_manager.stop_threads()
+
+ @arvfile._synchronized
@_populate_first
- def save(self, no_locator=False):
+ def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config):
+ c = Collection(parent=new_parent, config=new_config, sync=new_sync)
+ if new_sync == Collection.SYNC_READONLY:
+ c.lock = NoopLock()
+ c._items = {}
+ self._cloneinto(c)
+ return c
+
+ @arvfile._synchronized
+ @_populate_first
+ def api_response(self):
+ """
+ api_response() -> dict or None
+
+ Returns information about this Collection fetched from the API server.
+ If the Collection exists in Keep but not the API server, currently
+ returns None. Future versions may provide a synthetic response.
+ """
+ return self._api_response
+
+ @_must_be_writable
+ @arvfile._synchronized
+ @_populate_first
+ def save(self, allow_no_locator=False):
"""Commit pending buffer blocks to Keep, write the manifest to Keep, and
update the collection record to Keep.
- :no_locator:
- If False and there is no collection uuid associated with
- this Collection, raise an error. If True, do not raise an error.
+ :allow_no_locator:
+ If there is no collection uuid associated with this
+ Collection and `allow_no_locator` is False, raise an error. If True,
+ do not raise an error.
"""
if self.modified():
self._my_block_manager().commit_all()
@@ -1030,10 +1122,12 @@ class Collection(CollectionBase):
body={'manifest_text': self.manifest_text(strip=False)}
).execute(
num_retries=self.num_retries)
- elif not no_locator:
+ elif not allow_no_locator:
raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
self.set_unmodified()
+ @_must_be_writable
+ @arvfile._synchronized
@_populate_first
def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
"""Save a new collection record.
@@ -1062,6 +1156,40 @@ class Collection(CollectionBase):
self.set_unmodified()
+class Subcollection(SynchronizedCollectionBase):
+ """This is a subdirectory within a collection that doesn't have its own API
+ server record. It falls under the umbrella of the root collection."""
+
+ def __init__(self, parent):
+ super(Subcollection, self).__init__(parent)
+ self.lock = parent._root_lock()
+
+ def _root_lock():
+ return self.parent._root_lock()
+
+ def sync_mode(self):
+ return self.parent.sync_mode()
+
+ def _my_api(self):
+ return self.parent._my_api()
+
+ def _my_keep(self):
+ return self.parent._my_keep()
+
+ def _my_block_manager(self):
+ return self.parent._my_block_manager()
+
+ def _populate(self):
+ self.parent._populate()
+
+ @arvfile._synchronized
+ @_populate_first
+ def clone(self, new_parent):
+ c = Subcollection(parent=new_parent)
+ c._items = {}
+ self._cloneinto(c)
+ return c
+
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
"""Import a manifest into a `Collection`.
@@ -1150,7 +1278,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
If False, use block locators as-is.
"""
buf = ""
- if isinstance(item, Collection):
+ if isinstance(item, SynchronizedCollectionBase):
stream = {}
sorted_keys = sorted(item.keys())
for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
@@ -1168,7 +1296,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
if stream:
buf += ' '.join(normalize_stream(stream_name, stream))
buf += "\n"
- for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
+ for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
elif isinstance(item, ArvadosFile):
st = []
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 9bd954c..8ab2e8d 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -30,38 +30,6 @@ _logger = logging.getLogger('arvados.arvados_fuse')
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
-class SafeApi(object):
- '''Threadsafe wrapper for API object. This stores and returns a different api
- object per thread, because httplib2 which underlies apiclient is not
- threadsafe.
- '''
-
- def __init__(self, config):
- self.host = config.get('ARVADOS_API_HOST')
- self.api_token = config.get('ARVADOS_API_TOKEN')
- self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
- self.local = threading.local()
- self.block_cache = arvados.KeepBlockCache()
-
- def localapi(self):
- if 'api' not in self.local.__dict__:
- self.local.api = arvados.api('v1', False, self.host,
- self.api_token, self.insecure)
- return self.local.api
-
- def localkeep(self):
- if 'keep' not in self.local.__dict__:
- self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
- return self.local.keep
-
- def __getattr__(self, name):
- # Proxy nonexistent attributes to the local API client.
- try:
- return getattr(self.localapi(), name)
- except AttributeError:
- return super(SafeApi, self).__getattr__(name)
-
-
def convertTime(t):
'''Parse Arvados timestamp to unix time.'''
try:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list