[ARVADOS] updated: bf6ef981cea7e923a085c0a9231cebb379c7560a

Git user git at public.curoverse.com
Thu Oct 6 16:34:07 EDT 2016


Summary of changes:
 sdk/python/arvados/arvfile.py        | 72 +++++++++++++++++++++---------------
 sdk/python/arvados/commands/put.py   |  2 +-
 sdk/python/tests/test_collections.py | 18 ++++++++-
 3 files changed, 61 insertions(+), 31 deletions(-)

       via  bf6ef981cea7e923a085c0a9231cebb379c7560a (commit)
       via  e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1 (commit)
       via  b8036cfd2acd1bf2910130deb46be8a38eaff253 (commit)
      from  a7ee134a2815fad9febc3cdcd7b2a1fc77aff953 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit bf6ef981cea7e923a085c0a9231cebb379c7560a
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Oct 6 17:32:35 2016 -0300

    9701: Fixed a previous test to match the new flush() behaviour. Added a new one to
    check that only small blocks are packed together.
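
    For context, the 33 MB file in the new test below clears the packing threshold
    because, assuming Keep's default 64 MiB block size (config.KEEP_BLOCK_SIZE taken
    as 2**26 here), it is larger than half a block. A quick sanity check of the
    arithmetic:

        KEEP_BLOCK_SIZE = 2 ** 26                    # assumed 64 MiB default
        big_file_size = 33 * 1024 * 1024             # 34603008 bytes, as in the expected manifest
        print(big_file_size > KEEP_BLOCK_SIZE / 2)   # True: 34603008 > 33554432, so not repacked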

diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cf8f23e..0767f2a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -1089,6 +1089,7 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
             # One file committed
             with c.open("foo.txt", "w") as foo:
                 foo.write("foo")
+                foo.flush() # Force block commit
             f.write("0123456789")
             # Other file not committed. Block not written to keep yet.
             self.assertEqual(
@@ -1097,7 +1098,8 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
                                      normalize=False,
                                      only_committed=True),
                 '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
-        # And now with the file closed...
+            # And now with the file closed...
+            f.flush() # Force block commit
         self.assertEqual(
             c._get_manifest_text(".",
                                  strip=False,
@@ -1105,6 +1107,20 @@ class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
                                  only_committed=True),
             ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
 
+    def test_only_small_blocks_are_packed_together(self):
+        c = Collection()
+        # Write a couple of small files, 
+        with c.open("count.txt", "w") as f:
+            f.write("0123456789")
+        with c.open("foo.txt", "w") as foo:
+            foo.write("foo")
+        # Then, write a big file, it shouldn't be packed with the ones above
+        with c.open("bigfile.txt", "w") as big:
+            big.write("x" * 1024 * 1024 * 33) # 33 MB > KEEP_BLOCK_SIZE/2
+        self.assertEqual(
+            c.manifest_text("."),
+            '. 2d303c138c118af809f39319e5d507e9+34603008 e62e558e58131771aae2fd0175cdbf2a+13 0:34603008:bigfile.txt 34603011:10:count.txt 34603008:3:foo.txt\n')
+
 
 class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}

commit e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Oct 6 17:23:09 2016 -0300

    9701: Several corrections/enhancements:
    * Added a 'sync' parameter to repack_small_blocks() so that block commits can be
      done either synchronously or asynchronously, depending on where it's called from.
    * Allow packing small buffer blocks together up to a full Keep block.
    * Replaced ArvadosFile's _closed flag with a set of its open ArvadosFileWriter
      objects, used as a reference count (see the sketch below).
    * Moved ArvadosFile's flush behaviour from the ArvadosFileWriter.close() method to
      ArvadosFile.remove_writer(), which decides whether to commit the buffer block or
      repack it together with others.
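
    As a rough illustration of the writer reference-counting described above, here is
    a small standalone sketch. It is not the ArvadosFile implementation; the File,
    Writer and HALF_BLOCK names are made up for this example, and the real code
    additionally takes locks and goes through the block manager.

        HALF_BLOCK = 32 * 1024 * 1024       # stand-in for config.KEEP_BLOCK_SIZE / 2

        class File(object):
            def __init__(self):
                self._writers = set()       # open writers act as a reference count
                self.data = ""

            def add_writer(self, writer):
                self._writers.add(writer)

            def closed(self):
                # The file counts as closed once every writer has detached.
                return len(self._writers) == 0

            def remove_writer(self, writer):
                self._writers.discard(writer)
                if len(self.data) > HALF_BLOCK:
                    print("large file: flush/commit its own block")
                elif self.closed():
                    print("small file, all writers closed: candidate for repacking")

        class Writer(object):
            def __init__(self, f):
                self.f = f
                f.add_writer(self)

            def write(self, s):
                self.f.data += s

            def close(self):
                self.f.remove_writer(self)

        f = File()
        w1, w2 = Writer(f), Writer(f)
        w1.write("foo")
        w1.close()          # f.closed() is still False, so nothing happens yet
        w2.close()          # last writer gone: remove_writer() hits the repack branch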

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ec5e4d5..a043bee 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -549,35 +549,29 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def repack_small_blocks(self, force=False):
+    def repack_small_blocks(self, force=False, sync=False):
         """Packs small blocks together before uploading"""
-        # Candidate bblocks -- This could be sorted in some way to prioritize some
-        # kind of bblocks
+        # Search blocks ready for getting packed together before being committed to Keep
         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
-        if len(small_blocks) == 0:
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
             return
 
-        # Check if there's enough small blocks for combining and uploading
+        # Check if there are enough small blocks for filling up one in full
         pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
-            if len(small_blocks) == 1:
-                # No small blocks for repacking, leave this one alone
-                # so it's committed before exiting.
-                return
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
             new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
             self._bufferblocks[new_bb.blockid] = new_bb
             size = 0
-            while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 size += bb.size()
-                new_segs = []
+                arvfile = bb.owner
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                # FIXME: We shoudn't be accessing _segments directly
-                bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
                 bb.clear()
                 del self._bufferblocks[bb.blockid]
-            # new_bb's size greater half a keep block, let's commit it
-            self.commit_bufferblock(new_bb, sync=True)
+            self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -659,7 +653,7 @@ class _BlockManager(object):
 
         """
         with self.lock:
-            self.repack_small_blocks(force=True)
+            self.repack_small_blocks(force=True, sync=True)
             items = self._bufferblocks.items()
 
         for k,v in items:
@@ -730,7 +724,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
-        self._closed = False
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -808,8 +802,12 @@ class ArvadosFile(object):
         return not self.__eq__(other)
 
     @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
+    @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -818,15 +816,32 @@ class ArvadosFile(object):
         return self._committed
 
     @synchronized
-    def set_closed(self):
-        """Set current block as pending and closed flag to False"""
-        self._closed = True
-        self.parent._my_block_manager().repack_small_blocks()
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
 
     @synchronized
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
     def closed(self):
-        """Get whether this is closed or not."""
-        return self._closed
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
 
     @must_be_writable
     @synchronized
@@ -1109,6 +1124,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1138,9 +1154,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self, flush=True):
+    def close(self):
         if not self.closed:
-            if flush:
-                self.flush()
-            self.arvadosfile.set_closed()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()

commit b8036cfd2acd1bf2910130deb46be8a38eaff253
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Oct 6 17:21:49 2016 -0300

    9701: Reverted the use of the optional 'flush' argument on ArvadosFileWriter.close()

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index ff64060..1a27410 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -472,7 +472,7 @@ class ArvPutUploadJob(object):
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)
-            output.close(flush=False)
+            output.close()
 
     def _write(self, source_fd, output):
         while True:

-----------------------------------------------------------------------


hooks/post-receive
--