[ARVADOS] updated: 52c6f13db207030bdbe063665c0dd524007db828

Git user git at public.curoverse.com
Fri Apr 21 14:31:53 EDT 2017


Summary of changes:
 sdk/python/arvados/_ranges.py              |  2 +-
 sdk/python/arvados/arvfile.py              | 58 +++++++++++++++++++-----------
 sdk/python/tests/test_collections.py       | 15 ++++++++
 services/fuse/tests/test_tmp_collection.py | 13 +++++++
 4 files changed, 66 insertions(+), 22 deletions(-)

       via  52c6f13db207030bdbe063665c0dd524007db828 (commit)
       via  0ecea550fde014578e71004360b700cdfeae4909 (commit)
      from  7116da151dc8bfd5ac1a9b016b2ed6e4c35572f7 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 52c6f13db207030bdbe063665c0dd524007db828
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 21 14:31:49 2017 -0400

    11507: More small block packing fixes:
    
    * Repack writes before small block merging.
    * Update segments of file with new locator/segment offset instead of replacing
      entire file contents.

diff --git a/sdk/python/arvados/_ranges.py b/sdk/python/arvados/_ranges.py
index 5532ea0..b4368ad 100644
--- a/sdk/python/arvados/_ranges.py
+++ b/sdk/python/arvados/_ranges.py
@@ -190,7 +190,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
             # range ends before this segment starts, so don't look at any more locators
             break
 
-        if  old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+        if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
             # new range starts and ends in old segment
             # split segment into up to 3 pieces
             if (new_range_start-old_segment_start) > 0:
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index fa107a9..c41a45f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -16,7 +16,7 @@ import uuid
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
 from .retry import retry_method
 
 MOD = "mod"
@@ -389,6 +389,9 @@ class _BufferBlock(object):
         self.buffer_block = None
         self.buffer_view = None
 
+    def __repr__(self):
+        return "<BufferBlock %s>" % (self.blockid)
+
 
 class NoopLock(object):
     def __enter__(self):
@@ -580,23 +583,32 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
         self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return
 
-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            try:
-                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-            except AttributeError:
-                # Writable blocks without owner shouldn't exist.
-                raise UnownedBlockError()
+        # Search blocks ready for getting packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block with its owner.closed() implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
+        with self.lock:
+            bufferblocks = self._bufferblocks.values()
+
+        try:
+            for b in bufferblocks:
+                if b.state() == _BufferBlock.WRITABLE and b.owner.closed():
+                    b.owner._repack_writes(0)
+        except AttributeError:
+            # Writable blocks without owner shouldn't exist.
+            raise UnownedBlockError()
+
+        with self.lock:
+            small_blocks = [b for b in self._bufferblocks.values()
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 
             if len(small_blocks) <= 1:
                 # Not enough small blocks for repacking
@@ -618,8 +630,14 @@ class _BlockManager(object):
 
             self.commit_bufferblock(new_bb, sync=sync)
 
-            for bb, segment_offset in files:
-                bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
+            for bb, new_bb_segment_offset in files:
+                newsegs = []
+                for s in bb.owner.segments():
+                    if s.locator == bb.blockid:
+                        newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset))
+                    else:
+                        newsegs.append(s)
+                bb.owner.set_segments(newsegs)
                 self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
@@ -1030,7 +1048,7 @@ class ArvadosFile(object):
             for t in bufferblock_segs:
                 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                 t.segment_offset = new_bb.size() - t.range_size
-
+            self._current_bblock.clear()
             self._current_bblock = new_bb
 
     @must_be_writable
@@ -1141,17 +1159,17 @@ class ArvadosFile(object):
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments:
+        for segment in self.segments():
             loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:
                     continue
-                loc = self._bufferblocks[loc].calculate_locator()
+                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                  segment.segment_offset, segment.range_size))
-        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
         buf += "\n"
         return buf
 
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 7421850..fd31664 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1170,6 +1170,21 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
             '. 900150983cd24fb0d6963f7d28e17f72+3 a8430a058b8fbf408e1931b794dbd6fb+13 0:3:count.txt 6:7:count.txt 13:3:foo.txt\n')
 
 
+    def test_small_block_packing_with_overwrite(self):
+        c = Collection()
+        c.open("b1", "w").close()
+        c["b1"].writeto(0, "b1", 0)
+
+        c.open("b2", "w").close()
+        c["b2"].writeto(0, "b2", 0)
+
+        c["b1"].writeto(0, "1b", 0)
+
+        self.assertEquals(c.manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1 2:2:b2\n")
+        self.assertEquals(c["b1"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 0:2:b1\n")
+        self.assertEquals(c["b2"].manifest_text(), ". ed4f3f67c70b02b29c50ce1ea26666bd+4 2:2:b2\n")
+
+
 class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}
diff --git a/services/fuse/tests/test_tmp_collection.py b/services/fuse/tests/test_tmp_collection.py
index 60eba1b..c608d62 100644
--- a/services/fuse/tests/test_tmp_collection.py
+++ b/services/fuse/tests/test_tmp_collection.py
@@ -122,3 +122,16 @@ class TmpCollectionTest(IntegrationTest):
                     with open(path, 'w') as f:
                         f.write(content)
                 self.assertRegexpMatches(current_manifest(tmpdir), expect)
+
+    @IntegrationTest.mount(argv=mnt_args)
+    def test_tmp_rewrite(self):
+        self.pool_test(os.path.join(self.mnt, 'zzz'))
+    @staticmethod
+    def _test_tmp_rewrite(self, tmpdir):
+        with open(os.path.join(tmpdir, "b1"), 'w') as f:
+            f.write("b1")
+        with open(os.path.join(tmpdir, "b2"), 'w') as f:
+            f.write("b2")
+        with open(os.path.join(tmpdir, "b1"), 'w') as f:
+            f.write("1b")
+        self.assertRegexpMatches(current_manifest(tmpdir), "^\. ed4f3f67c70b02b29c50ce1ea26666bd\+4(\+\S+)? 0:2:b1 2:2:b2\n$")

commit 0ecea550fde014578e71004360b700cdfeae4909
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 21 09:55:00 2017 -0400

    11507: Cleanup

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index accaaa0..fa107a9 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -612,16 +612,14 @@ class _BlockManager(object):
             files = []
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
-                arvfile = bb.owner
                 self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 files.append((bb, new_bb.write_pointer - bb.size()))
 
             self.commit_bufferblock(new_bb, sync=sync)
 
-            for fn in files:
-                bb = fn[0]
-                bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), fn[1])])
+            for bb, segment_offset in files:
+                bb.owner.set_segments([Range(new_bb.locator(), 0, bb.size(), segment_offset)])
                 self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list