[ARVADOS] updated: e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed
git at public.curoverse.com
Fri Dec 19 16:06:18 EST 2014
Summary of changes:
sdk/python/arvados/arvfile.py | 247 +++++++++++++++++++++++++++--------
sdk/python/arvados/collection.py | 80 ++++++++----
sdk/python/arvados/ranges.py | 80 +++++++-----
sdk/python/arvados/stream.py | 23 +---
sdk/python/tests/test_collections.py | 3 +-
sdk/python/tests/test_stream.py | 147 +--------------------
6 files changed, 303 insertions(+), 277 deletions(-)
via e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed (commit)
from 93bc7c31775039cfb05be4caa0891f13fa49409f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Dec 19 16:07:30 2014 -0500
3198: More refactoring and bug/test fixing
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ca24990..d2b174f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -4,6 +4,8 @@ import zlib
import bz2
from .ranges import *
from arvados.retry import retry_method
+import config
+import hashlib
def split(path):
"""split(path) -> streamname, filename
@@ -202,87 +204,222 @@ class StreamFileReader(ArvadosFileReaderBase):
return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
+class BufferBlock(object):
+ def __init__(self, locator, starting_size=2**16):
+ self.locator = locator
+ self.buffer_block = bytearray(starting_size)
+ self.buffer_view = memoryview(self.buffer_block)
+ self.write_pointer = 0
+
+ def append(self, data):
+ while (self.write_pointer+len(data)) > len(self.buffer_block):
+ new_buffer_block = bytearray(len(self.buffer_block) * 2)
+ new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+ self.buffer_block = new_buffer_block
+ self.buffer_view = memoryview(self.buffer_block)
+ self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+ self.write_pointer += len(data)
+
+ def calculate_locator(self):
+ return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.write_pointer)
+
+
class ArvadosFile(object):
- def __init__(self, stream, segments):
- # TODO: build segments list
- self.segments = []
+ def __init__(self, stream=[], segments=[], keep=None):
+ '''
+ stream: a list of Range objects representing a block stream
+ segments: a list of Range objects representing segments
+ '''
+ self._modified = True
+ self._segments = []
+ for s in segments:
+ self.add_segment(stream, s.range_start, s.range_size)
+ self._current_bblock = None
+ self._bufferblocks = None
+ self._keep = keep
+
+ def set_unmodified(self):
+ self._modified = False
+
+ def modified(self):
+ return self._modified
def truncate(self, size):
- pass
- # TODO: fixme
-
- # segs = locators_and_ranges(self.segments, 0, size)
-
- # newstream = []
- # self.segments = []
- # streamoffset = 0L
- # fileoffset = 0L
-
- # for seg in segs:
- # for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg.locator+seg.range_start, seg[SEGMENTSIZE]):
- # newstream.append([locator, blocksize, streamoffset])
- # self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
- # streamoffset += blocksize
- # fileoffset += segmentsize
- # if len(newstream) == 0:
- # newstream.append(config.EMPTY_BLOCK_LOCATOR)
- # self.segments.append([0, 0, 0])
- # self._stream._data_locators = newstream
- # if self._filepos > fileoffset:
- # self._filepos = fileoffset
-
- def readfrom(self, offset, data):
- pass
+ new_segs = []
+ for r in self._segments:
+ range_end = r.range_start+r.range_size
+ if r.range_start >= size:
+ # segment is past the truncate size, all done
+ break
+ elif size < range_end:
+ nr = Range(r.locator, r.range_start, size - r.range_start)
+ nr.segment_offset = r.segment_offset
+ new_segs.append(nr)
+ break
+ else:
+ new_segs.append(r)
- def writeto(self, offset, data):
- if offset > self._size():
- raise ArgumentError("Offset is past the end of the file")
- # TODO: fixme
- # self._stream._append(data)
- # replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
+ self._segments = new_segs
+ self._modified = True
- def flush(self):
+ def _keepget(self, locator, num_retries):
+ if self._bufferblocks and locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ return bb.buffer_view[0:bb.write_pointer].tobytes()
+ else:
+ return self._keep.get(locator, num_retries=num_retries)
+
+ def readfrom(self, offset, size, num_retries):
+ if size == 0 or offset >= self.size():
+ return ''
+ if self._keep is None:
+ self._keep = KeepClient(num_retries=num_retries)
+ data = []
+ # TODO: initiate prefetch on all blocks in the range (offset, offset + size + config.KEEP_BLOCK_SIZE)
+
+ for lr in locators_and_ranges(self._segments, offset, size):
+ # TODO: if data is empty, wait on block get, otherwise only
+ # get more data if the block is already in the cache.
+ data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ return ''.join(data)
+
+ def _init_bufferblock(self):
+ if self._bufferblocks is None:
+ self._bufferblocks = {}
+ self._current_bblock = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+ self._bufferblocks[self._current_bblock.locator] = self._current_bblock
+
+ def _repack_writes(self):
pass
+ # TODO: fixme
+# '''Test if the buffer block has more data than is referenced by actual segments
+# (this happens when a buffered write over-writes a file range written in
+# a previous buffered write). Re-pack the buffer block for efficiency
+# and to avoid leaking information.
+# '''
+# segs = self._files.values()[0].segments
+
+# bufferblock_segs = []
+# i = 0
+# tmp_segs = copy.copy(segs)
+# while i < len(tmp_segs):
+# # Go through each segment and identify segments that include the buffer block
+# s = tmp_segs[i]
+# if s[LOCATOR] < self.current_bblock.locator_list_entry.range_start and (s[LOCATOR] + s.range_size) > self.current_bblock.locator_list_entry.range_start:
+# # The segment straddles the previous block and the current buffer block. Split the segment.
+# b1 = self.current_bblock.locator_list_entry.range_start - s[LOCATOR]
+# b2 = (s[LOCATOR] + s.range_size) - self.current_bblock.locator_list_entry.range_start
+# bb_seg = [self.current_bblock.locator_list_entry.range_start, b2, s.range_start+b1]
+# tmp_segs[i] = [s[LOCATOR], b1, s.range_start]
+# tmp_segs.insert(i+1, bb_seg)
+# bufferblock_segs.append(bb_seg)
+# i += 1
+# elif s[LOCATOR] >= self.current_bblock.locator_list_entry.range_start:
+# # The segment's data is in the buffer block.
+# bufferblock_segs.append(s)
+# i += 1
+
+# # Now sum up the segments to get the total bytes
+# # of the file referencing into the buffer block.
+# write_total = sum([s.range_size for s in bufferblock_segs])
+
+# if write_total < self.current_bblock.locator_list_entry.range_size:
+# # There is more data in the buffer block than is actually accounted for by segments, so
+# # re-pack into a new buffer by copying over to a new buffer block.
+# new_bb = BufferBlock(self.current_bblock.locator,
+# self.current_bblock.locator_list_entry.range_start,
+# starting_size=write_total)
+# for t in bufferblock_segs:
+# t_start = t[LOCATOR] - self.current_bblock.locator_list_entry.range_start
+# t_end = t_start + t.range_size
+# t[0] = self.current_bblock.locator_list_entry.range_start + new_bb.write_pointer
+# new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
+
+# self.current_bblock = new_bb
+# self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+# self._data_locators[-1] = self.current_bblock.locator_list_entry
+# self._files.values()[0].segments = tmp_segs
+
+
+ def writeto(self, offset, data, num_retries):
+ if len(data) == 0:
+ return
+
+ if offset > self.size():
+ raise ArgumentError("Offset is past the end of the file")
+
+ if len(data) > config.KEEP_BLOCK_SIZE:
+ raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+
+ self._modified = True
+
+ if self._current_bblock is None:
+ self._init_bufferblock()
+
+ if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+ self._repack_writes()
+ if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+ self._init_bufferblock()
+
+ self._current_bblock.append(data)
+ replace_range(self._segments, offset, len(data), self._current_bblock.locator, self._current_bblock.write_pointer - len(data))
def add_segment(self, blocks, pos, size):
+ self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
- last = self.segments[-1] if self.segments else Range(0, 0, 0)
- r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size)
- r.block_size = lr.block_size
- r.segment_offset = lr.segment_offset
- self.segments.append(r)
+ last = self._segments[-1] if self._segments else Range(0, 0, 0)
+ r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
+ self._segments.append(r)
+
+ def size(self):
+ if self._segments:
+ n = self._segments[-1]
+ return n.range_start + n.range_size
+ else:
+ return 0
class ArvadosFileReader(ArvadosFileReaderBase):
- def __init__(self, arvadosfile, name, mode='rb'):
- super(ArvadosFileReader, self).__init__(name)
+ def __init__(self, arvadosfile, name, mode="r", num_retries=None):
+ super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
self.arvadosfile = arvadosfile
def size(self):
- n = self.segments[-1]
- return n.range_start + n.range_size
+ return self.arvadosfile.size()
@ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
- if size == 0:
- return ''
-
- data = self.arvadosfile.readfrom(self._filepos, size)
+ data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
self._filepos += len(data)
return data
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readfrom(self, offset, size, num_retries=None):
+ """Read up to 'size' bytes from the stream, starting at the current file position"""
+ return self.arvadosfile.readfrom(offset, size, num_retries)
+
+ def flush(self):
+ pass
class ArvadosFileWriter(ArvadosFileReader):
- def __init__(self, arvadosfile, name):
- super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode='wb')
+ def __init__(self, arvadosfile, name, mode, num_retries=None):
+ super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
- def write(self, data):
- self.arvadosfile.writeto(self._filepos, data)
- self._filepos += len(data)
+ @ArvadosFileBase._before_close
+ @retry_method
+ def write(self, data, num_retries=None):
+ if self.mode[0] == "a":
+ self.arvadosfile.writeto(self.size(), data, num_retries)
+ else:
+ self.arvadosfile.writeto(self._filepos, data, num_retries)
+ self._filepos += len(data)
- def writelines(self, seq):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def writelines(self, seq, num_retries=None):
for s in seq:
self.write(s)
@@ -290,3 +427,5 @@ class ArvadosFileWriter(ArvadosFileReader):
if size is None:
size = self._filepos
self.arvadosfile.truncate(size)
+ if self._filepos > self.size():
+ self._filepos = self.size()
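
For illustration, here is a minimal sketch of the new BufferBlock in isolation.
The tiny starting_size is chosen only to force the doubling path; the class
defaults to 2**16:

    from arvados.arvfile import BufferBlock

    bb = BufferBlock("bufferblock0", starting_size=4)
    bb.append("hello")    # 5 bytes > 4: the backing bytearray doubles to 8
    bb.append(" world")   # 11 bytes > 8: doubles again to 16
    print bb.calculate_locator()
    # -> 5eb63bbbe01eeed093cb22bb8f5acdc3+11 (md5 of "hello world", plus its length)
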
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index b5a9ef9..7660162 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,10 +6,10 @@ import re
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase, split, ArvadosFile
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
from keep import *
-from .stream import StreamReader, normalize_stream
-from .ranges import Range
+from .stream import StreamReader, normalize_stream, locator_block_size
+from .ranges import Range, LocatorAndRange
import config
import errors
import util
@@ -640,10 +640,11 @@ class ResumableCollectionWriter(CollectionWriter):
class Collection(object):
- def __init__(self):
+ def __init__(self, keep=None):
self.items = {}
+ self.keep = keep
- def find_or_create(self, path):
+ def find(self, path, create=False):
p = path.split("/")
if p[0] == '.':
del p[0]
@@ -652,24 +653,56 @@ class Collection(object):
item = self.items.get(p[0])
if len(p) == 1:
# item must be a file
- if item is None:
+ if item is None and create:
# create new file
- item = ArvadosFile(p[0], 'wb', [], [])
+ item = ArvadosFile(keep=self.keep)
self.items[p[0]] = item
return item
else:
- if item is None:
+ if item is None and create:
# create new collection
item = Collection()
self.items[p[0]] = item
del p[0]
- return item.find_or_create("/".join(p))
+ return item.find("/".join(p), create=create)
else:
return self
+ def open(self, path, mode):
+ mode = mode.replace("b", "")
+ if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
+ raise ArgumentError("Bad mode '%s'" % mode)
+ create = (mode != "r")
-def import_manifest(manifest_text):
- c = Collection()
+ f = self.find(path, create=create)
+ if f is None:
+ raise ArgumentError("File not found")
+ if not isinstance(f, ArvadosFile):
+ raise ArgumentError("Path must refer to a file.")
+
+ if mode[0] == "w":
+ f.truncate(0)
+
+ if mode == "r":
+ return ArvadosFileReader(f, path, mode)
+ else:
+ return ArvadosFileWriter(f, path, mode)
+
+ def modified(self):
+ for k,v in self.items.items():
+ if v.modified():
+ return True
+ return False
+
+ def set_unmodified(self):
+ for k,v in self.items.items():
+ v.set_unmodified()
+
+def import_manifest(manifest_text, keep=None):
+ c = Collection(keep=keep)
+
+ if manifest_text[-1] != "\n":
+ manifest_text += "\n"
STREAM_NAME = 0
BLOCKS = 1
@@ -681,6 +714,7 @@ def import_manifest(manifest_text):
for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
tok = n.group(1)
sep = n.group(2)
+
if state == STREAM_NAME:
# starting a new stream
stream_name = tok.replace('\\040', ' ')
@@ -705,7 +739,7 @@ def import_manifest(manifest_text):
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- f = c.find_or_create("%s/%s" % (stream_name, name))
+ f = c.find("%s/%s" % (stream_name, name), create=True)
f.add_segment(blocks, pos, size)
else:
# error!
@@ -715,6 +749,7 @@ def import_manifest(manifest_text):
stream_name = None
state = STREAM_NAME
+ c.set_unmodified()
return c
def export_manifest(item, stream_name="."):
@@ -722,15 +757,16 @@ def export_manifest(item, stream_name="."):
if isinstance(item, Collection):
stream = {}
for k,v in item.items.items():
- if isinstance(item, Collection):
+ if isinstance(v, Collection):
buf += export_manifest(v, stream_name)
- else:
- if isinstance(item, ArvadosFile):
- buf += str(item.segments)
- #stream[k] = [[s.locator, s[4], s[], s[]] for s in item.segments]
- else:
- buf += stream_name
- buf += " "
- buf += str(item.segments)
- buf += "\n"
+ elif isinstance(v, ArvadosFile):
+ st = []
+ for s in v._segments:
+ loc = s.locator
+ if loc.startswith("bufferblock"):
+ loc = v._bufferblocks[loc].calculate_locator()
+ st.append(LocatorAndRange(loc, locator_block_size(loc),
+ s.segment_offset, s.range_size))
+ stream[k] = st
+ buf += ' '.join(normalize_stream(stream_name, stream)) + "\n"
return buf
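
A hedged sketch of the read/write round trip the Collection changes enable.
MockKeep is a stand-in for KeepClient, mirroring the MockKeep in the
commented-out StreamWriter tests further down:

    import arvados

    class MockKeep(object):
        # serves blocks from a dict instead of a real Keep server
        def __init__(self, blocks):
            self.blocks = blocks
        def get(self, locator, num_retries=0):
            return self.blocks[locator]

    keep = MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
    c = arvados.import_manifest(
        ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", keep=keep)

    reader = c.open("count.txt", "r")
    print reader.readfrom(0, 5)    # -> 01234

    writer = c.open("count.txt", "a")
    writer.write("foo")            # lands in a BufferBlock, not yet in Keep
    print c.modified()             # -> True
    print arvados.export_manifest(c)
    # the appended bytes show up under a computed locator,
    # acbd18db4cc2f85cedef654fccc4a4d8+3 (md5 of "foo")
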
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index acc1c50..8c377e2 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -1,11 +1,12 @@
class Range(object):
- def __init__(self, locator, range_start, range_size):
+ def __init__(self, locator, range_start, range_size, segment_offset=0):
self.locator = locator
self.range_start = range_start
self.range_size = range_size
+ self.segment_offset = segment_offset
def __repr__(self):
- return "[\"%s\", %i, %i]" % (self.locator, self.range_size, self.range_start)
+ return "[\"%s\", %i, %i, %i]" % (self.locator, self.range_start, self.range_size, self.segment_offset)
def first_block(data_locators, range_start, range_size, debug=False):
block_start = 0L
@@ -93,21 +94,21 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
if range_start >= block_start and range_end <= block_end:
# range starts and ends in this block
- resp.append(LocatorAndRange(dl.locator, block_size, range_start - block_start, range_size))
+ resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), range_size))
elif range_start >= block_start and range_end > block_end:
# range starts in this block
- resp.append(LocatorAndRange(dl.locator, block_size, range_start - block_start, block_end - range_start))
+ resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), block_end - range_start))
elif range_start < block_start and range_end > block_end:
# range starts in a previous block and extends to further blocks
- resp.append(LocatorAndRange(dl.locator, block_size, 0L, block_size))
+ resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, block_size))
elif range_start < block_start and range_end <= block_end:
# range starts in a previous block and ends in this block
- resp.append(LocatorAndRange(dl.locator, block_size, 0L, range_end - block_start))
+ resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, range_end - block_start))
block_start = block_end
i += 1
return resp
-def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset, debug=False):
'''
Replace a file segment range with a new segment.
data_locators: list of Range objects, assumes that segments are in order and contiguous
@@ -116,64 +117,71 @@ def replace_range(data_locators, range_start, range_size, new_locator, debug=Fal
new_locator: locator for new segment to be inserted
!!! data_locators will be updated in place !!!
'''
- if range_size == 0:
+ if new_range_size == 0:
return
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
+ new_range_start = long(new_range_start)
+ new_range_size = long(new_range_size)
+ new_range_end = new_range_start + new_range_size
+
+ if len(data_locators) == 0:
+ data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
+ return
last = data_locators[-1]
- if (last.range_start+last.range_size) == range_start:
- # extend last segment
- last.range_size += range_size
+ if (last.range_start+last.range_size) == new_range_start:
+ if last.locator == new_locator:
+ # extend last segment
+ last.range_size += new_range_size
+ else:
+ data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
return
- i = first_block(data_locators, range_start, range_size, debug)
+ i = first_block(data_locators, new_range_start, new_range_size, debug)
if i is None:
return
while i < len(data_locators):
- locator, segment_size, segment_start = data_locators[i]
- segment_end = segment_start + segment_size
+ dl = data_locators[i]
+ old_segment_start = dl.range_start
+ old_segment_end = old_segment_start + dl.range_size
if debug:
- print locator, "range_start", range_start, "segment_start", segment_start, "range_end", range_end, "segment_end", segment_end
- if range_end <= segment_start:
+ print locator, "range_start", new_range_start, "segment_start", old_segment_start, "range_end", new_range_end, "segment_end", old_segment_end
+ if new_range_end <= old_segment_start:
# range ends before this segment starts, so don't look at any more locators
break
- #if range_start >= segment_end:
+ #if range_start >= old_segment_end:
# range starts after this segment ends, so go to next segment
# we should always start at the first segment due to the binary search above, so this test is redundant
#next
- if range_start >= segment_start and range_end <= segment_end:
- # range starts and ends in this segment
- # split segment into 3 pieces
- if (range_start-segment_start) > 0:
- data_locators[i] = [locator, (range_start-segment_start), segment_start]
- data_locators.insert(i+1, [new_locator, range_size, range_start])
+ if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+ # new range starts and ends in old segment
+ # split segment into up to 3 pieces
+ if (new_range_start-old_segment_start) > 0:
+ data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
+ data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
else:
- data_locators[i] = [new_locator, range_size, range_start]
+ data_locators[i] = Range(new_locator, new_range_start, new_range_size, new_segment_offset)
i -= 1
- if (segment_end-range_end) > 0:
- data_locators.insert(i+2, [(locator + (range_start-segment_start) + range_size), (segment_end-range_end), range_end])
+ if (old_segment_end-new_range_end) > 0:
+ data_locators.insert(i+2, Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_start-old_segment_start) + new_range_size))
return
- elif range_start >= segment_start and range_end > segment_end:
+ elif old_segment_start <= new_range_start and new_range_end > old_segment_end:
# range starts in this segment
# split segment into 2 pieces
- data_locators[i] = [locator, (range_start-segment_start), segment_start]
- data_locators.insert(i+1, [new_locator, range_size, range_start])
+ data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
+ data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
i += 1
- elif range_start < segment_start and range_end > segment_end:
+ elif new_range_start < old_segment_start and new_range_end >= old_segment_end:
# range starts in a previous segment and extends to further segments
# delete this segment
del data_locators[i]
i -= 1
- elif range_start < segment_start and range_end <= segment_end:
+ elif new_range_start < old_segment_start and new_range_end < old_segment_end:
# range starts in a previous segment and ends in this segment
# move the starting point of this segment up, and shrink it.
- data_locators[i] = [locator+(range_end-segment_start), (segment_end-range_end), range_end]
+ data_locators[i] = Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_end-old_segment_start))
return
- segment_start = segment_end
i += 1
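
To make the segment-splitting logic concrete, a small worked example against
the new signature (block names "A" and "B" are purely illustrative):

    from arvados.ranges import Range, replace_range

    # One 10-byte file segment backed by block "A".
    segs = [Range("A", 0L, 10L)]

    # Overwrite file offsets 3..6 with 3 bytes stored at offset 0 of "B".
    replace_range(segs, 3, 3, "B", 0)

    for r in segs:
        print r
    # ["A", 0, 3, 0]   head of the original segment
    # ["B", 3, 3, 0]   the new write
    # ["A", 6, 4, 6]   tail; segment_offset skips past the overwritten bytes
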
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index d28efcf..a7e3be3 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -13,6 +13,10 @@ from keep import *
import config
import errors
+def locator_block_size(loc):
+ s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', loc)
+ return long(s.group(1))
+
def normalize_stream(s, stream):
'''
s is the stream name
@@ -31,7 +35,7 @@ def normalize_stream(s, stream):
if b.locator not in blocks:
stream_tokens.append(b.locator)
blocks[b.locator] = streamoffset
- streamoffset += b.block_size
+ streamoffset += locator_block_size(b.locator)
# Add the empty block if the stream is otherwise empty.
if len(stream_tokens) == 1:
@@ -154,23 +158,6 @@ class StreamReader(object):
return ' '.join(manifest_text) + '\n'
-class BufferBlock(object):
- def __init__(self, locator, streamoffset, starting_size=2**16):
- self.locator = locator
- self.buffer_block = bytearray(starting_size)
- self.buffer_view = memoryview(self.buffer_block)
- self.write_pointer = 0
- self.locator_list_entry = [locator, 0, streamoffset]
-
- def append(self, data):
- while (self.write_pointer+len(data)) > len(self.buffer_block):
- new_buffer_block = bytearray(len(self.buffer_block) * 2)
- new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
- self.buffer_block = new_buffer_block
- self.buffer_view = memoryview(self.buffer_block)
- self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
- self.write_pointer += len(data)
- self.locator_list_entry[1] = self.write_pointer
# class StreamWriter(StreamReader):
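
The new locator_block_size helper just pulls the size field out of a locator;
for example (the +Khint suffix is a made-up hint token, included to show that
trailing hints are skipped):

    from arvados.stream import locator_block_size

    print locator_block_size("781e5e245d69b566979b86e28d23f2c7+10")        # -> 10
    print locator_block_size("781e5e245d69b566979b86e28d23f2c7+10+Khint")  # -> 10

Note it only matches bare md5+size locators, which is why export_manifest
resolves "bufferblock*" placeholders through calculate_locator() before
calling it.
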
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 5ed8716..65e06fb 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -813,7 +813,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
"""
- print arvados.export_manifest(arvados.import_manifest(m1))
+
+ self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.import_manifest(m1)))
if __name__ == '__main__':
unittest.main()
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 2f4fc70..dfff394 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -272,151 +272,6 @@ class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
class StreamFileReadlinesTestCase(StreamFileReadTestCase):
def read_for_test(self, reader, byte_count, **kwargs):
return ''.join(reader.readlines(**kwargs))
-
-# class StreamWriterTestCase(unittest.TestCase):
-# class MockKeep(object):
-# def __init__(self, blocks):
-# self.blocks = blocks
-# def get(self, locator, num_retries=0):
-# return self.blocks[locator]
-# def put(self, data):
-# pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
-# self.blocks[pdh] = str(data)
-# return pdh
-
-# def test_init(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# self.assertEqual("01234", stream.readfrom(0, 5))
-
-# def test_append(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# self.assertEqual("56789", stream.readfrom(5, 8))
-# stream.append("foo")
-# self.assertEqual("56789foo", stream.readfrom(5, 8))
-
-
-# class StreamFileWriterTestCase(unittest.TestCase):
-# def test_truncate(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("56789", writer.readfrom(5, 8))
-# writer.truncate(8)
-# self.assertEqual("567", writer.readfrom(5, 8))
-
-# def test_append(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("56789", writer.readfrom(5, 8))
-# writer.seek(10)
-# writer.write("foo")
-# self.assertEqual(writer.size(), 13)
-# self.assertEqual("56789foo", writer.readfrom(5, 8))
-
-# def test_write0(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("0123456789", writer.readfrom(0, 13))
-# writer.seek(0)
-# writer.write("foo")
-# self.assertEqual(writer.size(), 10)
-# self.assertEqual("foo3456789", writer.readfrom(0, 13))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 10:3:count.txt 3:7:count.txt\n", stream.manifest_text())
-
-# def test_write1(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("0123456789", writer.readfrom(0, 13))
-# writer.seek(3)
-# writer.write("foo")
-# self.assertEqual(writer.size(), 10)
-# self.assertEqual("012foo6789", writer.readfrom(0, 13))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", stream.manifest_text())
-
-# def test_write2(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("0123456789", writer.readfrom(0, 13))
-# writer.seek(7)
-# writer.write("foo")
-# self.assertEqual(writer.size(), 10)
-# self.assertEqual("0123456foo", writer.readfrom(0, 13))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:3:count.txt\n", stream.manifest_text())
-
-# def test_write3(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("012345678901234", writer.readfrom(0, 15))
-# writer.seek(7)
-# writer.write("foobar")
-# self.assertEqual(writer.size(), 20)
-# self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", stream.manifest_text())
-
-# def test_write4(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:4:count.txt', '0:4:count.txt', '0:4:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# self.assertEqual("012301230123", writer.readfrom(0, 15))
-# writer.seek(2)
-# writer.write("abcdefg")
-# self.assertEqual(writer.size(), 12)
-# self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", stream.manifest_text())
-
-# def test_write_large(self):
-# stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({}))
-# writer = stream.files()["count.txt"]
-# text = ''.join(["0123456789" for a in xrange(0, 100)])
-# for b in xrange(0, 100000):
-# writer.write(text)
-# self.assertEqual(writer.size(), 100000000)
-# stream.commit()
-# self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
-
-# def test_write_rewrite0(self):
-# stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({}))
-# writer = stream.files()["count.txt"]
-# for b in xrange(0, 10):
-# writer.seek(0, os.SEEK_SET)
-# writer.write("0123456789")
-# stream.commit()
-# self.assertEqual(writer.size(), 10)
-# self.assertEqual("0123456789", writer.readfrom(0, 20))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
-
-# def test_write_rewrite1(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# for b in xrange(0, 10):
-# writer.seek(10, os.SEEK_SET)
-# writer.write("abcdefghij")
-# stream.commit()
-# self.assertEqual(writer.size(), 20)
-# self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:10:count.txt 10:10:count.txt\n", stream.manifest_text())
-
-# def test_write_rewrite2(self):
-# stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-# keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-# writer = stream.files()["count.txt"]
-# for b in xrange(0, 10):
-# writer.seek(5, os.SEEK_SET)
-# writer.write("abcdefghij")
-# stream.commit()
-# self.assertEqual(writer.size(), 15)
-# self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
-# self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", stream.manifest_text())
-
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list