[ARVADOS] updated: ad7294edfcc59c3e67548328a88c9e689c3ae2cf

Git user git@public.curoverse.com
Tue Apr 25 09:37:35 EDT 2017


Summary of changes:
 sdk/python/arvados/arvfile.py | 161 ++++++++++++++++++++++--------------------
 1 file changed, 83 insertions(+), 78 deletions(-)

       via  ad7294edfcc59c3e67548328a88c9e689c3ae2cf (commit)
      from  f4661a02245a35f8d223693a5aecaae87083fb16 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit ad7294edfcc59c3e67548328a88c9e689c3ae2cf
Author: Peter Amstutz <peter.amstutz@curoverse.com>
Date:   Fri Apr 21 16:36:21 2017 -0400

    11507: Move repack_writes() method from ArvadosFile to BufferBlock.
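
For context on the diff below: repack_writes() rewrites a writable buffer
block so that its bytes follow file order rather than write order, then
updates the owning file's segments to point at the repacked data. A minimal,
runnable sketch of the idea, with hypothetical names standing in for the real
arvados.arvfile classes:

    class Segment(object):
        def __init__(self, segment_offset, range_size):
            self.segment_offset = segment_offset  # byte offset within the buffer
            self.range_size = range_size          # number of bytes in this segment

    def repack(buffer_bytes, segments):
        # Copy each segment's bytes into a fresh buffer in file order and
        # update the segment offsets to match the new layout.
        new_buf = bytearray()
        for s in segments:  # segments are assumed to be listed in file order
            new_buf.extend(buffer_bytes[s.segment_offset:s.segment_offset + s.range_size])
            s.segment_offset = len(new_buf) - s.range_size
        return bytes(new_buf)

    # "BBBB" (later in the file) was written first, then "AAAA" (earlier).
    segs = [Segment(4, 4), Segment(0, 4)]   # file order: "AAAA", then "BBBB"
    assert repack(b"BBBBAAAA", segs) == b"AAAABBBB"
    assert [s.segment_offset for s in segs] == [0, 4]

Moving this logic onto _BufferBlock also lets the block repack itself under
its own @synchronized lock, rather than having ArvadosFile reach into the
block's internals.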

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 64e0f05..a2ec76a 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -389,6 +389,46 @@ class _BufferBlock(object):
         self.buffer_block = None
         self.buffer_view = None
 
+    @synchronized
+    def repack_writes(self):
+        """Optimize buffer block by repacking segments in file sequence.
+
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file.  This makes for inefficient, fragmented manifests.  Attempt
+        to optimize by repacking writes in file sequence.
+
+        """
+        if self._state != _BufferBlock.WRITABLE:
+            raise AssertionError("Cannot repack non-writable block")
+
+        segs = self.owner.segments()
+
+        # Collect the segments that reference the buffer block.
+        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
+        write_total = sum([s.range_size for s in bufferblock_segs])
+
+        if write_total < self.size() or len(bufferblock_segs) > 1:
+            # If the segments cover less than the whole buffer (overwritten
+            # data) or more than one segment references this block (out-of-order
+            # writes), the manifest would be fragmented, so re-pack into a new buffer.
+            contents = self.buffer_view[0:self.write_pointer].tobytes()
+            new_bb = _BufferBlock(None, write_total, None)
+            for t in bufferblock_segs:
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+                t.segment_offset = new_bb.size() - t.range_size
+
+            self.buffer_block = new_bb.buffer_block
+            self.buffer_view = new_bb.buffer_view
+            self.write_pointer = new_bb.write_pointer
+            self._locator = None
+            new_bb.clear()
+            self.owner.set_segments(segs)
+
     def __repr__(self):
         return "<BufferBlock %s>" % (self.blockid)
 
@@ -583,63 +623,59 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
+    @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
 
-        with self.lock:
-            self._pending_write_size += closed_file_size
-
-            # Check if there are enough small blocks for filling up one in full
-            if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
-                return
+        self._pending_write_size += closed_file_size
 
-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            bufferblocks = self._bufferblocks.values()
+        # Check if there are enough small blocks to fill up one block completely
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return
 
+        # Search for blocks ready to be packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block whose owner is closed() implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
         try:
-            for b in bufferblocks:
-                if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
-                    b.owner._repack_writes(0)
+            small_blocks = [b for b in self._bufferblocks.values()
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         except AttributeError:
             # Writable blocks without owner shouldn't exist.
             raise UnownedBlockError()
 
-        with self.lock:
-            small_blocks = [b for b in self._bufferblocks.values()
-                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
 
-            if len(small_blocks) <= 1:
-                # Not enough small blocks for repacking
-                return
+        for bb in small_blocks:
+            bb.repack_writes()
 
-            # Update the pending write size count with its true value, just in case
-            # some small file was opened, written and closed several times.
-            self._pending_write_size = sum([b.size() for b in small_blocks])
-            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                return
+        # Update the pending write size count with its true value, just in case
+        # some small file was opened, written and closed several times.
+        self._pending_write_size = sum([b.size() for b in small_blocks])
 
-            new_bb = self._alloc_bufferblock()
-            files = []
-            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                bb = small_blocks.pop(0)
-                self._pending_write_size -= bb.size()
-                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                files.append((bb, new_bb.write_pointer - bb.size()))
-
-            self.commit_bufferblock(new_bb, sync=sync)
-
-            for bb, new_bb_segment_offset in files:
-                newsegs = []
-                for s in bb.owner.segments():
-                    if s.locator == bb.blockid:
-                        newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
-                    else:
-                        newsegs.append(s)
-                bb.owner.set_segments(newsegs)
-                self._delete_bufferblock(bb.blockid)
+        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+            return
+
+        new_bb = self._alloc_bufferblock()
+        files = []
+        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            bb = small_blocks.pop(0)
+            self._pending_write_size -= bb.size()
+            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+            files.append((bb, new_bb.write_pointer - bb.size()))
+
+        self.commit_bufferblock(new_bb, sync=sync)
+
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.locator()
+                    s.segment_offset = new_bb_segment_offset+s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -1021,37 +1057,6 @@ class ArvadosFile(object):
 
         return ''.join(data)
 
-    def _repack_writes(self, num_retries):
-        """Optimize buffer block by repacking segments in file sequence.
-
-        When the client makes random writes, they appear in the buffer block in
-        the sequence they were written rather than the sequence they appear in
-        the file.  This makes for inefficient, fragmented manifests.  Attempt
-        to optimize by repacking writes in file sequence.
-
-        """
-        segs = self._segments
-
-        # Collect the segments that reference the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
-        # Collect total data referenced by segments (could be smaller than
-        # bufferblock size if a portion of the file was written and
-        # then overwritten).
-        write_total = sum([s.range_size for s in bufferblock_segs])
-
-        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
-            # If there's more than one segment referencing this block, it is
-            # due to out-of-order writes and will produce a fragmented
-            # manifest, so try to optimize by re-packing into a new buffer.
-            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-            for t in bufferblock_segs:
-                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                t.segment_offset = new_bb.size() - t.range_size
-            self._current_bblock.clear()
-            self._current_bblock = new_bb
-
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
@@ -1082,7 +1087,7 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes(num_retries)
+            self._current_bblock.repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1109,7 +1114,7 @@ class ArvadosFile(object):
 
         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
             if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
+                self._current_bblock.repack_writes()
             if self._current_bblock.state() != _BufferBlock.DELETED:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
 
@@ -1160,7 +1165,7 @@ class ArvadosFile(object):
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments():
+        for segment in self._segments:
             loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:

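Beyond the method move, the repack_small_blocks() rewrite above flattens the
locking (one @synchronized method instead of several "with self.lock:" blocks)
and updates segments in place rather than rebuilding the list. The packing
loop's bookkeeping boils down to the following sketch; the names and the
assumed 64 MiB cap are stand-ins, not the real arvados.arvfile API:

    KEEP_BLOCK_SIZE = 2 ** 26  # assumed 64 MiB Keep block size cap

    def pack_small_blocks(small_blocks):
        # Concatenate small buffers into one block, stopping at the size cap.
        # Returns the packed bytes plus each source block's starting offset,
        # which is what the segment rewrite needs to re-point locators.
        packed = bytearray()
        offsets = {}
        for blockid, data in small_blocks:
            if len(packed) + len(data) > KEEP_BLOCK_SIZE:
                break
            offsets[blockid] = len(packed)
            packed.extend(data)
        return bytes(packed), offsets

    packed, offsets = pack_small_blocks([("bb1", b"xx"), ("bb2", b"yyy")])
    assert packed == b"xxyyy" and offsets == {"bb1": 0, "bb2": 2}
    # A segment that pointed into bb2 at segment_offset=1 would now point into
    # the packed block at offsets["bb2"] + 1 == 3, mirroring the loop above.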
-----------------------------------------------------------------------


hooks/post-receive
--