[ARVADOS] updated: 02e9754a68a5816458d517b8f5012530cf17ebba
git at public.curoverse.com
git at public.curoverse.com
Mon Dec 29 16:41:37 EST 2014
Summary of changes:
sdk/python/arvados/arvfile.py | 18 ++++++++++--------
sdk/python/arvados/collection.py | 13 ++++++++-----
2 files changed, 18 insertions(+), 13 deletions(-)
via 02e9754a68a5816458d517b8f5012530cf17ebba (commit)
from 88ba4c97290a316568d89985636d7d2b8fbcfa92 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 02e9754a68a5816458d517b8f5012530cf17ebba
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 29 16:43:04 2014 -0500
3198: Start using BlockManager. Needs tests.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index b5d8189..1342601 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -212,7 +212,7 @@ class BufferBlock(object):
PENDING = 1
COMMITTED = 2
- def __init__(self, blockid, starting_size=2**14):
+ def __init__(self, blockid, starting_size):
self.blockid = blockid
self.buffer_block = bytearray(starting_size)
self.buffer_view = memoryview(self.buffer_block)
@@ -256,8 +256,10 @@ class BlockManager(object):
self._prefetch_queue = None
self._prefetch_thread = None
- def alloc_bufferblock(self):
- bb = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+ def alloc_bufferblock(self, blockid=None, starting_size=2**14):
+ if blockid is None:
+ blockid = "bufferblock%i" % len(self._bufferblocks)
+ bb = BufferBlock(blockid, starting_size=starting_size)
self._bufferblocks[bb.blockid] = bb
return bb
@@ -282,7 +284,7 @@ class BlockManager(object):
threading.Thread(target=worker, args=(self,))]
block.state = BufferBlock.PENDING
- self._queue.put(block)
+ self._put_queue.put(block)
def get_block(self, locator, num_retries):
if locator in self._bufferblocks:
@@ -297,7 +299,7 @@ class BlockManager(object):
for k,v in self._bufferblocks:
if v.state == BufferBlock.WRITABLE:
self.commit_bufferblock(v)
- self._queue.join()
+ self._put_queue.join()
if not self._errors.empty():
e = []
try:
@@ -324,11 +326,12 @@ class BlockManager(object):
self._prefetch_queue.put(locator)
class ArvadosFile(object):
- def __init__(self, stream=[], segments=[], keep=None):
+ def __init__(self, block_manager, stream=[], segments=[], keep=None):
'''
stream: a list of Range objects representing a block stream
segments: a list of Range objects representing segments
'''
+ self.bbm = block_manager
self._modified = True
self._segments = []
for s in segments:
@@ -392,13 +395,12 @@ class ArvadosFile(object):
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
- new_bb = BufferBlock(self._current_bblock.blockid, starting_size=write_total)
+ new_bb = self.bbm.alloc_bufferblock(self._current_bblock.blockid, starting_size=write_total)
for t in bufferblock_segs:
new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
- self.bbm[self._current_bblock.blockid] = self._current_bblock
def writeto(self, offset, data, num_retries):
if len(data) == 0:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3cbbca2..85c7ad7 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,7 +6,7 @@ import re
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
@@ -641,7 +641,7 @@ class ResumableCollectionWriter(CollectionWriter):
class Collection(CollectionBase):
def __init__(self, manifest_locator_or_text=None, api_client=None,
- keep_client=None, num_retries=0):
+ keep_client=None, num_retries=0, block_manager=None):
self._items = None
self._api_client = api_client
@@ -651,6 +651,9 @@ class Collection(CollectionBase):
self._manifest_text = None
self._api_response = None
+ if block_manager is None:
+ self.block_manager = BlockManager(keep_client)
+
if manifest_locator_or_text:
if re.match(util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
@@ -753,13 +756,13 @@ class Collection(CollectionBase):
# item must be a file
if item is None and create:
# create new file
- item = ArvadosFile(keep=self._keep_client)
+ item = ArvadosFile(self.block_manager, keep=self._keep_client)
self._items[p[0]] = item
return item
else:
if item is None and create:
# create new collection
- item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries)
+ item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
self._items[p[0]] = item
del p[0]
return item.find("/".join(p), create=create)
@@ -973,7 +976,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
for s in v._segments:
loc = s.locator
if loc.startswith("bufferblock"):
- loc = v._bufferblocks[loc].calculate_locator()
+ loc = v.bbm._bufferblocks[loc].locator()
st.append(LocatorAndRange(loc, locator_block_size(loc),
s.segment_offset, s.range_size))
stream[k] = st
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list