[ARVADOS] created: de7c71aac6a8e93f84d515e42859cce674eab009
git at public.curoverse.com
git at public.curoverse.com
Fri Apr 10 15:26:11 EDT 2015
at de7c71aac6a8e93f84d515e42859cce674eab009 (commit)
commit de7c71aac6a8e93f84d515e42859cce674eab009
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 10 15:27:05 2015 -0400
5692: Collection.manifest_text(strip=False) now flushes open files and waits for
all blocks to be committed, so that the returned manifest text carries valid
authorization tokens. Fix tests affected by the change.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 3129bdf..97d6e65 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -305,7 +305,7 @@ class _BufferBlock(object):
self.buffer_view = None
self.buffer_block = None
else:
- raise AssertionError("Invalid state change from %s to %s" % (self.state, state))
+ raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
@synchronized
def state(self):
@@ -484,9 +484,10 @@ class _BlockManager(object):
thread.daemon = True
thread.start()
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
+ if block.state() == _BufferBlock.WRITABLE:
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
@@ -523,8 +524,7 @@ class _BlockManager(object):
items = self._bufferblocks.items()
for k,v in items:
- if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
+ v.owner.flush()
with self.lock:
if self._put_queue is not None:
@@ -736,7 +736,7 @@ class ArvadosFile(object):
break
return ''.join(data)
- def _repack_writes(self):
+ def _repack_writes(self, num_retries):
"""Test if the buffer block has more data than actual segments.
This happens when a buffered write over-writes a file range written in
@@ -754,9 +754,10 @@ class ArvadosFile(object):
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
+ contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
- new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+ new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
@@ -785,7 +786,7 @@ class ArvadosFile(object):
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -795,9 +796,9 @@ class ArvadosFile(object):
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
@synchronized
- def flush(self):
+ def flush(self, num_retries=None):
if self._current_bblock:
- self._repack_writes()
+ self._repack_writes(num_retries)
self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
@must_be_writable
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3d48652..3641472 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -831,7 +831,8 @@ class RichCollectionBase(CollectionBase):
:strip:
If True, remove signing tokens from block locators if present.
- If False (default), block locators are left unchanged.
+ If False (default), block locators are left unchanged. Note: if
+ strip is False, this will also flush any open files (committing blocks to Keep).
:normalize:
If True, always export the manifest text in normalized form
@@ -842,6 +843,9 @@ class RichCollectionBase(CollectionBase):
"""
if self.modified() or self._manifest_text is None or normalize:
+ if strip is False:
+ self._my_block_manager().commit_all()
+
stream = {}
buf = []
sorted_keys = sorted(self.keys())
@@ -1249,8 +1253,7 @@ class Collection(RichCollectionBase):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), write the manifest to Keep, and update the
- collection record.
+ merge=True, the default), and update the collection record.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
@@ -1267,10 +1270,9 @@ class Collection(RichCollectionBase):
if self.modified():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_new() for new collections.")
- self._my_block_manager().commit_all()
+
if merge:
self.update()
- self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
text = self.manifest_text(strip=False)
self._api_response = self._my_api().collections().update(
@@ -1285,20 +1287,17 @@ class Collection(RichCollectionBase):
@must_be_writable
@synchronized
@retry_method
- def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+ def save_new(self, name=None, owner_uuid=None, ensure_unique_name=False, num_retries=None):
"""Save collection to a new collection record.
- Commit pending buffer blocks to Keep, write the manifest to Keep, and
- create a new collection record (if create_collection_record True).
- After creating a new collection record, this Collection object will be
- associated with the new record used by `save()`.
+ Commit pending buffer blocks to Keep and create a new collection record
+ (if create_collection_record True). After creating a new collection
+ record, this Collection object will be associated with the new record
+ used by `save()`.
:name:
The collection name.
- :create_collection_record:
- If True, create a collection record. If False, only save the manifest to keep.
-
:owner_uuid:
the user, or project uuid that will own this collection.
If None, defaults to the current user.
@@ -1312,23 +1311,20 @@ class Collection(RichCollectionBase):
Retry count on API calls (if None, use the collection default)
"""
- self._my_block_manager().commit_all()
- self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
text = self.manifest_text(strip=False)
- if create_collection_record:
- if name is None:
- name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+ if name is None:
+ name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
- body = {"manifest_text": text,
- "name": name}
- if owner_uuid:
- body["owner_uuid"] = owner_uuid
+ body = {"manifest_text": text,
+ "name": name}
+ if owner_uuid:
+ body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
- text = self._api_response["manifest_text"]
+ self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+ text = self._api_response["manifest_text"]
- self._manifest_locator = self._api_response["uuid"]
+ self._manifest_locator = self._api_response["uuid"]
self._manifest_text = text
self.set_unmodified()
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 825465c..a8da557 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -149,6 +149,16 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(writer.size(), 0)
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:count.txt\n", c.manifest_text())
+ def test_get_manifest_text_commits(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ with Collection(keep_client=keep) as c:
+ writer = c.open("count.txt", "w")
+ writer.write("0123456789")
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.manifest_text(strip=True))
+ self.assertNotIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+ self.assertEqual('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', c.manifest_text(strip=False))
+ self.assertIn('781e5e245d69b566979b86e28d23f2c7+10', keep.blocks)
+
def test_write_in_middle(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -228,11 +238,11 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(writer.size(), 10)
self.assertEqual("0123456789", writer.readfrom(0, 20))
- self.assertEqual(". 7a08b07e84641703e5f2c836aa59a170+100 90:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 7a08b07e84641703e5f2c836aa59a170+100 90:10:count.txt\n", c.manifest_text(strip=True))
writer.flush()
self.assertEqual(writer.size(), 10)
self.assertEqual("0123456789", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", c.manifest_text(strip=True))
def test_rewrite_append_existing_file(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
@@ -245,12 +255,12 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(writer.size(), 20)
self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:10:count.txt 100:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:10:count.txt 100:10:count.txt\n", c.manifest_text(strip=True))
writer.arvadosfile.flush()
self.assertEqual(writer.size(), 20)
self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", c.manifest_text(strip=True))
def test_rewrite_over_existing_file(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
@@ -263,13 +273,13 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual(writer.size(), 15)
self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:5:count.txt 100:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 ae5f43bab79cf0be33f025fa97ae7398+100 0:5:count.txt 100:10:count.txt\n", c.manifest_text(strip=True))
writer.arvadosfile.flush()
self.assertEqual(writer.size(), 15)
self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", c.manifest_text(strip=True))
def test_write_large_rewrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
@@ -569,6 +579,9 @@ class BlockManagerTest(unittest.TestCase):
mockkeep = mock.MagicMock()
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.owner = ArvadosFile(Collection(block_manager=blockmanager),
+ [Range(bufferblock.blockid, 0, 3)], [Range(0, 0, 3)])
+ bufferblock.owner._current_bblock = bufferblock
bufferblock.append("foo")
blockmanager.commit_all()
self.assertTrue(mockkeep.put.called)
@@ -581,6 +594,9 @@ class BlockManagerTest(unittest.TestCase):
mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.owner = ArvadosFile(Collection(block_manager=blockmanager),
+ [Range(bufferblock.blockid, 0, 3)], [Range(0, 0, 3)])
+ bufferblock.owner._current_bblock = bufferblock
bufferblock.append("foo")
with self.assertRaises(arvados.errors.KeepWriteError) as err:
blockmanager.commit_all()
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 8cf34f0..a71e5d1 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -990,7 +990,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
# c1 changed, so it should not be deleted.
c1.apply(d)
- self.assertEqual(c1.manifest_text(), ". 95ebc3c7b3b9f1d2c40fec14415d3cb8+5 5348b82a029fd9e971a811ce1f71360b+43 0:5:count1.txt 5:10:count2.txt\n")
+ self.assertEqual(c1.manifest_text(strip=True), ". 95ebc3c7b3b9f1d2c40fec14415d3cb8+5 5348b82a029fd9e971a811ce1f71360b+43 0:5:count1.txt 5:10:count2.txt\n")
def test_conflict_mod(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
@@ -1002,7 +1002,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
# c1 changed, so c2 mod will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
+ self.assertRegexpMatches(c1.manifest_text(strip=True), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
def test_conflict_add(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
@@ -1015,7 +1015,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
# c1 added count1.txt, so c2 add will go to a conflict file
c1.apply(d)
- self.assertRegexpMatches(c1.manifest_text(), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
+ self.assertRegexpMatches(c1.manifest_text(strip=True), r"\. 95ebc3c7b3b9f1d2c40fec14415d3cb8\+5 5348b82a029fd9e971a811ce1f71360b\+43 0:5:count1\.txt 5:10:count1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~$")
def test_conflict_del(self):
c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt')
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list