[ARVADOS] updated: bf6ef981cea7e923a085c0a9231cebb379c7560a
Git user
git at public.curoverse.com
Thu Oct 6 16:34:07 EDT 2016
Summary of changes:
sdk/python/arvados/arvfile.py | 72 +++++++++++++++++++++---------------
sdk/python/arvados/commands/put.py | 2 +-
sdk/python/tests/test_collections.py | 18 ++++++++-
3 files changed, 61 insertions(+), 31 deletions(-)
via bf6ef981cea7e923a085c0a9231cebb379c7560a (commit)
via e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1 (commit)
via b8036cfd2acd1bf2910130deb46be8a38eaff253 (commit)
from a7ee134a2815fad9febc3cdcd7b2a1fc77aff953 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit bf6ef981cea7e923a085c0a9231cebb379c7560a
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Oct 6 17:32:35 2016 -0300
9701: Fixed a previous test to match new flush() behaviour. Added a new one to check
for small blocks packing.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cf8f23e..0767f2a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1089,6 +1089,7 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
# One file committed
with c.open("foo.txt", "w") as foo:
foo.write("foo")
+ foo.flush() # Force block commit
f.write("0123456789")
# Other file not committed. Block not written to keep yet.
self.assertEqual(
@@ -1097,7 +1098,8 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
normalize=False,
only_committed=True),
'. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
- # And now with the file closed...
+ # And now with the file closed...
+ f.flush() # Force block commit
self.assertEqual(
c._get_manifest_text(".",
strip=False,
@@ -1105,6 +1107,20 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
only_committed=True),
". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
+ def test_only_small_blocks_are_packed_together(self):
+ c = Collection()
+ # Write a couple of small files,
+ with c.open("count.txt", "w") as f:
+ f.write("0123456789")
+ with c.open("foo.txt", "w") as foo:
+ foo.write("foo")
+ # Then, write a big file, it shouldn't be packed with the ones above
+ with c.open("bigfile.txt", "w") as big:
+ big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+ self.assertEqual(
+ c.manifest_text("."),
+ '. 2d303c138c118af809f39319e5d507e9+34603008 e62e558e58131771aae2fd0175cdbf2a+13 0:34603008:bigfile.txt 34603011:10:count.txt 34603008:3:foo.txt\n')
+
class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
commit e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Oct 6 17:23:09 2016 -0300
9701: Several corrections/enhancements:
* Added a 'sync' parameter on repack_small_blocks() so that block commits can
be done either way, depending on where it's called from.
* Allow packing small buffer blocks up to a full block on Keep.
* Replaced ArvadosFile's _closed flag with a list of its ArvadosFileWriter objects,
so that it's used as a reference counter.
* Moved ArvadosFile flush behaviour from ArvadosFileWriter.close() method to
ArvadosFile.remove_writer() so that it can decide whether it should commit the
buffer block or repack it with others.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ec5e4d5..a043bee 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -549,35 +549,29 @@ class _BlockManager(object):
def __exit__(self, exc_type, exc_value, traceback):
self.stop_threads()
- def repack_small_blocks(self, force=False):
+ def repack_small_blocks(self, force=False, sync=False):
"""Packs small blocks together before uploading"""
- # Candidate bblocks -- This could be sorted in some way to prioritize some
- # kind of bblocks
+ # Search blocks ready for getting packed together before being committed to Keep
small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
- if len(small_blocks) == 0:
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
return
- # Check if there's enough small blocks for combining and uploading
+ # Check if there are enough small blocks for filling up one in full
pending_write_size = sum([b.size() for b in small_blocks])
- if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
- if len(small_blocks) == 1:
- # No small blocks for repacking, leave this one alone
- # so it's committed before exiting.
- return
+ if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
self._bufferblocks[new_bb.blockid] = new_bb
size = 0
- while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
+ while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
size += bb.size()
- new_segs = []
+ arvfile = bb.owner
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
- # FIXME: We shoudn't be accessing _segments directly
- bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
+ arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
bb.clear()
del self._bufferblocks[bb.blockid]
- # new_bb's size greater half a keep block, let's commit it
- self.commit_bufferblock(new_bb, sync=True)
+ self.commit_bufferblock(new_bb, sync=sync)
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
@@ -659,7 +653,7 @@ class _BlockManager(object):
"""
with self.lock:
- self.repack_small_blocks(force=True)
+ self.repack_small_blocks(force=True, sync=True)
items = self._bufferblocks.items()
for k,v in items:
@@ -730,7 +724,7 @@ class ArvadosFile(object):
"""
self.parent = parent
self.name = name
- self._closed = False
+ self._writers = set()
self._committed = False
self._segments = []
self.lock = parent.root_collection().lock
@@ -808,8 +802,12 @@ class ArvadosFile(object):
return not self.__eq__(other)
@synchronized
+ def set_segments(self, segs):
+ self._segments = segs
+
+ @synchronized
def set_committed(self):
- """Set committed flag to False"""
+ """Set committed flag to True"""
self._committed = True
@synchronized
@@ -818,15 +816,32 @@ class ArvadosFile(object):
return self._committed
@synchronized
- def set_closed(self):
- """Set current block as pending and closed flag to False"""
- self._closed = True
- self.parent._my_block_manager().repack_small_blocks()
+ def add_writer(self, writer):
+ """Add an ArvadosFileWriter reference to the list of writers"""
+ if isinstance(writer, ArvadosFileWriter):
+ self._writers.add(writer)
@synchronized
+ def remove_writer(self, writer):
+ """
+ Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+ and do some block maintenance tasks.
+ """
+ self._writers.remove(writer)
+
+ if self.size() > config.KEEP_BLOCK_SIZE / 2:
+ # File writer closed, not small enough for repacking
+ self.flush()
+ elif self.closed():
+ # All writers closed and size is adequate for repacking
+ self.parent._my_block_manager().repack_small_blocks()
+
def closed(self):
- """Get whether this is closed or not."""
- return self._closed
+ """
+ Get whether this is closed or not. When the writers list is empty, the file
+ is supposed to be closed.
+ """
+ return len(self._writers) == 0
@must_be_writable
@synchronized
@@ -1109,6 +1124,7 @@ class ArvadosFileWriter(ArvadosFileReader):
def __init__(self, arvadosfile, mode, num_retries=None):
super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
self.mode = mode
+ self.arvadosfile.add_writer(self)
@_FileLikeObjectBase._before_close
@retry_method
@@ -1138,9 +1154,7 @@ class ArvadosFileWriter(ArvadosFileReader):
def flush(self):
self.arvadosfile.flush()
- def close(self, flush=True):
+ def close(self):
if not self.closed:
- if flush:
- self.flush()
- self.arvadosfile.set_closed()
+ self.arvadosfile.remove_writer(self)
super(ArvadosFileWriter, self).close()
commit b8036cfd2acd1bf2910130deb46be8a38eaff253
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Oct 6 17:21:49 2016 -0300
9701: Reverted the use of the optional 'flush' argument on ArvadosFileWriter.close()
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index ff64060..1a27410 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -472,7 +472,7 @@ class ArvPutUploadJob(object):
with self._collection_lock:
output = self._my_collection().open(filename, 'w')
self._write(source_fd, output)
- output.close(flush=False)
+ output.close()
def _write(self, source_fd, output):
while True:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list