[ARVADOS] updated: 52c6f13db207030bdbe063665c0dd524007db828
Git user
git at public.curoverse.com
Fri Apr 21 14:31:53 EDT 2017
Summary of changes:
sdk/python/arvados/_ranges.py | 2 +-
sdk/python/arvados/arvfile.py | 58 +++++++++++++++++++-----------
sdk/python/tests/test_collections.py | 15 ++++++++
services/fuse/tests/test_tmp_collection.py | 13 +++++++
4 files changed, 66 insertions(+), 22 deletions(-)
via 52c6f13db207030bdbe063665c0dd524007db828 (commit)
via 0ecea550fde014578e71004360b700cdfeae4909 (commit)
from 7116da151dc8bfd5ac1a9b016b2ed6e4c35572f7 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit 52c6f13db207030bdbe063665c0dd524007db828
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 21 14:31:49 2017 -0400
11507: More small block packing fixes:
* Repack writes before small block merging.
* Update segments of file with new locator/segment offset instead of replacing
entire file contents.
diff --git a/sdk/python/arvados/_ranges.py b/sdk/python/arvados/_ranges.py
index 5532ea0..b4368ad 100644
--- a/sdk/python/arvados/_ranges.py
+++ b/sdk/python/arvados/_ranges.py
@@ -190,7 +190,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
# range ends before this segment starts, so don't look at any more locators
break
- if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+ if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
# new range starts and ends in old segment
# split segment into up to 3 pieces
if (new_range_start-old_segment_start) > 0:
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index fa107a9..c41a45f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -16,7 +16,7 @@ import uuid
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method
MOD = "mod"
@@ -389,6 +389,9 @@ class _BufferBlock(object):
self.buffer_block = None
self.buffer_view = None
+ def __repr__(self):
+ return "<BufferBlock %s>" % (self.blockid)
+
class NoopLock(object):
def __enter__(self):
@@ -580,23 +583,32 @@ class _BlockManager(object):
def __exit__(self, exc_type, exc_value, traceback):
self.stop_threads()
- @synchronized
def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- try:
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- except AttributeError:
- # Writable blocks without owner shouldn't exist.
- raise UnownedBlockError()
+ # Search blocks ready for getting packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that it's
+ # size is <= KEEP_BLOCK_SIZE/2.
+ with self.lock:
+ bufferblocks = self._bufferblocks.values()
+
+ try:
+ for b in bufferblocks:
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
+ b.owner._repack_writes(0)
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
+
+ with self.lock:
+ small_blocks = [b for b in self._bufferblocks.values()
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
if len(small_blocks) <= 1:
# Not enough small blocks for repacking
@@ -618,8 +630,14 @@ class _BlockManager(object):
self.commit_bufferblock(new_bb, sync=sync)
- for bb, segment_offset in files:
- bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
+ for bb, new_bb_segment_offset in files:
+ newsegs = []
+ for s in bb.owner.segments():
+ if s.locator == bb.blockid:
+ newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
+ else:
+ newsegs.append(s)
+ bb.owner.set_segments(newsegs)
self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
@@ -1030,7 +1048,7 @@ class ArvadosFile(object):
for t in bufferblock_segs:
new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
t.segment_offset = new_bb.size() - t.range_size
-
+ self._current_bblock.clear()
self._current_bblock = new_bb
@must_be_writable
@@ -1141,17 +1159,17 @@ class ArvadosFile(object):
normalize=False, only_committed=False):
buf = ""
filestream = []
- for segment in self.segments:
+ for segment in self.segments():
loc = segment.locator
if self.parent._my_block_manager().is_bufferblock(loc):
if only_committed:
continue
- loc = self._bufferblocks[loc].calculate_locator()
+ loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
- filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+ filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
segment.segment_offset, segment.range_size))
- buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+ buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
buf += "\n"
return buf
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 7421850..fd31664 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1170,6 +1170,21 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
'. 900150983cd24fb0d6963f7d28e17f72+3 a8430a058b8fbf408e1931b794dbd6fb+13 0:3:count.txt 6:7:count.txt 13:3:foo.txt\n')
+ def test_small_block_packing_with_overwrite(self):
+ c = Collection()
+ c.open("b1", "w").close()
+ c["b1"].writeto(0, "b1", 0)
+
+ c.open("b2", "w").close()
+ c["b2"].writeto(0, "b2", 0)
+
+ c["b1"].writeto(0, "1b", 0)
+
+ self.assertEquals(c.manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1 2:2:b2\n")
+ self.assertEquals(c["b1"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1\n")
+ self.assertEquals(c["b2"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 2:2:b2\n")
+
+
class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
diff --git a/services/fuse/tests/test_tmp_collection.py b/services/fuse/tests/test_tmp_collection.py
index 60eba1b..c608d62 100644
--- a/services/fuse/tests/test_tmp_collection.py
+++ b/services/fuse/tests/test_tmp_collection.py
@@ -122,3 +122,16 @@ class TmpCollectionTest(IntegrationTest):
with open(path, 'w') as f:
f.write(content)
self.assertRegexpMatches(current_manifest(tmpdir), expect)
+
+ @IntegrationTest.mount(argv=mnt_args)
+ def test_tmp_rewrite(self):
+ self.pool_test(os.path.join(self.mnt, 'zzz'))
+ @staticmethod
+ def _test_tmp_rewrite(self, tmpdir):
+ with open(os.path.join(tmpdir, "b1"), 'w') as f:
+ f.write("b1")
+ with open(os.path.join(tmpdir, "b2"), 'w') as f:
+ f.write("b2")
+ with open(os.path.join(tmpdir, "b1"), 'w') as f:
+ f.write("1b")
+ self.assertRegexpMatches(current_manifest(tmpdir), "^\. ed4f3f67c70b02b29c50ce1ea26666bd\+4(\+\S+)? 0:2:b1 2:2:b2\n$")
commit 0ecea550fde014578e71004360b700cdfeae4909
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 21 09:55:00 2017 -0400
11507: Cleanup
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index accaaa0..fa107a9 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -612,16 +612,14 @@ class _BlockManager(object):
files = []
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
- arvfile = bb.owner
self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
files.append((bb, new_bb.write_pointer - bb.size()))
self.commit_bufferblock(new_bb, sync=sync)
- for fn in files:
- bb = fn[0]
- bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), fn[1])])
+ for bb, segment_offset in files:
+ bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list