[ARVADOS] updated: 95fc98726d64c71ed0b3a8c2270ee62c1c5d1bb5
Git user
git at public.curoverse.com
Fri Oct 28 10:57:37 EDT 2016
Summary of changes:
sdk/python/arvados/arvfile.py | 52 +++++++++++++++++++++++++------------------
1 file changed, 30 insertions(+), 22 deletions(-)
via 95fc98726d64c71ed0b3a8c2270ee62c1c5d1bb5 (commit)
via 4c0b3359319e4c778371e3d61368dec6635c3ef2 (commit)
from 078bdb166766423e1a423523e5285966aff7ec6b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 95fc98726d64c71ed0b3a8c2270ee62c1c5d1bb5
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Oct 28 11:56:20 2016 -0300
10315: Update pending write size count just before packing small blocks to see if there really are enough to fill a full one.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2858a48..7ee028d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -574,6 +574,13 @@ class _BlockManager(object):
# Not enough small blocks for repacking
return
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ if not force:
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE:
+ return
+
new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
commit 4c0b3359319e4c778371e3d61368dec6635c3ef2
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Fri Oct 28 11:27:22 2016 -0300
10315: Added back the BlockManager's put threads lazy start, but with a specific lock, so it can be called from a @synchronized method.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 5ae53f4..2858a48 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -418,7 +418,7 @@ class _BlockManager(object):
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self._pending_write_size = 0
- self.start_put_threads()
+ self.threads_lock = threading.Lock()
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -484,28 +484,28 @@ class _BlockManager(object):
if self._put_queue is not None:
self._put_queue.task_done()
- @synchronized
def start_put_threads(self):
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=self._commit_bufferblock_worker)
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ with self.threads_lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
def _block_prefetch_worker(self):
"""The background downloader thread."""
@@ -627,6 +627,7 @@ class _BlockManager(object):
block.set_state(_BufferBlock.ERROR, e)
raise
else:
+ self.start_put_threads()
self._put_queue.put(block)
@synchronized
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list