[ARVADOS] updated: ad7294edfcc59c3e67548328a88c9e689c3ae2cf
Git user <git at public.curoverse.com>
Tue Apr 25 09:37:35 EDT 2017

Summary of changes:
 sdk/python/arvados/arvfile.py | 161 ++++++++++++++++++++++--------------------
 1 file changed, 83 insertions(+), 78 deletions(-)

       via  ad7294edfcc59c3e67548328a88c9e689c3ae2cf (commit)
      from  f4661a02245a35f8d223693a5aecaae87083fb16 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ad7294edfcc59c3e67548328a88c9e689c3ae2cf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 21 16:36:21 2017 -0400

    11507: Move repack_writes() method from ArvadosFile to BufferBlock.
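
The idea behind the moved method can be illustrated with a standalone
sketch (hypothetical names, not the arvfile.py API): random writes land in
a buffer block in the order they arrive, and repacking copies the byte
ranges the file still references back out in file order, dropping bytes
that were overwritten.

    from dataclasses import dataclass

    @dataclass
    class Segment:
        range_start: int     # offset of this data within the file
        range_size: int      # number of bytes referenced
        segment_offset: int  # offset of this data within the buffer block

    def repack(buf, segments):
        """Return a new buffer holding only referenced bytes, in file order."""
        new_buf = bytearray()
        new_segs = []
        for s in sorted(segments, key=lambda s: s.range_start):
            data = buf[s.segment_offset:s.segment_offset + s.range_size]
            new_segs.append(Segment(s.range_start, s.range_size, len(new_buf)))
            new_buf.extend(data)
        return bytes(new_buf), new_segs

    # Bytes 4..7 ("EFGH") were written before bytes 0..3 ("ABCD"), so the
    # buffer holds them out of order; repacking restores file order.
    buf = b"EFGHABCD"
    segs = [Segment(4, 4, 0), Segment(0, 4, 4)]
    new_buf, new_segs = repack(buf, segs)
    assert new_buf == b"ABCDEFGH"
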
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 64e0f05..a2ec76a 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -389,6 +389,46 @@ class _BufferBlock(object):
         self.buffer_block = None
         self.buffer_view = None

+    @synchronized
+    def repack_writes(self):
+        """Optimize buffer block by repacking segments in file sequence.
+
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file. This makes for inefficient, fragmented manifests. Attempt
+        to optimize by repacking writes in file sequence.
+
+        """
+        if self._state != _BufferBlock.WRITABLE:
+            raise AssertionError("Cannot repack non-writable block")
+
+        segs = self.owner.segments()
+
+        # Collect the segments that reference the buffer block.
+        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
+        write_total = sum([s.range_size for s in bufferblock_segs])
+
+        if write_total < self.size() or len(bufferblock_segs) > 1:
+            # If there's more than one segment referencing this block, it is
+            # due to out-of-order writes and will produce a fragmented
+            # manifest, so try to optimize by re-packing into a new buffer.
+            contents = self.buffer_view[0:self.write_pointer].tobytes()
+            new_bb = _BufferBlock(None, write_total, None)
+            for t in bufferblock_segs:
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+                t.segment_offset = new_bb.size() - t.range_size
+
+            self.buffer_block = new_bb.buffer_block
+            self.buffer_view = new_bb.buffer_view
+            self.write_pointer = new_bb.write_pointer
+            self._locator = None
+            new_bb.clear()
+            self.owner.set_segments(segs)
+
     def __repr__(self):
         return "<BufferBlock %s>" % (self.blockid)

@@ -583,63 +623,59 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()

+    @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        with self.lock:
-            self._pending_write_size += closed_file_size
-
-            # Check if there are enough small blocks for filling up one in full
-            if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
-                return
+        self._pending_write_size += closed_file_size

-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            bufferblocks = self._bufferblocks.values()
+        # Check if there are enough small blocks for filling up one in full
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return

+        # Search blocks ready for getting packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block with its owner.closed() implies that it's
+        # size is <= KEEP_BLOCK_SIZE/2.
         try:
-            for b in bufferblocks:
-                if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
-                    b.owner._repack_writes(0)
+            small_blocks = [b for b in self._bufferblocks.values()
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         except AttributeError:
             # Writable blocks without owner shouldn't exist.
             raise UnownedBlockError()

-        with self.lock:
-            small_blocks = [b for b in self._bufferblocks.values()
-                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return

-            if len(small_blocks) <= 1:
-                # Not enough small blocks for repacking
-                return
+        for bb in small_blocks:
+            bb.repack_writes()

-            # Update the pending write size count with its true value, just in case
-            # some small file was opened, written and closed several times.
-            self._pending_write_size = sum([b.size() for b in small_blocks])
-            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                return
+        # Update the pending write size count with its true value, just in case
+        # some small file was opened, written and closed several times.
+        self._pending_write_size = sum([b.size() for b in small_blocks])

-            new_bb = self._alloc_bufferblock()
-            files = []
-            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                bb = small_blocks.pop(0)
-                self._pending_write_size -= bb.size()
-                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                files.append((bb, new_bb.write_pointer - bb.size()))
-
-            self.commit_bufferblock(new_bb, sync=sync)
-
-            for bb, new_bb_segment_offset in files:
-                newsegs = []
-                for s in bb.owner.segments():
-                    if s.locator == bb.blockid:
-                        newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
-                    else:
-                        newsegs.append(s)
-                bb.owner.set_segments(newsegs)
-                self._delete_bufferblock(bb.blockid)
+        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+            return
+
+        new_bb = self._alloc_bufferblock()
+        files = []
+        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            bb = small_blocks.pop(0)
+            self._pending_write_size -= bb.size()
+            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+            files.append((bb, new_bb.write_pointer - bb.size()))
+
+        self.commit_bufferblock(new_bb, sync=sync)
+
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.locator()
+                    s.segment_offset = new_bb_segment_offset+s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)

     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -1021,37 +1057,6 @@ class ArvadosFile(object):

         return ''.join(data)

-    def _repack_writes(self, num_retries):
-        """Optimize buffer block by repacking segments in file sequence.
-
-        When the client makes random writes, they appear in the buffer block in
-        the sequence they were written rather than the sequence they appear in
-        the file. This makes for inefficient, fragmented manifests. Attempt
-        to optimize by repacking writes in file sequence.
-
-        """
-        segs = self._segments
-
-        # Collect the segments that reference the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
-        # Collect total data referenced by segments (could be smaller than
-        # bufferblock size if a portion of the file was written and
-        # then overwritten).
-        write_total = sum([s.range_size for s in bufferblock_segs])
-
-        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
-            # If there's more than one segment referencing this block, it is
-            # due to out-of-order writes and will produce a fragmented
-            # manifest, so try to optimize by re-packing into a new buffer.
-            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-            for t in bufferblock_segs:
-                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                t.segment_offset = new_bb.size() - t.range_size
-            self._current_bblock.clear()
-            self._current_bblock = new_bb
-
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
@@ -1082,7 +1087,7 @@
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes(num_retries)
+            self._current_bblock.repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1109,7 +1114,7 @@

         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
             if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
+                self._current_bblock.repack_writes()
             if self._current_bblock.state() != _BufferBlock.DELETED:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

@@ -1160,7 +1165,7 @@
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments():
+        for segment in self._segments:
             loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:
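
The small-block packing in repack_small_blocks() can likewise be reduced
to a standalone sketch (hypothetical names, not the _BlockManager API):
several small buffers are concatenated into one Keep-sized block, and each
owner's offset within the combined block is recorded so its segments can
be repointed afterwards.

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024  # nominal Keep block size (assumed)

    def pack_small_blocks(small_blocks):
        """Concatenate (name, data) pairs into one block, recording each
        name's offset within the combined block."""
        combined = bytearray()
        offsets = {}
        for name, data in small_blocks:
            if len(combined) + len(data) > KEEP_BLOCK_SIZE:
                break  # never let the combined block exceed one Keep block
            offsets[name] = len(combined)
            combined.extend(data)
        return bytes(combined), offsets

    blocks = [("file1", b"aaaa"), ("file2", b"bbbbbb")]
    combined, offsets = pack_small_blocks(blocks)
    assert combined == b"aaaabbbbbb"
    assert offsets == {"file1": 0, "file2": 4}

In the diff above, this recorded offset corresponds to
new_bb_segment_offset, which is added to each segment's segment_offset
when the owner's segments are repointed at the committed combined block.
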
-----------------------------------------------------------------------
hooks/post-receive