[ARVADOS] updated: 1ceeafd5411b47834b836019dfd21d4050158171

Git user git at public.curoverse.com
Mon May 29 13:06:39 EDT 2017


Summary of changes:
 sdk/python/arvados/arvfile.py | 64 ++++++++++++++++++-------------------------
 1 file changed, 27 insertions(+), 37 deletions(-)

       via  1ceeafd5411b47834b836019dfd21d4050158171 (commit)
      from  d094ae4ee58f26e0585445eccb3be2d019ab020f (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.


commit 1ceeafd5411b47834b836019dfd21d4050158171
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon May 29 13:59:14 2017 -0300

    11684: Instead of fiddling with ArvadosFile objects' internals from the
    _BlockManager's put threads to update segment locators when synchronously
    committing a block built from smaller blocks, take advantage of
    ArvadosFile.flush()'s existing mechanism for updating unrealized segment
    locators: build a list of the bufferblock's owners and call every owner's
    flush() method from commit_all().
    To avoid calling delete_bufferblock() multiple times on a single
    bufferblock, add a delete_bufferblock flag to flush() and delete the
    bufferblock only after flushing all owners.
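
A simplified sketch of the repacking step described in the commit message
(illustrative only; BufferBlock and KEEP_BLOCK_SIZE below are hypothetical
stand-ins, not the real arvfile.py/config definitions):

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024  # hypothetical stand-in constant

    class BufferBlock(object):
        """Hypothetical stand-in for arvfile's _BufferBlock."""
        def __init__(self, blockid, owner=None):
            self.blockid = blockid
            self.owner = owner          # one file, a list of files, or None
            self.buf = bytearray()

        def append(self, data):
            self.buf.extend(data)

        def size(self):
            return len(self.buf)

    def repack_small_blocks(small_blocks, new_bb):
        # Merge small blocks into new_bb; new_bb.owner becomes the list
        # of files whose data the merged block now holds, so commit_all()
        # can later flush each of those files.
        new_bb.owner = []
        files = []
        while small_blocks and new_bb.size() + small_blocks[0].size() <= KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            new_bb.append(bb.buf)
            files.append((bb, new_bb.size() - bb.size()))
        return files  # pairs of (old block, offset of its data in new_bb)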

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index f00936d..2c653d7 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -499,8 +499,6 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
-        self._repacked_bb = {}
-        self._repacked_bb_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -559,17 +557,6 @@ class _BlockManager(object):
                 else:
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
-                with self._repacked_bb_lock:
-                    # Check if this block was created by repacking smaller blocks
-                    if bufferblock.blockid in self._repacked_bb:
-                        # Update segment locators (with its tokens) of files within
-                        # this block
-                        old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
-                        for f in self._repacked_bb[bufferblock.blockid]['files']:
-                            for s in [x for x in f._segments if x.locator == old_loc]:
-                                s.locator = loc
-                        del(self._repacked_bb[bufferblock.blockid])
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -685,34 +672,25 @@ class _BlockManager(object):
             return
 
         new_bb = self._alloc_bufferblock()
+        new_bb.owner = []
         files = []
         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
             bb = small_blocks.pop(0)
+            new_bb.owner.append(bb.owner)
             self._pending_write_size -= bb.size()
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
 
-        # If this repacked block will be committed asynchronously, take note
-        # of its files so their segments' locators will be updated with
-        # the correct permission token returned by the API server.
-        if not sync:
-            with self._repacked_bb_lock:
-                self._repacked_bb[new_bb.blockid] = {
-                    'unsigned_loc': new_bb.locator(),
-                    'files': [bb.owner for bb, _ in files],
-                }
-
         self.commit_bufferblock(new_bb, sync=sync)
 
-        with self._repacked_bb_lock:
-            for bb, new_bb_segment_offset in files:
-                newsegs = bb.owner.segments()
-                for s in newsegs:
-                    if s.locator == bb.blockid:
-                        s.locator = new_bb.locator()
-                        s.segment_offset = new_bb_segment_offset+s.segment_offset
-                bb.owner.set_segments(newsegs)
-                self._delete_bufferblock(bb.blockid)
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.blockid
+                    s.segment_offset = new_bb_segment_offset+s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -819,7 +797,10 @@ class _BlockManager(object):
 
         for k,v in items:
             if v.state() != _BufferBlock.COMMITTED and v.owner:
-                v.owner.flush(sync=False)
+                # Skip blocks whose owner is a list: if they're not in
+                # COMMITTED state, they're already being committed asynchronously.
+                if not isinstance(v.owner, list):
+                    v.owner.flush(sync=False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -836,7 +817,15 @@ class _BlockManager(object):
             # flush again with sync=True to remove committed bufferblocks from
             # the segments.
             if v.owner:
-                v.owner.flush(sync=True)
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=True)
+                elif isinstance(v.owner, list) and len(v.owner) > 0:
+                    # This bufferblock is referenced by several files as a
+                    # result of repacking small blocks, so don't delete it when
+                    # flushing its owners; delete it only after flushing them all.
+                    for owner in v.owner:
+                        owner.flush(sync=True, delete_bufferblock=False)
+                    self.delete_bufferblock(k)
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -1140,7 +1129,7 @@ class ArvadosFile(object):
         return len(data)
 
     @synchronized
-    def flush(self, sync=True, num_retries=0):
+    def flush(self, sync=True, num_retries=0, delete_bufferblock=True):
         """Flush the current bufferblock to Keep.
 
         :sync:
@@ -1166,8 +1155,9 @@ class ArvadosFile(object):
                         self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
-            for s in to_delete:
-               self.parent._my_block_manager().delete_bufferblock(s)
+            if delete_bufferblock:
+                for s in to_delete:
+                    self.parent._my_block_manager().delete_bufferblock(s)
 
         self.parent.notify(MOD, self.parent, self.name, (self, self))
 
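For illustration, the commit_all() side of the change can be sketched the
same way (FileStub and commit_all() below are hypothetical stand-ins, not
the real arvfile.py API):

    class FileStub(object):
        """Hypothetical stand-in for ArvadosFile."""
        def __init__(self, name):
            self.name = name

        def flush(self, sync=True, delete_bufferblock=True):
            # The real flush() updates segment locators and, with the new
            # flag, optionally skips deleting the bufferblock; here we
            # just trace the call.
            print("flush %s sync=%s delete_bufferblock=%s"
                  % (self.name, sync, delete_bufferblock))

    def commit_all(bufferblocks):
        # bufferblocks maps blockid -> owner, where owner is a FileStub,
        # a list of FileStubs (a repacked block), or None.
        for blockid, owner in list(bufferblocks.items()):
            if isinstance(owner, list) and len(owner) > 0:
                # Repacked block: flush every owning file without deleting
                # the shared bufferblock, then delete it exactly once.
                for f in owner:
                    f.flush(sync=True, delete_bufferblock=False)
                del bufferblocks[blockid]
            elif owner is not None:
                owner.flush(sync=True)

    blocks = {'bb1': [FileStub('a.txt'), FileStub('b.txt')],
              'bb2': FileStub('c.txt')}
    commit_all(blocks)  # flushes a.txt and b.txt once each, then drops bb1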

-----------------------------------------------------------------------


hooks/post-receive
-- 