[ARVADOS] updated: fef791011864b1bd69dfd01bfcd26c4942d6d5a1
git at public.curoverse.com
git at public.curoverse.com
Thu Feb 5 12:09:04 EST 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 49 +++++++++-------
sdk/python/arvados/collection.py | 107 +++++++++++++++++++----------------
sdk/python/tests/test_arvfile.py | 5 +-
sdk/python/tests/test_collections.py | 5 ++
4 files changed, 95 insertions(+), 71 deletions(-)
via fef791011864b1bd69dfd01bfcd26c4942d6d5a1 (commit)
from 66f42779127b7e48ef4437b7d4ec471e43f9636b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit fef791011864b1bd69dfd01bfcd26c4942d6d5a1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 5 12:11:14 2015 -0500
4823: Rename @_synchronized -> @synchronized and @_must_be_writable to @must_be_writable. Fix arvfile tests.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2dc2b2c..4edecf7 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -300,7 +300,7 @@ class AsyncKeepWriteErrors(Exception):
def __repr__(self):
return "\n".join(self.errors)
-def _synchronized(orig_func):
+def synchronized(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
with self.lock:
@@ -324,8 +324,7 @@ SYNC_READONLY = 1
SYNC_EXPLICIT = 2
SYNC_LIVE = 3
-def _must_be_writable(orig_func):
- # Decorator for methods that read actual Collection data.
+def must_be_writable(orig_func):
@functools.wraps(orig_func)
def wrapper(self, *args, **kwargs):
if self.sync_mode() == SYNC_READONLY:
@@ -353,7 +352,7 @@ class BlockManager(object):
self.num_put_threads = 2
self.num_get_threads = 2
- @_synchronized
+ @synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
"""
Allocate a new, empty bufferblock in WRITABLE state and return it.
@@ -373,7 +372,7 @@ class BlockManager(object):
self._bufferblocks[bb.blockid] = bb
return bb
- @_synchronized
+ @synchronized
def dup_block(self, blockid, owner):
"""
Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
@@ -391,11 +390,11 @@ class BlockManager(object):
self._bufferblocks[bb.blockid] = bb
return bb
- @_synchronized
+ @synchronized
def is_bufferblock(self, id):
return id in self._bufferblocks
- @_synchronized
+ @synchronized
def stop_threads(self):
"""
Shut down and wait for background upload and download threads to finish.
@@ -563,19 +562,19 @@ class ArvadosFile(object):
self.parent = parent
self._modified = True
self._segments = []
+ self.lock = parent._root_lock()
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- self.lock = parent._root_lock()
def sync_mode(self):
return self.parent.sync_mode()
- @_synchronized
+ @synchronized
def segments(self):
return copy.copy(self._segments)
- @_synchronized
+ @synchronized
def clone(self, new_parent):
"""Make a copy of this file."""
cp = ArvadosFile(new_parent)
@@ -592,8 +591,8 @@ class ArvadosFile(object):
return cp
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def replace_contents(self, other):
"""Replace segments of this file with segments from another `ArvadosFile` object."""
self._segments = other.segments()
@@ -612,18 +611,18 @@ class ArvadosFile(object):
def __ne__(self, other):
return not self.__eq__(other)
- @_synchronized
+ @synchronized
def set_unmodified(self):
"""Clear the modified flag"""
self._modified = False
- @_synchronized
+ @synchronized
def modified(self):
"""Test the modified flag"""
return self._modified
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def truncate(self, size):
"""
Adjust the size of the file. If `size` is less than the size of the file,
@@ -650,7 +649,7 @@ class ArvadosFile(object):
elif size > self.size():
raise IOError("truncate() does not support extending the file size")
- @_synchronized
+ @synchronized
def readfrom(self, offset, size, num_retries):
"""
read upto `size` bytes from the file starting at `offset`.
@@ -694,8 +693,8 @@ class ArvadosFile(object):
self._current_bblock = new_bb
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def writeto(self, offset, data, num_retries):
"""
Write `data` to the file starting at `offset`. This will update
@@ -725,20 +724,26 @@ class ArvadosFile(object):
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def add_segment(self, blocks, pos, size):
"""
Add a segment to the end of the file, with `pos` and `offset` referencing a
section of the stream described by `blocks` (a list of Range objects)
"""
+ self._add_segment(blocks, pos, size)
+
+ def _add_segment(self, blocks, pos, size):
+ """
+ (Internal version.)
+ """
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0)
r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
self._segments.append(r)
- @_synchronized
+ @synchronized
def size(self):
"""Get the file size"""
if self._segments:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f5ed2f3..b1a3050 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -8,7 +8,7 @@ import time
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
@@ -17,6 +17,7 @@ import config
import errors
import util
import events
+from arvados.retry import retry_method
_logger = logging.getLogger('arvados.collection')
@@ -673,7 +674,7 @@ class SynchronizedCollectionBase(CollectionBase):
def notify(self, event, collection, name, item):
raise NotImplementedError()
- @_synchronized
+ @synchronized
def find(self, path, create=False, create_collection=False):
"""Recursively search the specified file path. May return either a Collection
or ArvadosFile.
@@ -768,7 +769,7 @@ class SynchronizedCollectionBase(CollectionBase):
else:
return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
- @_synchronized
+ @synchronized
def modified(self):
"""Test if the collection (or any subcollection or file) has been modified
since it was created."""
@@ -779,60 +780,60 @@ class SynchronizedCollectionBase(CollectionBase):
return True
return False
- @_synchronized
+ @synchronized
def set_unmodified(self):
"""Recursively clear modified flag"""
self._modified = False
for k,v in self._items.items():
v.set_unmodified()
- @_synchronized
+ @synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
return self._items.keys().__iter__()
- @_synchronized
+ @synchronized
def iterkeys(self):
"""Iterate over names of files and collections directly contained in this collection."""
return self._items.keys()
- @_synchronized
+ @synchronized
def __getitem__(self, k):
"""Get a file or collection that is directly contained by this collection. If
you want to search a path, use `find()` instead.
"""
return self._items[k]
- @_synchronized
+ @synchronized
def __contains__(self, k):
"""If there is a file or collection a directly contained by this collection
with name "k"."""
return k in self._items
- @_synchronized
+ @synchronized
def __len__(self):
"""Get the number of items directly contained in this collection"""
return len(self._items)
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def __delitem__(self, p):
"""Delete an item by name which is directly contained by this collection."""
del self._items[p]
self._modified = True
self.notify(DEL, self, p, None)
- @_synchronized
+ @synchronized
def keys(self):
"""Get a list of names of files and collections directly contained in this collection."""
return self._items.keys()
- @_synchronized
+ @synchronized
def values(self):
"""Get a list of files and collection objects directly contained in this collection."""
return self._items.values()
- @_synchronized
+ @synchronized
def items(self):
"""Get a list of (name, object) tuples directly contained in this collection."""
return self._items.items()
@@ -841,8 +842,8 @@ class SynchronizedCollectionBase(CollectionBase):
"""Test if there is a file or collection at "path" """
return self.find(path) != None
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def remove(self, path, rm_r=False):
"""Remove the file or subcollection (directory) at `path`.
:rm_r:
@@ -877,8 +878,8 @@ class SynchronizedCollectionBase(CollectionBase):
def clone(self):
raise NotImplementedError()
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def copy(self, source, target_path, source_collection=None, overwrite=False):
"""Copy a file or subcollection to a new path in this collection.
@@ -943,7 +944,7 @@ class SynchronizedCollectionBase(CollectionBase):
else:
self.notify(ADD, target_dir, target_name, dup)
- @_synchronized
+ @synchronized
def manifest_text(self, strip=False, normalize=False):
"""Get the manifest text for this collection, sub collections and files.
@@ -966,7 +967,7 @@ class SynchronizedCollectionBase(CollectionBase):
else:
return self._manifest_text
- @_synchronized
+ @synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""
Generate list of add/modify/delete actions which, when given to `apply`, will
@@ -988,8 +989,8 @@ class SynchronizedCollectionBase(CollectionBase):
changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
return changes
- @_must_be_writable
- @_synchronized
+ @must_be_writable
+ @synchronized
def apply(self, changes):
"""
Apply changes from `diff`. If a change conflicts with a local change, it
@@ -1036,7 +1037,7 @@ class SynchronizedCollectionBase(CollectionBase):
stripped = self.manifest_text(strip=True)
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
- @_synchronized
+ @synchronized
def __eq__(self, other):
if other is self:
return True
@@ -1090,10 +1091,10 @@ class Collection(SynchronizedCollectionBase):
Collection is read only. No synchronization. This mode will
also forego locking, which gives better performance.
:SYNC_EXPLICIT:
- Synchronize on explicit request via `update()` or `save()`
+ Collection is writable. Synchronize on explicit request via `update()` or `save()`
:SYNC_LIVE:
- Synchronize with server in response to background websocket events,
- on block write, or on file close.
+ Collection is writable. Synchronize with server in response to
+ background websocket events, on block write, or on file close.
"""
super(Collection, self).__init__(parent)
@@ -1143,22 +1144,25 @@ class Collection(SynchronizedCollectionBase):
def on_message(self):
self.update()
- @_synchronized
- def update(self, other=None):
+ @synchronized
+ @retry_method
+ def update(self, other=None, num_retries=None):
if other is None:
- n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
+ if self._manifest_locator is None:
+ raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
+ n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
other = import_collection(n["manifest_text"])
baseline = import_collection(self._manifest_text)
self.apply(other.diff(baseline))
- @_synchronized
+ @synchronized
def _my_api(self):
if self._api_client is None:
self._api_client = arvados.SafeApi(self._config)
self._keep_client = self._api_client.keep
return self._api_client
- @_synchronized
+ @synchronized
def _my_keep(self):
if self._keep_client is None:
if self._api_client is None:
@@ -1167,7 +1171,7 @@ class Collection(SynchronizedCollectionBase):
self._keep_client = KeepClient(api=self._api_client)
return self._keep_client
- @_synchronized
+ @synchronized
def _my_block_manager(self):
if self._block_manager is None:
self._block_manager = BlockManager(self._my_keep())
@@ -1249,7 +1253,7 @@ class Collection(SynchronizedCollectionBase):
if self._block_manager is not None:
self._block_manager.stop_threads()
- @_synchronized
+ @synchronized
def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
if new_config is None:
new_config = self._config
@@ -1260,7 +1264,7 @@ class Collection(SynchronizedCollectionBase):
self._cloneinto(c)
return c
- @_synchronized
+ @synchronized
def api_response(self):
"""
api_response() -> dict or None
@@ -1271,12 +1275,16 @@ class Collection(SynchronizedCollectionBase):
"""
return self._api_response
- @_must_be_writable
- @_synchronized
- def save(self, allow_no_locator=False):
+ @must_be_writable
+ @synchronized
+ @retry_method
+ def save(self, update=True, allow_no_locator=False, num_retries=None):
"""Commit pending buffer blocks to Keep, write the manifest to Keep, and
update the collection record to Keep.
+ :update:
+ Update and merge remote changes before saving back.
+
:allow_no_locator:
If there is no collection uuid associated with this
Collection and `allow_no_locator` is False, raise an error. If True,
@@ -1284,20 +1292,23 @@ class Collection(SynchronizedCollectionBase):
"""
if self.modified():
self._my_block_manager().commit_all()
- self._my_keep().put(self.manifest_text(strip=True))
+ if update and self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+ self.update()
+ self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
self._api_response = self._my_api().collections().update(
uuid=self._manifest_locator,
body={'manifest_text': self.manifest_text(strip=False)}
).execute(
- num_retries=self.num_retries)
+ num_retries=num_retries)
elif not allow_no_locator:
raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
self.set_unmodified()
- @_must_be_writable
- @_synchronized
- def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
+ @must_be_writable
+ @synchronized
+ @retry_method
+ def save_as(self, name, owner_uuid=None, ensure_unique_name=False, num_retries=None):
"""Save a new collection record.
:name:
@@ -1314,12 +1325,12 @@ class Collection(SynchronizedCollectionBase):
"""
self._my_block_manager().commit_all()
- self._my_keep().put(self.manifest_text(strip=True))
+ self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
body = {"manifest_text": self.manifest_text(strip=False),
"name": name}
if owner_uuid:
body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
+ self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
if self.events:
self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
@@ -1331,15 +1342,15 @@ class Collection(SynchronizedCollectionBase):
self.set_unmodified()
- @_synchronized
+ @synchronized
def subscribe(self, callback):
self.callbacks.append(callback)
- @_synchronized
+ @synchronized
def unsubscribe(self, callback):
self.callbacks.remove(callback)
- @_synchronized
+ @synchronized
def notify(self, event, collection, name, item):
for c in self.callbacks:
c(event, collection, name, item)
@@ -1373,7 +1384,7 @@ class Subcollection(SynchronizedCollectionBase):
def notify(self, event, collection, name, item):
self.parent.notify(event, collection, name, item)
- @_synchronized
+ @synchronized
def clone(self, new_parent):
c = Subcollection(new_parent)
c._items = {}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 6e090e4..8c2af7b 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -23,7 +23,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
def get(self, locator, num_retries=0, cache_only=False):
self.requests.append(locator)
return self.blocks.get(locator)
- def put(self, data):
+ def put(self, data, num_retries=None):
pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
self.blocks[pdh] = str(data)
return pdh
@@ -340,6 +340,9 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def sync_mode(self):
return SYNC_READONLY
+ def _root_lock(self):
+ return arvados.arvfile.NoopLock()
+
def make_count_reader(self, nocache=False):
stream = []
n = 0
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 7880f1e..68c73e1 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1015,6 +1015,11 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
self.assertTrue(re.match(r"\. 5348b82a029fd9e971a811ce1f71360b\+43 0:10:count1.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$",
c1.manifest_text()))
+ def test_notify1(self):
+ c1 = Collection(sync=SYNC_EXPLICIT)
+ events = []
+ c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
+ c1.find("")
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list