[ARVADOS] updated: fef791011864b1bd69dfd01bfcd26c4942d6d5a1

git at public.curoverse.com git at public.curoverse.com
Thu Feb 5 12:09:04 EST 2015


Summary of changes:
 sdk/python/arvados/arvfile.py        |  49 +++++++++-------
 sdk/python/arvados/collection.py     | 107 +++++++++++++++++++----------------
 sdk/python/tests/test_arvfile.py     |   5 +-
 sdk/python/tests/test_collections.py |   5 ++
 4 files changed, 95 insertions(+), 71 deletions(-)

       via  fef791011864b1bd69dfd01bfcd26c4942d6d5a1 (commit)
      from  66f42779127b7e48ef4437b7d4ec471e43f9636b (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit fef791011864b1bd69dfd01bfcd26c4942d6d5a1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 5 12:11:14 2015 -0500

    4823: Rename @_synchronized -> @synchronized and @_must_be_writable to @must_be_writable.  Fix arvfile tests.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2dc2b2c..4edecf7 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -300,7 +300,7 @@ class AsyncKeepWriteErrors(Exception):
     def __repr__(self):
         return "\n".join(self.errors)
 
-def _synchronized(orig_func):
+def synchronized(orig_func):
     @functools.wraps(orig_func)
     def wrapper(self, *args, **kwargs):
         with self.lock:
@@ -324,8 +324,7 @@ SYNC_READONLY = 1
 SYNC_EXPLICIT = 2
 SYNC_LIVE = 3
 
-def _must_be_writable(orig_func):
-    # Decorator for methods that read actual Collection data.
+def must_be_writable(orig_func):
     @functools.wraps(orig_func)
     def wrapper(self, *args, **kwargs):
         if self.sync_mode() == SYNC_READONLY:
@@ -353,7 +352,7 @@ class BlockManager(object):
         self.num_put_threads = 2
         self.num_get_threads = 2
 
-    @_synchronized
+    @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         """
         Allocate a new, empty bufferblock in WRITABLE state and return it.
@@ -373,7 +372,7 @@ class BlockManager(object):
         self._bufferblocks[bb.blockid] = bb
         return bb
 
-    @_synchronized
+    @synchronized
     def dup_block(self, blockid, owner):
         """
         Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
@@ -391,11 +390,11 @@ class BlockManager(object):
         self._bufferblocks[bb.blockid] = bb
         return bb
 
-    @_synchronized
+    @synchronized
     def is_bufferblock(self, id):
         return id in self._bufferblocks
 
-    @_synchronized
+    @synchronized
     def stop_threads(self):
         """
         Shut down and wait for background upload and download threads to finish.
@@ -563,19 +562,19 @@ class ArvadosFile(object):
         self.parent = parent
         self._modified = True
         self._segments = []
+        self.lock = parent._root_lock()
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
-        self.lock = parent._root_lock()
 
     def sync_mode(self):
         return self.parent.sync_mode()
 
-    @_synchronized
+    @synchronized
     def segments(self):
         return copy.copy(self._segments)
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent):
         """Make a copy of this file."""
         cp = ArvadosFile(new_parent)
@@ -592,8 +591,8 @@ class ArvadosFile(object):
 
         return cp
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def replace_contents(self, other):
         """Replace segments of this file with segments from another `ArvadosFile` object."""
         self._segments = other.segments()
@@ -612,18 +611,18 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
-    @_synchronized
+    @synchronized
     def set_unmodified(self):
         """Clear the modified flag"""
         self._modified = False
 
-    @_synchronized
+    @synchronized
     def modified(self):
         """Test the modified flag"""
         return self._modified
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def truncate(self, size):
         """
         Adjust the size of the file.  If `size` is less than the size of the file,
@@ -650,7 +649,7 @@ class ArvadosFile(object):
         elif size > self.size():
             raise IOError("truncate() does not support extending the file size")
 
-    @_synchronized
+    @synchronized
     def readfrom(self, offset, size, num_retries):
         """
         read upto `size` bytes from the file starting at `offset`.
@@ -694,8 +693,8 @@ class ArvadosFile(object):
 
             self._current_bblock = new_bb
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def writeto(self, offset, data, num_retries):
         """
         Write `data` to the file starting at `offset`.  This will update
@@ -725,20 +724,26 @@ class ArvadosFile(object):
 
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def add_segment(self, blocks, pos, size):
         """
         Add a segment to the end of the file, with `pos` and `offset` referencing a
         section of the stream described by `blocks` (a list of Range objects)
         """
+        self._add_segment(blocks, pos, size)
+
+    def _add_segment(self, blocks, pos, size):
+        """
+        (Internal version.)
+        """
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
             self._segments.append(r)
 
-    @_synchronized
+    @synchronized
     def size(self):
         """Get the file size"""
         if self._segments:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f5ed2f3..b1a3050 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -8,7 +8,7 @@ import time
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -17,6 +17,7 @@ import config
 import errors
 import util
 import events
+from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
@@ -673,7 +674,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
-    @_synchronized
+    @synchronized
     def find(self, path, create=False, create_collection=False):
         """Recursively search the specified file path.  May return either a Collection
         or ArvadosFile.
@@ -768,7 +769,7 @@ class SynchronizedCollectionBase(CollectionBase):
         else:
             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
 
-    @_synchronized
+    @synchronized
     def modified(self):
         """Test if the collection (or any subcollection or file) has been modified
         since it was created."""
@@ -779,60 +780,60 @@ class SynchronizedCollectionBase(CollectionBase):
                 return True
         return False
 
-    @_synchronized
+    @synchronized
     def set_unmodified(self):
         """Recursively clear modified flag"""
         self._modified = False
         for k,v in self._items.items():
             v.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
         return self._items.keys().__iter__()
 
-    @_synchronized
+    @synchronized
     def iterkeys(self):
         """Iterate over names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def __getitem__(self, k):
         """Get a file or collection that is directly contained by this collection.  If
         you want to search a path, use `find()` instead.
         """
         return self._items[k]
 
-    @_synchronized
+    @synchronized
     def __contains__(self, k):
         """If there is a file or collection a directly contained by this collection
         with name "k"."""
         return k in self._items
 
-    @_synchronized
+    @synchronized
     def __len__(self):
         """Get the number of items directly contained in this collection"""
         return len(self._items)
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def __delitem__(self, p):
         """Delete an item by name which is directly contained by this collection."""
         del self._items[p]
         self._modified = True
         self.notify(DEL, self, p, None)
 
-    @_synchronized
+    @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
         return self._items.keys()
 
-    @_synchronized
+    @synchronized
     def values(self):
         """Get a list of files and collection objects directly contained in this collection."""
         return self._items.values()
 
-    @_synchronized
+    @synchronized
     def items(self):
         """Get a list of (name, object) tuples directly contained in this collection."""
         return self._items.items()
@@ -841,8 +842,8 @@ class SynchronizedCollectionBase(CollectionBase):
         """Test if there is a file or collection at "path" """
         return self.find(path) != None
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def remove(self, path, rm_r=False):
         """Remove the file or subcollection (directory) at `path`.
         :rm_r:
@@ -877,8 +878,8 @@ class SynchronizedCollectionBase(CollectionBase):
     def clone(self):
         raise NotImplementedError()
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
@@ -943,7 +944,7 @@ class SynchronizedCollectionBase(CollectionBase):
         else:
             self.notify(ADD, target_dir, target_name, dup)
 
-    @_synchronized
+    @synchronized
     def manifest_text(self, strip=False, normalize=False):
         """Get the manifest text for this collection, sub collections and files.
 
@@ -966,7 +967,7 @@ class SynchronizedCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
-    @_synchronized
+    @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """
         Generate list of add/modify/delete actions which, when given to `apply`, will
@@ -988,8 +989,8 @@ class SynchronizedCollectionBase(CollectionBase):
                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
         return changes
 
-    @_must_be_writable
-    @_synchronized
+    @must_be_writable
+    @synchronized
     def apply(self, changes):
         """
         Apply changes from `diff`.  If a change conflicts with a local change, it
@@ -1036,7 +1037,7 @@ class SynchronizedCollectionBase(CollectionBase):
         stripped = self.manifest_text(strip=True)
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
-    @_synchronized
+    @synchronized
     def __eq__(self, other):
         if other is self:
             return True
@@ -1090,10 +1091,10 @@ class Collection(SynchronizedCollectionBase):
             Collection is read only.  No synchronization.  This mode will
             also forego locking, which gives better performance.
           :SYNC_EXPLICIT:
-            Synchronize on explicit request via `update()` or `save()`
+            Collection is writable.  Synchronize on explicit request via `update()` or `save()`
           :SYNC_LIVE:
-            Synchronize with server in response to background websocket events,
-            on block write, or on file close.
+            Collection is writable.  Synchronize with server in response to
+            background websocket events, on block write, or on file close.
 
         """
         super(Collection, self).__init__(parent)
@@ -1143,22 +1144,25 @@ class Collection(SynchronizedCollectionBase):
     def on_message(self):
         self.update()
 
-    @_synchronized
-    def update(self, other=None):
+    @synchronized
+    @retry_method
+    def update(self, other=None, num_retries=None):
         if other is None:
-            n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
+            if self._manifest_locator is None:
+                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+            n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
             other = import_collection(n["manifest_text"])
         baseline = import_collection(self._manifest_text)
         self.apply(other.diff(baseline))
 
-    @_synchronized
+    @synchronized
     def _my_api(self):
         if self._api_client is None:
             self._api_client = arvados.SafeApi(self._config)
             self._keep_client = self._api_client.keep
         return self._api_client
 
-    @_synchronized
+    @synchronized
     def _my_keep(self):
         if self._keep_client is None:
             if self._api_client is None:
@@ -1167,7 +1171,7 @@ class Collection(SynchronizedCollectionBase):
                 self._keep_client = KeepClient(api=self._api_client)
         return self._keep_client
 
-    @_synchronized
+    @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
             self._block_manager = BlockManager(self._my_keep())
@@ -1249,7 +1253,7 @@ class Collection(SynchronizedCollectionBase):
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
         if new_config is None:
             new_config = self._config
@@ -1260,7 +1264,7 @@ class Collection(SynchronizedCollectionBase):
         self._cloneinto(c)
         return c
 
-    @_synchronized
+    @synchronized
     def api_response(self):
         """
         api_response() -> dict or None
@@ -1271,12 +1275,16 @@ class Collection(SynchronizedCollectionBase):
         """
         return self._api_response
 
-    @_must_be_writable
-    @_synchronized
-    def save(self, allow_no_locator=False):
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save(self, update=True, allow_no_locator=False, num_retries=None):
         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
         update the collection record to Keep.
 
+        :update:
+          Update and merge remote changes before saving back.
+
         :allow_no_locator:
           If there is no collection uuid associated with this
           Collection and `allow_no_locator` is False, raise an error.  If True,
@@ -1284,20 +1292,23 @@ class Collection(SynchronizedCollectionBase):
         """
         if self.modified():
             self._my_block_manager().commit_all()
-            self._my_keep().put(self.manifest_text(strip=True))
+            if update and self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+                self.update()
+            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
                 self._api_response = self._my_api().collections().update(
                     uuid=self._manifest_locator,
                     body={'manifest_text': self.manifest_text(strip=False)}
                     ).execute(
-                        num_retries=self.num_retries)
+                        num_retries=num_retries)
             elif not allow_no_locator:
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
             self.set_unmodified()
 
-    @_must_be_writable
-    @_synchronized
-    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
+    @must_be_writable
+    @synchronized
+    @retry_method
+    def save_as(self, name, owner_uuid=None, ensure_unique_name=False, num_retries=None):
         """Save a new collection record.
 
         :name:
@@ -1314,12 +1325,12 @@ class Collection(SynchronizedCollectionBase):
 
         """
         self._my_block_manager().commit_all()
-        self._my_keep().put(self.manifest_text(strip=True))
+        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
         body = {"manifest_text": self.manifest_text(strip=False),
                 "name": name}
         if owner_uuid:
             body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
+        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
 
         if self.events:
             self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
@@ -1331,15 +1342,15 @@ class Collection(SynchronizedCollectionBase):
 
         self.set_unmodified()
 
-    @_synchronized
+    @synchronized
     def subscribe(self, callback):
         self.callbacks.append(callback)
 
-    @_synchronized
+    @synchronized
     def unsubscribe(self, callback):
         self.callbacks.remove(callback)
 
-    @_synchronized
+    @synchronized
     def notify(self, event, collection, name, item):
         for c in self.callbacks:
             c(event, collection, name, item)
@@ -1373,7 +1384,7 @@ class Subcollection(SynchronizedCollectionBase):
     def notify(self, event, collection, name, item):
         self.parent.notify(event, collection, name, item)
 
-    @_synchronized
+    @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
         c._items = {}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 6e090e4..8c2af7b 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -23,7 +23,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def get(self, locator, num_retries=0, cache_only=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
-        def put(self, data):
+        def put(self, data, num_retries=None):
             pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
             self.blocks[pdh] = str(data)
             return pdh
@@ -340,6 +340,9 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         def sync_mode(self):
             return SYNC_READONLY
 
+        def _root_lock(self):
+            return arvados.arvfile.NoopLock()
+
     def make_count_reader(self, nocache=False):
         stream = []
         n = 0
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 7880f1e..68c73e1 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1015,6 +1015,11 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         self.assertTrue(re.match(r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$",
                                  c1.manifest_text()))
 
+    def test_notify1(self):
+        c1 = Collection(sync=SYNC_EXPLICIT)
+        events = []
+        c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
+        c1.find("")
 
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list