[ARVADOS] created: d094ae4ee58f26e0585445eccb3be2d019ab020f

Git user git at public.curoverse.com
Thu May 25 22:39:13 EDT 2017


        at  d094ae4ee58f26e0585445eccb3be2d019ab020f (commit)


commit d094ae4ee58f26e0585445eccb3be2d019ab020f
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu May 25 23:34:59 2017 -0300

    11684: When packing small blocks into one, save references of the files
    included on the block when committing it asynchronously, so that the
    segment's locators can be updated at the put thread after the block is
    committed and the permission token is returned from the API Server.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2fc9c73..f00936d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -499,6 +499,8 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self._repacked_bb = {}
+        self._repacked_bb_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -558,6 +560,16 @@ class _BlockManager(object):
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
+                with self._repacked_bb_lock:
+                    # Check if this block was created by repacking smaller blocks
+                    if bufferblock.blockid in self._repacked_bb:
+                        # Update segment locators (with its tokens) of files within
+                        # this block
+                        old_loc = self._repacked_bb[bufferblock.blockid]['unsigned_loc']
+                        for f in self._repacked_bb[bufferblock.blockid]['files']:
+                            for s in [x for x in f._segments if x.locator == old_loc]:
+                                s.locator = loc
+                        del(self._repacked_bb[bufferblock.blockid])
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -680,16 +692,27 @@ class _BlockManager(object):
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
 
+        # If this repacked block will be committed asynchronously, take note
+        # of its files so their segments' locators will be updated with
+        # the correct permission token returned by the API server.
+        if not sync:
+            with self._repacked_bb_lock:
+                self._repacked_bb[new_bb.blockid] = {
+                    'unsigned_loc': new_bb.locator(),
+                    'files': [bb.owner for bb, _ in files],
+                }
+
         self.commit_bufferblock(new_bb, sync=sync)
 
-        for bb, new_bb_segment_offset in files:
-            newsegs = bb.owner.segments()
-            for s in newsegs:
-                if s.locator == bb.blockid:
-                    s.locator = new_bb.locator()
-                    s.segment_offset = new_bb_segment_offset+s.segment_offset
-            bb.owner.set_segments(newsegs)
-            self._delete_bufferblock(bb.blockid)
+        with self._repacked_bb_lock:
+            for bb, new_bb_segment_offset in files:
+                newsegs = bb.owner.segments()
+                for s in newsegs:
+                    if s.locator == bb.blockid:
+                        s.locator = new_bb.locator()
+                        s.segment_offset = new_bb_segment_offset+s.segment_offset
+                bb.owner.set_segments(newsegs)
+                self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cfc3665..a992328 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1182,7 +1182,7 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
     def setUp(self):
         self.keep_put = getattr(arvados.keep.KeepClient, 'put')
 
-    def test_repacked_block_sumbmission_get_permission_token(self):
+    def test_repacked_block_submission_get_permission_token(self):
         '''
         Make sure that those blocks that are committed after repacking small ones,
         get their permission tokens assigned on the collection manifest.

commit 3b12ef6b6d7ff6852f6109ab71dbec382322a686
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon May 22 17:05:45 2017 -0300

    11684: Reverted easy fix to expose the bug: when there's a delay writing a block that's
    produced by packing smaller blocks into one, its locator doesn't get updated with the
    correct access token, so it will fail when trying to save the collection to the API
    server.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 9ba4349..2fc9c73 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -680,7 +680,7 @@ class _BlockManager(object):
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
 
-        self.commit_bufferblock(new_bb, sync=True)
+        self.commit_bufferblock(new_bb, sync=sync)
 
         for bb, new_bb_segment_offset in files:
             newsegs = bb.owner.segments()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list