[ARVADOS] updated: da63a23ebc1d936305324cbc66d9903b9129df94
git at public.curoverse.com
git at public.curoverse.com
Thu Feb 5 13:44:32 EST 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 44 ++++++++++-----
sdk/python/arvados/collection.py | 116 ++++++++++++++++++++++-----------------
sdk/python/tests/test_arvfile.py | 22 ++++----
3 files changed, 110 insertions(+), 72 deletions(-)
via da63a23ebc1d936305324cbc66d9903b9129df94 (commit)
from fef791011864b1bd69dfd01bfcd26c4942d6d5a1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit da63a23ebc1d936305324cbc66d9903b9129df94
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 5 13:46:42 2015 -0500
4823: Add root_collection() to make it easier to find the root. Adjusted
semantics of save(), renamed save_as() to save_new(). Now call save() on
close() when in SYNC_LIVE. readfrom() now releases the collection lock
before actually fetching blocks.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4edecf7..acb02a0 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -546,9 +546,10 @@ class BlockManager(object):
class ArvadosFile(object):
- """
- ArvadosFile manages the underlying representation of a file in Keep as a sequence of
- segments spanning a set of blocks, and implements random read/write access.
+ """ArvadosFile manages the underlying representation of a file in Keep as a
+ sequence of segments spanning a set of blocks, and implements random
+ read/write access. This object may be accessed from multiple threads.
+
"""
def __init__(self, parent, stream=[], segments=[]):
@@ -562,7 +563,7 @@ class ArvadosFile(object):
self.parent = parent
self._modified = True
self._segments = []
- self.lock = parent._root_lock()
+ self.lock = parent.root_collection().lock
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
@@ -649,19 +650,21 @@ class ArvadosFile(object):
elif size > self.size():
raise IOError("truncate() does not support extending the file size")
- @synchronized
def readfrom(self, offset, size, num_retries):
"""
read upto `size` bytes from the file starting at `offset`.
"""
- if size == 0 or offset >= self.size():
- return ''
- data = []
+ with self.lock:
+ if size == 0 or offset >= self.size():
+ return ''
+ prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
+ readsegs = locators_and_ranges(self._segments, offset, size)
- for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
+ for lr in prefetch:
self.parent._my_block_manager().block_prefetch(lr.locator)
- for lr in locators_and_ranges(self._segments, offset, size):
+ data = []
+ for lr in readsegs:
d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
if d:
data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
@@ -753,6 +756,11 @@ class ArvadosFile(object):
return 0
class ArvadosFileReader(ArvadosFileReaderBase):
+ """Wraps ArvadosFile in a file-like object supporting reading only. Be aware
+ that this class is NOT thread safe as there is no locking around updating file
+ pointer.
+ """
+
def __init__(self, arvadosfile, name, mode="r", num_retries=None):
super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
@@ -764,7 +772,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
@retry_method
def read(self, size, num_retries=None):
"""Read up to `size` bytes from the stream, starting at the current file position"""
- data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
self._filepos += len(data)
return data
@@ -779,6 +787,12 @@ class ArvadosFileReader(ArvadosFileReaderBase):
class ArvadosFileWriter(ArvadosFileReader):
+ """Wraps ArvadosFile in a file-like object supporting both reading and writing.
+ Be aware that this class is NOT thread safe as there is no locking around
+ updating file pointer.
+
+ """
+
def __init__(self, arvadosfile, name, mode, num_retries=None):
super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
@@ -786,7 +800,7 @@ class ArvadosFileWriter(ArvadosFileReader):
@retry_method
def write(self, data, num_retries=None):
if self.mode[0] == "a":
- self.arvadosfile.writeto(self.size(), data)
+ self.arvadosfile.writeto(self.size(), data, num_retries)
else:
self.arvadosfile.writeto(self._filepos, data, num_retries)
self._filepos += len(data)
@@ -795,7 +809,7 @@ class ArvadosFileWriter(ArvadosFileReader):
@retry_method
def writelines(self, seq, num_retries=None):
for s in seq:
- self.write(s)
+ self.write(s, num_retries)
def truncate(self, size=None):
if size is None:
@@ -803,3 +817,7 @@ class ArvadosFileWriter(ArvadosFileReader):
self.arvadosfile.truncate(size)
if self._filepos > self.size():
self._filepos = self.size()
+
+ def close(self):
+ if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
+ self.arvadosfile.parent.root_collection().save()
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index b1a3050..42db95e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -662,15 +662,15 @@ class SynchronizedCollectionBase(CollectionBase):
def _my_block_manager(self):
raise NotImplementedError()
- def _root_lock(self):
- raise NotImplementedError()
-
def _populate(self):
raise NotImplementedError()
def sync_mode(self):
raise NotImplementedError()
+ def root_collection(self):
+ raise NotImplementedError()
+
def notify(self, event, collection, name, item):
raise NotImplementedError()
@@ -1057,7 +1057,7 @@ class SynchronizedCollectionBase(CollectionBase):
class Collection(SynchronizedCollectionBase):
"""Store an Arvados collection, consisting of a set of files and
- sub-collections.
+ sub-collections. This object
"""
def __init__(self, manifest_locator_or_text=None,
@@ -1125,8 +1125,8 @@ class Collection(SynchronizedCollectionBase):
self._populate()
if self._sync == SYNC_LIVE:
- if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
- raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
+ if not self._has_collection_uuid():
+ raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
@staticmethod
@@ -1135,8 +1135,8 @@ class Collection(SynchronizedCollectionBase):
c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
return c
- def _root_lock(self):
- return self.lock
+ def root_collection(self):
+ return self
def sync_mode(self):
return self._sync
@@ -1243,13 +1243,16 @@ class Collection(SynchronizedCollectionBase):
# forego any further locking.
self.lock = NoopLock()
+ def _has_collection_uuid(self):
+ return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Support scoped auto-commit in a with: block"""
- if self._sync != SYNC_READONLY:
- self.save(allow_no_locator=True)
+ if self._sync != SYNC_READONLY and self._has_collection_uuid():
+ self.save()
if self._block_manager is not None:
self._block_manager.stop_threads()
@@ -1278,42 +1281,50 @@ class Collection(SynchronizedCollectionBase):
@must_be_writable
@synchronized
@retry_method
- def save(self, update=True, allow_no_locator=False, num_retries=None):
- """Commit pending buffer blocks to Keep, write the manifest to Keep, and
- update the collection record to Keep.
+ def save(self, merge=True, num_retries=None):
+ """Commit pending buffer blocks to Keep, merge with remote record (if
+ merge=True), write the manifest to Keep, and update the collection
+ record. Will raise AssertionError if not associated with a collection
+ record on the API server. If you want to save a manifest to Keep only,
+ see `save_new()`.
:update:
- Update and merge remote changes before saving back.
+ Update and merge remote changes before saving. Otherwise, any
+ remote changes will be ignored and overwritten.
- :allow_no_locator:
- If there is no collection uuid associated with this
- Collection and `allow_no_locator` is False, raise an error. If True,
- do not raise an error.
"""
if self.modified():
+ if not self._has_collection_uuid():
+ raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
self._my_block_manager().commit_all()
- if update and self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+ if merge:
self.update()
self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
- if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
- self._api_response = self._my_api().collections().update(
- uuid=self._manifest_locator,
- body={'manifest_text': self.manifest_text(strip=False)}
- ).execute(
- num_retries=num_retries)
- elif not allow_no_locator:
- raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
+
+ mt = self.manifest_text(strip=False)
+ self._api_response = self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body={'manifest_text': mt}
+ ).execute(
+ num_retries=num_retries)
+ self._manifest_text = mt
self.set_unmodified()
@must_be_writable
@synchronized
@retry_method
- def save_as(self, name, owner_uuid=None, ensure_unique_name=False, num_retries=None):
- """Save a new collection record.
+ def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+ """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
+ a new collection record (if create_collection_record is True). After
+ creating a new collection record, this Collection object will be
+ associated with the new record for `save()` and SYNC_LIVE updates.
:name:
The collection name.
+ :create_collection_record:
+ If True (the default), create a new collection record. If False, only save the manifest to Keep and do not create a collection record.
+
:owner_uuid:
the user, or project uuid that will own this collection.
If None, defaults to the current user.
@@ -1326,20 +1337,28 @@ class Collection(SynchronizedCollectionBase):
"""
self._my_block_manager().commit_all()
self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
- body = {"manifest_text": self.manifest_text(strip=False),
- "name": name}
- if owner_uuid:
- body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+ mt = self.manifest_text(strip=False)
+
+ if create_collection_record:
+ if name is None:
+ name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
- if self.events:
- self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+ body = {"manifest_text": mt,
+ "name": name}
+ if owner_uuid:
+ body["owner_uuid"] = owner_uuid
- self._manifest_locator = self._api_response["uuid"]
+ self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
- if self.events:
- self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+ if self.events:
+ self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+ self._manifest_locator = self._api_response["uuid"]
+
+ if self.events:
+ self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+
+ self._manifest_text = mt
self.set_unmodified()
@synchronized
@@ -1361,33 +1380,32 @@ class Subcollection(SynchronizedCollectionBase):
def __init__(self, parent):
super(Subcollection, self).__init__(parent)
- self.lock = parent._root_lock()
+ self.lock = self.root_collection().lock
- def _root_lock(self):
- return self.parent._root_lock()
+ def root_collection(self):
+ return self.parent.root_collection()
def sync_mode(self):
- return self.parent.sync_mode()
+ return self.root_collection().sync_mode()
def _my_api(self):
- return self.parent._my_api()
+ return self.root_collection()._my_api()
def _my_keep(self):
- return self.parent._my_keep()
+ return self.root_collection()._my_keep()
def _my_block_manager(self):
- return self.parent._my_block_manager()
+ return self.root_collection()._my_block_manager()
def _populate(self):
- self.parent._populate()
+ self.root_collection()._populate()
def notify(self, event, collection, name, item):
- self.parent.notify(event, collection, name, item)
+ return self.root_collection().notify(event, collection, name, item)
@synchronized
def clone(self, new_parent):
c = Subcollection(new_parent)
- c._items = {}
self._cloneinto(c)
return c
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 8c2af7b..704ec61 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -68,7 +68,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
- c.save_as("test_truncate")
+ c.save_new("test_truncate")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
@@ -91,7 +91,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
self.assertEqual(None, keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
- c.save_as("test_append")
+ c.save_new("test_append")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
self.assertEqual("foo", keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
@@ -171,7 +171,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
- c.save_as("test_write_large")
+ c.save_new("test_write_large")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
@@ -231,7 +231,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
- c.save_as("test_write_large")
+ c.save_new("test_write_large")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
@@ -249,7 +249,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
self.assertEqual(None, keep.get("2e9ec317e197819358fbc43afca7d837+8"))
- c.save_as("test_create")
+ c.save_new("test_create")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
self.assertEqual("01234567", keep.get("2e9ec317e197819358fbc43afca7d837+8"))
@@ -263,7 +263,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
with arvados.Collection(api_client=api, keep_client=keep, sync=SYNC_EXPLICIT) as c:
writer = c.open("foo/bar/count.txt", "w+")
writer.write("01234567")
- c.save_as("test_create")
+ c.save_new("test_create")
def test_overwrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
@@ -279,7 +279,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
- c.save_as("test_overwrite")
+ c.save_new("test_overwrite")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
@@ -309,7 +309,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
self.assertEqual(None, keep.get("2e9ec317e197819358fbc43afca7d837+8"))
- c.save_as("test_create_multiple")
+ c.save_new("test_create_multiple")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
self.assertEqual("01234567", keep.get("2e9ec317e197819358fbc43afca7d837+8"))
@@ -333,6 +333,10 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def __init__(self, blocks, nocache):
self.blocks = blocks
self.nocache = nocache
+ self.lock = arvados.arvfile.NoopLock()
+
+ def root_collection(self):
+ return self
def _my_block_manager(self):
return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache)
@@ -340,8 +344,6 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def sync_mode(self):
return SYNC_READONLY
- def _root_lock(self):
- return arvados.arvfile.NoopLock()
def make_count_reader(self, nocache=False):
stream = []
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list