[ARVADOS] created: a7ee134a2815fad9febc3cdcd7b2a1fc77aff953

Git user git at public.curoverse.com
Wed Oct 5 14:31:36 EDT 2016


        at  a7ee134a2815fad9febc3cdcd7b2a1fc77aff953 (commit)


commit a7ee134a2815fad9febc3cdcd7b2a1fc77aff953
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Wed Oct 5 15:30:33 2016 -0300

    9701: Test fixes

diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 47594ab..7a0120c 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -283,8 +283,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
             cwriter_new.start()
-            self.assertEqual(0, cwriter_new.bytes_written)
             cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written)
 
     def make_progress_tester(self):
         progression = []
@@ -434,9 +434,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
-             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
-            cache_mock.side_effect = ValueError
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])

commit 480df25dc679998b53f9e7299244ac1ff3f90114
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Wed Oct 5 15:20:55 2016 -0300

    9701: Changes on the Python SDK to allow small file packing on Collection class:
    * Added optional flush parameter to ArvadosFileWriter.close().
    * Added _closed attribute & related accessors to ArvadosFile to enable BlockManager to query this state.
    * For every ArvadosFile close() operation, call BlockManager to search for small blocks for repacking purposes.
    * Do a last repacking operation just before exiting, joining the last small blocks even if they don't meet the
      minimum size requirement.
    
    There's some pending code cleanup, as BlockManager fiddles directly with ArvadosFile._segments lists; some locking
    issues have to be solved.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index f2f7df2..ec5e4d5 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -549,6 +549,36 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
+    def repack_small_blocks(self, force=False):
+        """Packs small blocks together before uploading"""
+        # Candidate bblocks -- This could be sorted in some way to prioritize some
+        # kind of bblocks
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
+        if len(small_blocks) == 0:
+            return
+
+        # Check if there's enough small blocks for combining and uploading
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
+            if len(small_blocks) == 1:
+                # No small blocks for repacking, leave this one alone
+                # so it's committed before exiting.
+                return
+            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
+            self._bufferblocks[new_bb.blockid] = new_bb
+            size = 0
+            while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
+                bb = small_blocks.pop(0)
+                size += bb.size()
+                new_segs = []
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                # FIXME: We shouldn't be accessing _segments directly
+                bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
+                bb.clear()
+                del self._bufferblocks[bb.blockid]
+            # new_bb's size greater half a keep block, let's commit it
+            self.commit_bufferblock(new_bb, sync=True)
+
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
@@ -562,7 +592,6 @@ class _BlockManager(object):
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
@@ -630,10 +659,11 @@ class _BlockManager(object):
 
         """
         with self.lock:
+            self.repack_small_blocks(force=True)
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -700,6 +730,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
+        self._closed = False
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -786,6 +817,17 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def set_closed(self):
+        """Set current block as pending and closed flag to True"""
+        self._closed = True
+        self.parent._my_block_manager().repack_small_blocks()
+
+    @synchronized
+    def closed(self):
+        """Get whether this is closed or not."""
+        return self._closed
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -1096,7 +1138,9 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.flush()
+            if flush:
+                self.flush()
+            self.arvadosfile.set_closed()
             super(ArvadosFileWriter, self).close()
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 1a27410..ff64060 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -472,7 +472,7 @@ class ArvPutUploadJob(object):
                 with self._collection_lock:
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)
-            output.close()
+            output.close(flush=False)
 
     def _write(self, source_fd, output):
         while True:

commit 224f384d411bb1b4cccc7165c55bb64fd5c695ad
Merge: 0821f54 6d162bf
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon Oct 3 16:44:31 2016 -0300

    9701: Merge branch '9463-change-arvput-use-collection-class' into 9701-collection-pack-small-files-alt


-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list