[ARVADOS] updated: 02e9754a68a5816458d517b8f5012530cf17ebba

git at public.curoverse.com git at public.curoverse.com
Mon Dec 29 16:41:37 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py    | 18 ++++++++++--------
 sdk/python/arvados/collection.py | 13 ++++++++-----
 2 files changed, 18 insertions(+), 13 deletions(-)

       via  02e9754a68a5816458d517b8f5012530cf17ebba (commit)
      from  88ba4c97290a316568d89985636d7d2b8fbcfa92 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 02e9754a68a5816458d517b8f5012530cf17ebba
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 29 16:43:04 2014 -0500

    3198: Start using BlockManager.  Needs tests.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index b5d8189..1342601 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -212,7 +212,7 @@ class BufferBlock(object):
     PENDING = 1
     COMMITTED = 2
 
-    def __init__(self, blockid, starting_size=2**14):
+    def __init__(self, blockid, starting_size):
         self.blockid = blockid
         self.buffer_block = bytearray(starting_size)
         self.buffer_view = memoryview(self.buffer_block)
@@ -256,8 +256,10 @@ class BlockManager(object):
         self._prefetch_queue = None
         self._prefetch_thread = None
 
-    def alloc_bufferblock(self):
-        bb = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+    def alloc_bufferblock(self, blockid=None, starting_size=2**14):
+        if blockid is None:
+            blockid = "bufferblock%i" % len(self._bufferblocks)
+        bb = BufferBlock(blockid, starting_size=starting_size)
         self._bufferblocks[bb.blockid] = bb
         return bb
 
@@ -282,7 +284,7 @@ class BlockManager(object):
                              threading.Thread(target=worker, args=(self,))]
 
         block.state = BufferBlock.PENDING
-        self._queue.put(block)
+        self._put_queue.put(block)
 
     def get_block(self, locator, num_retries):
         if locator in self._bufferblocks:
@@ -297,7 +299,7 @@ class BlockManager(object):
         for k,v in self._bufferblocks:
             if v.state == BufferBlock.WRITABLE:
                 self.commit_bufferblock(v)
-        self._queue.join()
+        self._put_queue.join()
         if not self._errors.empty():
             e = []
             try:
@@ -324,11 +326,12 @@ class BlockManager(object):
         self._prefetch_queue.put(locator)
 
 class ArvadosFile(object):
-    def __init__(self, stream=[], segments=[], keep=None):
+    def __init__(self, block_manager, stream=[], segments=[], keep=None):
         '''
         stream: a list of Range objects representing a block stream
         segments: a list of Range objects representing segments
         '''
+        self.bbm = block_manager
         self._modified = True
         self._segments = []
         for s in segments:
@@ -392,13 +395,12 @@ class ArvadosFile(object):
         if write_total < self._current_bblock.size():
             # There is more data in the buffer block than is actually accounted for by segments, so
             # re-pack into a new buffer by copying over to a new buffer block.
-            new_bb = BufferBlock(self._current_bblock.blockid, starting_size=write_total)
+            new_bb = self.bbm.alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total)
             for t in bufferblock_segs:
                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                 t.segment_offset = new_bb.size() - t.range_size
 
             self._current_bblock = new_bb
-            self.bbm[self._current_bblock.blockid] = self._current_bblock
 
     def writeto(self, offset, data, num_retries):
         if len(data) == 0:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3cbbca2..85c7ad7 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,7 +6,7 @@ import re
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -641,7 +641,7 @@ class ResumableCollectionWriter(CollectionWriter):
 
 class Collection(CollectionBase):
     def __init__(self, manifest_locator_or_text=None, api_client=None,
-                 keep_client=None, num_retries=0):
+                 keep_client=None, num_retries=0, block_manager=None):
 
         self._items = None
         self._api_client = api_client
@@ -651,6 +651,9 @@ class Collection(CollectionBase):
         self._manifest_text = None
         self._api_response = None
 
+        if block_manager is None:
+            self.block_manager = BlockManager(keep_client)
+
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
@@ -753,13 +756,13 @@ class Collection(CollectionBase):
                 # item must be a file
                 if item is None and create:
                     # create new file
-                    item = ArvadosFile(keep=self._keep_client)
+                    item = ArvadosFile(self.block_manager, keep=self._keep_client)
                     self._items[p[0]] = item
                 return item
             else:
                 if item is None and create:
                     # create new collection
-                    item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries)
+                    item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
                     self._items[p[0]] = item
                 del p[0]
                 return item.find("/".join(p), create=create)
@@ -973,7 +976,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
             for s in v._segments:
                 loc = s.locator
                 if loc.startswith("bufferblock"):
-                    loc = v._bufferblocks[loc].calculate_locator()
+                    loc = v.bbm._bufferblocks[loc].locator()
                 st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
             stream[k] = st

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list