[ARVADOS] updated: 951149b9258803cb46162ceb78bbec0afae50a04

git at public.curoverse.com git at public.curoverse.com
Mon Feb 2 15:14:37 EST 2015


Summary of changes:
 sdk/python/arvados/api.py              |  28 ++
 sdk/python/arvados/arvfile.py          |  38 ++-
 sdk/python/arvados/collection.py       | 464 +++++++++++++++++++++------------
 services/fuse/arvados_fuse/__init__.py |  32 ---
 4 files changed, 359 insertions(+), 203 deletions(-)

       via  951149b9258803cb46162ceb78bbec0afae50a04 (commit)
      from  438752b8a79d58a615e51bd5ddcbea74b1452a63 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 951149b9258803cb46162ceb78bbec0afae50a04
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Feb 2 15:16:47 2015 -0500

    4823: Refactored Collection into SynchronizedCollectionBase, Subcollection, and
    Collection.  Added clone().  Added method decorators for locking.

diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 2f1f740..0803db8 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -147,3 +147,31 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, **kwarg
     svc.api_token = token
     kwargs['http'].cache = None
     return svc
+
+class SafeApi(object):
+    """Threadsafe wrapper for API object.  This stores and returns a different api
+    object per thread, because httplib2 which underlies apiclient is not
+    threadsafe.  Attributes not set on the wrapper are proxied to that object.
+    """
+
+    def __init__(self, config=None, keep_params=None):
+        if not config:
+            config = arvados.config.settings()  # NOTE(review): must offer get() and flag_is_true() - confirm
+        self.host = config.get('ARVADOS_API_HOST')
+        self.api_token = config.get('ARVADOS_API_TOKEN')
+        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+        self.local = threading.local()
+        self.keep = arvados.KeepClient(api_client=self, **(keep_params or {}))
+
+    def localapi(self):
+        if 'api' not in self.local.__dict__:
+            self.local.api = arvados.api('v1', False, self.host,
+                                         self.api_token, self.insecure)
+        return self.local.api
+
+    def __getattr__(self, name):
+        # Proxy nonexistent attributes to the thread-local API client; never
+        # proxy 'local' itself, or an unset self.local would recurse forever.
+        if name == 'local':
+            raise AttributeError(name)
+        return getattr(self.localapi(), name)
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 29db2cb..0e37f3d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -306,6 +306,29 @@ def _synchronized(orig_func):
             return orig_func(self, *args, **kwargs)
     return wrapper
 
+class NoopLock(object):  # lock-shaped stand-in that never blocks or excludes
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        return False  # never suppress an exception raised inside the with-block
+
+    def acquire(self, blocking=False):
+        return True  # report success like a real lock; `blocking` is ignored
+
+    def release(self):
+        pass
+
+def _must_be_writable(orig_func):
+    # Decorator for methods that modify Collection data: raise EROFS if read-only.
+    @functools.wraps(orig_func)
+    def wrapper(self, *args, **kwargs):
+        if self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+            raise IOError(errno.EROFS, "Collection is read only")  # two args so e.errno is set
+        return orig_func(self, *args, **kwargs)
+    return wrapper
+
+
 class BlockManager(object):
     """
     BlockManager handles buffer blocks, background block uploads, and
@@ -528,17 +551,23 @@ class ArvadosFile(object):
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
-        self.lock = threading.Lock()
+        if parent.sync_mode() == SYNC_READONLY:
+            self.lock = NoopLock()
+        else:
+            self.lock = threading.Lock()
+
+    def sync_mode(self):
+        return self.parent.sync_mode()
 
     @_synchronized
     def segments(self):
         return copy.copy(self._segments)
 
     @_synchronized
-    def clone(self, num_retries):
+    def clone(self, new_parent):
         """Make a copy of this file."""
         cp = ArvadosFile()
-        cp.parent = self.parent
+        cp.parent = new_parent
         cp._modified = False
 
         map_loc = {}
@@ -563,6 +592,7 @@ class ArvadosFile(object):
         """Test the modified flag"""
         return self._modified
 
+    @_must_be_writable
     @_synchronized
     def truncate(self, size):
         """
@@ -634,6 +664,7 @@ class ArvadosFile(object):
 
             self._current_bblock = new_bb
 
+    @_must_be_writable
     @_synchronized
     def writeto(self, offset, data, num_retries):
         """
@@ -663,6 +694,7 @@ class ArvadosFile(object):
         self._current_bblock.append(data)
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
+    @_must_be_writable
     @_synchronized
     def add_segment(self, blocks, pos, size):
         # Synchronized public api, see _add_segment
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index c09520e..24362cd 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -152,6 +152,7 @@ class CollectionReader(CollectionBase):
                          for sline in self._manifest_text.split("\n")
                          if sline]
 
+    @staticmethod
     def _populate_first(orig_func):
         # Decorator for methods that read actual Collection data.
         @functools.wraps(orig_func)
@@ -640,154 +641,34 @@ class ResumableCollectionWriter(CollectionWriter):
         return super(ResumableCollectionWriter, self).write(data)
 
 
-class Collection(CollectionBase):
-    """An abstract Arvados collection, consisting of a set of files and
-    sub-collections.
-    """
-
+class SynchronizedCollectionBase(CollectionBase):
     SYNC_READONLY = 1
     SYNC_EXPLICIT = 2
     SYNC_LIVE = 3
 
-    def __init__(self, manifest_locator_or_text=None,
-                 parent=None,
-                 api_client=None,
-                 keep_client=None,
-                 num_retries=0,
-                 block_manager=None,
-                 sync=Collection.SYNC_READONLY):
-        """:manifest_locator_or_text:
-          One of Arvados collection UUID, block locator of
-          a manifest, raw manifest text, or None (to create an empty collection).
-        :parent:
-          the parent Collection, may be None.
-        :api_client:
-          The API client object to use for requests.  If None, use default.
-        :keep_client:
-          the Keep client to use for requests.  If None, use default.
-        :num_retries:
-          the number of retries for API and Keep requests.
-        :block_manager:
-          the block manager to use.  If None, use parent's block
-          manager or create one.
-        :sync:
-          Desired synchronization policy with API server collection record.
-          :SYNC_READONLY:
-            Collection is read only.  No synchronization.  This mode will
-            also forego locking, which gives better performance.
-          :SYNC_EXPLICIT:
-            Synchronize on explicit request via `merge()` or `save()`
-          :SYNC_LIVE:
-            Synchronize with server in response to background websocket events,
-            on block write, or on file close.
-        """
-
+    def __init__(self, parent=None):
         self.parent = parent
         self._items = None
-        self._api_client = api_client
-        self._keep_client = keep_client
-        self._block_manager = block_manager
-
-        self.num_retries = num_retries
-        self._manifest_locator = None
-        self._manifest_text = None
-        self._api_response = None
-
-        if manifest_locator_or_text:
-            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
-                self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
-                self._manifest_locator = manifest_locator_or_text
-            elif re.match(util.manifest_pattern, manifest_locator_or_text):
-                self._manifest_text = manifest_locator_or_text
-            else:
-                raise errors.ArgumentError(
-                    "Argument to CollectionReader must be a manifest or a collection UUID")
 
     def _my_api(self):
-        if self._api_client is None:
-            if self.parent is not None:
-                return self.parent._my_api()
-            self._api_client = arvados.api('v1')
-            self._keep_client = None  # Make a new one with the new api.
-        return self._api_client
+        raise NotImplementedError()
 
     def _my_keep(self):
-        if self._keep_client is None:
-            if self.parent is not None:
-                return self.parent._my_keep()
-            self._keep_client = KeepClient(api_client=self._my_api(),
-                                           num_retries=self.num_retries)
-        return self._keep_client
+        raise NotImplementedError()
 
     def _my_block_manager(self):
-        if self._block_manager is None:
-            if self.parent is not None:
-                return self.parent._my_block_manager()
-            self._block_manager = BlockManager(self._my_keep())
-        return self._block_manager
+        raise NotImplementedError()
 
-    def _populate_from_api_server(self):
-        # As in KeepClient itself, we must wait until the last
-        # possible moment to instantiate an API client, in order to
-        # avoid tripping up clients that don't have access to an API
-        # server.  If we do build one, make sure our Keep client uses
-        # it.  If instantiation fails, we'll fall back to the except
-        # clause, just like any other Collection lookup
-        # failure. Return an exception, or None if successful.
-        try:
-            self._api_response = self._my_api().collections().get(
-                uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
-            self._manifest_text = self._api_response['manifest_text']
-            return None
-        except Exception as e:
-            return e
-
-    def _populate_from_keep(self):
-        # Retrieve a manifest directly from Keep. This has a chance of
-        # working if [a] the locator includes a permission signature
-        # or [b] the Keep services are operating in world-readable
-        # mode. Return an exception, or None if successful.
-        try:
-            self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries)
-        except Exception as e:
-            return e
+    def _root_lock(self):
+        raise NotImplementedError()
 
     def _populate(self):
-        self._items = {}
-        if self._manifest_locator is None and self._manifest_text is None:
-            return
-        error_via_api = None
-        error_via_keep = None
-        should_try_keep = ((self._manifest_text is None) and
-                           util.keep_locator_pattern.match(
-                self._manifest_locator))
-        if ((self._manifest_text is None) and
-            util.signed_locator_pattern.match(self._manifest_locator)):
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            error_via_api = self._populate_from_api_server()
-            if error_via_api is not None and not should_try_keep:
-                raise error_via_api
-        if ((self._manifest_text is None) and
-            not error_via_keep and
-            should_try_keep):
-            # Looks like a keep locator, and we didn't already try keep above
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            # Nothing worked!
-            raise arvados.errors.NotFoundError(
-                ("Failed to retrieve collection '{}' " +
-                 "from either API server ({}) or Keep ({})."
-                 ).format(
-                    self._manifest_locator,
-                    error_via_api,
-                    error_via_keep))
-        # populate
-        import_manifest(self._manifest_text, self)
+        raise NotImplementedError()
 
+    def _sync_mode(self):
+        return self.sync_mode()  # subclasses implement sync_mode(); find()/open() call this alias
+
+    @staticmethod
     def _populate_first(orig_func):
         # Decorator for methods that read actual Collection data.
         @functools.wraps(orig_func)
@@ -797,15 +678,7 @@ class Collection(CollectionBase):
             return orig_func(self, *args, **kwargs)
         return wrapper
 
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        """Support scoped auto-commit in a with: block"""
-        self.save(no_locator=True)
-        if self._block_manager is not None:
-            self._block_manager.stop_threads()
-
+    @arvfile._synchronized
     @_populate_first
     def find(self, path, create=False, create_collection=False):
         """Recursively search the specified file path.  May return either a Collection
@@ -824,6 +697,9 @@ class Collection(CollectionBase):
           component.
 
         """
+        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+            raise IOError((errno.EROFS, "Collection is read only"))
+
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -835,7 +711,7 @@ class Collection(CollectionBase):
                 if item is None and create:
                     # create new file
                     if create_collection:
-                        item = Collection(parent=self, num_retries=self.num_retries)
+                        item = Subcollection(self)
                     else:
                         item = ArvadosFile(self)
                     self._items[p[0]] = item
@@ -843,24 +719,13 @@ class Collection(CollectionBase):
             else:
                 if item is None and create:
                     # create new collection
-                    item = Collection(parent=self, num_retries=self.num_retries)
+                    item = Subcollection(self)
                     self._items[p[0]] = item
                 del p[0]
                 return item.find("/".join(p), create=create)
         else:
             return self
 
-    @_populate_first
-    def api_response(self):
-        """
-        api_response() -> dict or None
-
-        Returns information about this Collection fetched from the API server.
-        If the Collection exists in Keep but not the API server, currently
-        returns None.  Future versions may provide a synthetic response.
-        """
-        return self._api_response
-
     def open(self, path, mode):
         """Open a file-like object for access.
 
@@ -884,6 +749,9 @@ class Collection(CollectionBase):
             raise ArgumentError("Bad mode '%s'" % mode)
         create = (mode != "r")
 
+        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+            raise IOError((errno.EROFS, "Collection is read only"))
+
         f = self.find(path, create=create)
         if f is None:
             raise IOError((errno.ENOENT, "File not found"))
@@ -898,6 +766,7 @@ class Collection(CollectionBase):
         else:
             return ArvadosFileWriter(f, path, mode)
 
+    @arvfile._synchronized
     @_populate_first
     def modified(self):
         """Test if the collection (or any subcollection or file) has been modified
@@ -907,69 +776,86 @@ class Collection(CollectionBase):
                 return True
         return False
 
+    @arvfile._synchronized
     @_populate_first
     def set_unmodified(self):
         """Recursively clear modified flag"""
         for k,v in self._items.items():
             v.set_unmodified()
 
+    @arvfile._synchronized
     @_populate_first
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return self._items.iterkeys()
+        return self._items.keys()
 
+    @arvfile._synchronized
     @_populate_first
     def iterkeys(self):
         """Iterate over names of files and collections directly contained in this collection."""
-        return self._items.iterkeys()
+        return self._items.keys()
 
+    @arvfile._synchronized
     @_populate_first
     def __getitem__(self, k):
-        """Get a file or collection that is directly contained by this collection.  Use
-        find() for path serach."""
+        """Get a file or collection that is directly contained by this collection.  If
+        you want to search a path, use `find()` instead.
+        """
         return self._items[k]
 
+    @arvfile._synchronized
     @_populate_first
     def __contains__(self, k):
         """If there is a file or collection a directly contained by this collection
         with name "k"."""
         return k in self._items
 
+    @arvfile._synchronized
     @_populate_first
     def __len__(self):
         """Get the number of items directly contained in this collection"""
         return len(self._items)
 
+    @_must_be_writable
+    @arvfile._synchronized
     @_populate_first
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
 
+    @arvfile._synchronized
     @_populate_first
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()
 
+    @arvfile._synchronized
     @_populate_first
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()
 
+    @arvfile._synchronized
     @_populate_first
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()
 
-    @_populate_first
     def exists(self, path):
         """Test if there is a file or collection at "path" """
         return self.find(path) != None
 
+    @_must_be_writable
+    @arvfile._synchronized
     @_populate_first
-    def remove(self, path):
-        """Test if there is a file or collection at "path" """
+    def remove(self, path, rm_r=False):
+        """Remove the file or subcollection (directory) at `path`.
+        :rm_r:
+          Specify whether to remove non-empty subcollections (True), or raise an error (False).
+        """
         p = path.split("/")
         if p[0] == '.':
+            # Remove '.' from the front of the path
             del p[0]
 
         if len(p) > 0:
@@ -977,6 +863,8 @@ class Collection(CollectionBase):
             if item is None:
                 raise IOError((errno.ENOENT, "File not found"))
             if len(p) == 1:
+                if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
+                    raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
                 del self._items[p[0]]
             else:
                 del p[0]
@@ -984,6 +872,14 @@ class Collection(CollectionBase):
         else:
             raise IOError((errno.ENOENT, "File not found"))
 
+    def _cloneinto(self, target):
+        for k,v in self._items.items():  # (name, item) pairs; iterating the dict itself yields only keys
+            target._items[k] = v.clone(new_parent=target)
+
+    def clone(self):
+        raise NotImplementedError()
+
+    @arvfile._synchronized
     @_populate_first
     def manifest_text(self, strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
@@ -1012,14 +908,210 @@ class Collection(CollectionBase):
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+
+class Collection(SynchronizedCollectionBase):
+    """Store an Arvados collection, consisting of a set of files and
+    sub-collections.
+    """
+
+    def __init__(self, manifest_locator_or_text=None,
+                 parent=None,
+                 config=None,
+                 api_client=None,
+                 keep_client=None,
+                 num_retries=0,
+                 block_manager=None,
+                 sync=SynchronizedCollectionBase.SYNC_READONLY):  # `Collection` is not bound yet here
+        """:manifest_locator_or_text:
+          One of Arvados collection UUID, block locator of
+          a manifest, raw manifest text, or None (to create an empty collection).
+        :parent:
+          the parent Collection, may be None.
+        :config:
+          the arvados configuration to get the hostname and api token.
+          Prefer this over supplying your own api_client and keep_client (except in testing).
+          Will use default config settings if not specified.
+        :api_client:
+          The API client object to use for requests.  If not specified, create one using `config`.
+        :keep_client:
+          the Keep client to use for requests.  If not specified, create one using `config`.
+        :num_retries:
+          the number of retries for API and Keep requests.
+        :block_manager:
+          the block manager to use.  If not specified, create one.
+        :sync:
+          Set synchronization policy with API server collection record.
+          :SYNC_READONLY:
+            Collection is read only.  No synchronization.  This mode will
+            also forego locking, which gives better performance.
+          :SYNC_EXPLICIT:
+            Synchronize on explicit request via `merge()` or `save()`
+          :SYNC_LIVE:
+            Synchronize with server in response to background websocket events,
+            on block write, or on file close.
+
+        """
+
+        self.parent = parent
+        self._items = None
+        self._api_client = api_client
+        self._keep_client = keep_client
+        self._block_manager = block_manager
+        self._config = config
+        self.num_retries = num_retries
+        self._manifest_locator = None
+        self._manifest_text = None
+        self._api_response = None
+        self._sync = sync
+        self.lock = threading.RLock()
+
+        if manifest_locator_or_text:
+            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+                self._manifest_locator = manifest_locator_or_text
+            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+                self._manifest_locator = manifest_locator_or_text
+            elif re.match(util.manifest_pattern, manifest_locator_or_text):
+                self._manifest_text = manifest_locator_or_text
+            else:
+                raise errors.ArgumentError(
+                    "Argument to CollectionReader must be a manifest or a collection UUID")
+
+    def _root_lock(self):
+        return self.lock
+
+    def sync_mode(self):
+        return self._sync
+
+    @arvfile._synchronized
+    def _my_api(self):
+        if self._api_client is None:
+            self._api_client = arvados.api.SafeApi(self._config)
+            self._keep_client = self._api_client.keep
+        return self._api_client
+
+    @arvfile._synchronized
+    def _my_keep(self):
+        if self._keep_client is None:
+            if self._api_client is None:
+                self._my_api()  # also installs the new SafeApi's keep client
+            else:
+                self._keep_client = KeepClient(api_client=self._api_client)
+        return self._keep_client
+
+    @arvfile._synchronized
+    def _my_block_manager(self):
+        if self._block_manager is None:
+            self._block_manager = BlockManager(self._my_keep())
+        return self._block_manager
+
+    def _populate_from_api_server(self):
+        # As in KeepClient itself, we must wait until the last
+        # possible moment to instantiate an API client, in order to
+        # avoid tripping up clients that don't have access to an API
+        # server.  If we do build one, make sure our Keep client uses
+        # it.  If instantiation fails, we'll fall back to the except
+        # clause, just like any other Collection lookup
+        # failure. Return an exception, or None if successful.
+        try:
+            self._api_response = self._my_api().collections().get(
+                uuid=self._manifest_locator).execute(
+                    num_retries=self.num_retries)
+            self._manifest_text = self._api_response['manifest_text']
+            return None
+        except Exception as e:
+            return e
+
+    def _populate_from_keep(self):
+        # Retrieve a manifest directly from Keep. This has a chance of
+        # working if [a] the locator includes a permission signature
+        # or [b] the Keep services are operating in world-readable
+        # mode. Return an exception, or None if successful.
+        try:
+            self._manifest_text = self._my_keep().get(
+                self._manifest_locator, num_retries=self.num_retries)
+        except Exception as e:
+            return e
+
+    def _populate(self):
+        self._items = {}
+        if self._manifest_locator is None and self._manifest_text is None:
+            return
+        error_via_api = None
+        error_via_keep = None
+        should_try_keep = ((self._manifest_text is None) and
+                           util.keep_locator_pattern.match(
+                self._manifest_locator))
+        if ((self._manifest_text is None) and
+            util.signed_locator_pattern.match(self._manifest_locator)):
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
+            error_via_api = self._populate_from_api_server()
+            if error_via_api is not None and not should_try_keep:
+                raise error_via_api
+        if ((self._manifest_text is None) and
+            not error_via_keep and
+            should_try_keep):
+            # Looks like a keep locator, and we didn't already try keep above
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
+            # Nothing worked!
+            raise arvados.errors.NotFoundError(
+                ("Failed to retrieve collection '{}' " +
+                 "from either API server ({}) or Keep ({})."
+                 ).format(
+                    self._manifest_locator,
+                    error_via_api,
+                    error_via_keep))
+        # populate
+        import_manifest(self._manifest_text, self)
+
+        if self._sync == SynchronizedCollectionBase.SYNC_READONLY:
+            # Now that we're populated, knowing that this will be readonly,
+            # forego any further locking.
+            self.lock = NoopLock()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Support scoped auto-commit in a with: block"""
+        self.save(allow_no_locator=True)
+        if self._block_manager is not None:
+            self._block_manager.stop_threads()
+
+    @arvfile._synchronized
     @_populate_first
-    def save(self, no_locator=False):
+    def clone(self, new_parent=None, new_sync=SynchronizedCollectionBase.SYNC_READONLY, new_config=None):
+        c = Collection(parent=new_parent, config=self._config if new_config is None else new_config, sync=new_sync)
+        if new_sync == Collection.SYNC_READONLY:
+            c.lock = NoopLock()
+        c._items = {}  # mark the copy as populated before filling it
+        self._cloneinto(c)
+        return c
+
+    @arvfile._synchronized
+    @_populate_first
+    def api_response(self):
+        """
+        api_response() -> dict or None
+
+        Returns information about this Collection fetched from the API server.
+        If the Collection exists in Keep but not the API server, currently
+        returns None.  Future versions may provide a synthetic response.
+        """
+        return self._api_response
+
+    @_must_be_writable
+    @arvfile._synchronized
+    @_populate_first
+    def save(self, allow_no_locator=False):
         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
         update the collection record to Keep.
 
-        :no_locator:
-          If False and there is no collection uuid associated with
-          this Collection, raise an error.  If True, do not raise an error.
+        :allow_no_locator:
+          If there is no collection uuid associated with this
+          Collection and `allow_no_locator` is False, raise an error.  If True,
+          do not raise an error.
         """
         if self.modified():
             self._my_block_manager().commit_all()
@@ -1030,10 +1122,12 @@ class Collection(CollectionBase):
                     body={'manifest_text': self.manifest_text(strip=False)}
                     ).execute(
                         num_retries=self.num_retries)
-            elif not no_locator:
+            elif not allow_no_locator:
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
             self.set_unmodified()
 
+    @_must_be_writable
+    @arvfile._synchronized
     @_populate_first
     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
         """Save a new collection record.
@@ -1062,6 +1156,40 @@ class Collection(CollectionBase):
         self.set_unmodified()
 
 
+class Subcollection(SynchronizedCollectionBase):
+    """This is a subdirectory within a collection that doesn't have its own API
+    server record.  It falls under the umbrella of the root collection."""
+
+    def __init__(self, parent):
+        super(Subcollection, self).__init__(parent)
+        self.lock = parent._root_lock()
+
+    def _root_lock(self):
+        return self.parent._root_lock()
+
+    def sync_mode(self):
+        return self.parent.sync_mode()
+
+    def _my_api(self):
+        return self.parent._my_api()
+
+    def _my_keep(self):
+        return self.parent._my_keep()
+
+    def _my_block_manager(self):
+        return self.parent._my_block_manager()
+
+    def _populate(self):
+        self.parent._populate()  # NOTE(review): never assigns self._items here - confirm
+
+    @arvfile._synchronized
+    @_populate_first
+    def clone(self, new_parent):
+        c = Subcollection(parent=new_parent)
+        c._items = {}
+        self._cloneinto(c)
+        return c
+
 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
     """Import a manifest into a `Collection`.
 
@@ -1150,7 +1278,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
       If False, use block locators as-is.
     """
     buf = ""
-    if isinstance(item, Collection):
+    if isinstance(item, SynchronizedCollectionBase):
         stream = {}
         sorted_keys = sorted(item.keys())
         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
@@ -1168,7 +1296,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
         if stream:
             buf += ' '.join(normalize_stream(stream_name, stream))
             buf += "\n"
-        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
+        for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
         st = []
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 9bd954c..8ab2e8d 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -30,38 +30,6 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 # appear as underscores in the fuse mount.)
 _disallowed_filename_characters = re.compile('[\x00/]')
 
-class SafeApi(object):
-    '''Threadsafe wrapper for API object.  This stores and returns a different api
-    object per thread, because httplib2 which underlies apiclient is not
-    threadsafe.
-    '''
-
-    def __init__(self, config):
-        self.host = config.get('ARVADOS_API_HOST')
-        self.api_token = config.get('ARVADOS_API_TOKEN')
-        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
-        self.local = threading.local()
-        self.block_cache = arvados.KeepBlockCache()
-
-    def localapi(self):
-        if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api('v1', False, self.host,
-                                         self.api_token, self.insecure)
-        return self.local.api
-
-    def localkeep(self):
-        if 'keep' not in self.local.__dict__:
-            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
-        return self.local.keep
-
-    def __getattr__(self, name):
-        # Proxy nonexistent attributes to the local API client.
-        try:
-            return getattr(self.localapi(), name)
-        except AttributeError:
-            return super(SafeApi, self).__getattr__(name)
-
-
 def convertTime(t):
     '''Parse Arvados timestamp to unix time.'''
     try:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list