[ARVADOS] updated: e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed

git at public.curoverse.com
Fri Dec 19 16:06:18 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py        | 247 +++++++++++++++++++++++++++--------
 sdk/python/arvados/collection.py     |  80 ++++++++----
 sdk/python/arvados/ranges.py         |  80 +++++++-----
 sdk/python/arvados/stream.py         |  23 +---
 sdk/python/tests/test_collections.py |   3 +-
 sdk/python/tests/test_stream.py      | 147 +--------------------
 6 files changed, 303 insertions(+), 277 deletions(-)

       via  e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed (commit)
      from  93bc7c31775039cfb05be4caa0891f13fa49409f (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.


commit e78607008c39aa88ccf6e95d7c6dfcc20a52a2ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Dec 19 16:07:30 2014 -0500

    3198: More refactoring and bug/test fixing

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ca24990..d2b174f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -4,6 +4,8 @@ import zlib
 import bz2
 from .ranges import *
 from arvados.retry import retry_method
+import config
+import hashlib
 
 def split(path):
     """split(path) -> streamname, filename
@@ -202,87 +204,222 @@ class StreamFileReader(ArvadosFileReaderBase):
         return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
 
 
+class BufferBlock(object):
+    def __init__(self, locator, starting_size=2**16):
+        self.locator = locator
+        self.buffer_block = bytearray(starting_size)
+        self.buffer_view = memoryview(self.buffer_block)
+        self.write_pointer = 0
+
+    def append(self, data):
+        while (self.write_pointer+len(data)) > len(self.buffer_block):
+            new_buffer_block = bytearray(len(self.buffer_block) * 2)
+            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+            self.buffer_block = new_buffer_block
+            self.buffer_view = memoryview(self.buffer_block)
+        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+        self.write_pointer += len(data)
+
+    def calculate_locator(self):
+        return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.write_pointer)
+
+
 class ArvadosFile(object):
-    def __init__(self, stream, segments):
-        # TODO: build segments list
-        self.segments = []
+    def __init__(self, stream=[], segments=[], keep=None):
+        '''
+        stream: a list of Range objects representing a block stream
+        segments: a list of Range objects representing segments
+        '''
+        self._modified = True
+        self._segments = []
+        for s in segments:
+            self.add_segment(stream, s.range_start, s.range_size)
+        self._current_bblock = None
+        self._bufferblocks = None
+        self._keep = keep
+
+    def set_unmodified(self):
+        self._modified = False
+
+    def modified(self):
+        return self._modified
 
     def truncate(self, size):
-        pass
-        # TODO: fixme
-
-        # segs = locators_and_ranges(self.segments, 0, size)
-
-        # newstream = []
-        # self.segments = []
-        # streamoffset = 0L
-        # fileoffset = 0L
-
-        # for seg in segs:
-        #     for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg.locator+seg.range_start, seg[SEGMENTSIZE]):
-        #         newstream.append([locator, blocksize, streamoffset])
-        #         self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
-        #         streamoffset += blocksize
-        #         fileoffset += segmentsize
-        # if len(newstream) == 0:
-        #     newstream.append(config.EMPTY_BLOCK_LOCATOR)
-        #     self.segments.append([0, 0, 0])
-        # self._stream._data_locators = newstream
-        # if self._filepos > fileoffset:
-        #     self._filepos = fileoffset
-
-    def readfrom(self, offset, data):
-        pass
+        new_segs = []
+        for r in self._segments:
+            range_end = r.range_start+r.range_size
+            if r.range_start >= size:
+                # segment is past the truncate size, all done
+                break
+            elif size < range_end:
+                nr = Range(r.locator, r.range_start, size - r.range_start)
+                nr.segment_offset = r.segment_offset
+                new_segs.append(nr)
+                break
+            else:
+                new_segs.append(r)
 
-    def writeto(self, offset, data):
-        if offset > self._size():
-            raise ArgumentError("Offset is past the end of the file")
-        # TODO: fixme
-        # self._stream._append(data)
-        # replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
+        self._segments = new_segs
+        self._modified = True
 
-    def flush(self):
+    def _keepget(self, locator, num_retries):
+        if self._bufferblocks and locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            return bb.buffer_view[0:bb.write_pointer].tobytes()
+        else:
+            return self._keep.get(locator, num_retries=num_retries)
+
+    def readfrom(self, offset, size, num_retries):
+        if size == 0 or offset >= self.size():
+            return ''
+        if self._keep is None:
+            self._keep = KeepClient(num_retries=num_retries)
+        data = []
+        # TODO: initiate prefetch on all blocks in the range (offset, offset + size + config.KEEP_BLOCK_SIZE)
+
+        for lr in locators_and_ranges(self._segments, offset, size):
+            # TODO: if data is empty, wait on block get, otherwise only
+            # get more data if the block is already in the cache.
+            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+        return ''.join(data)
+
+    def _init_bufferblock(self):
+        if self._bufferblocks is None:
+            self._bufferblocks = {}
+        self._current_bblock = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+        self._bufferblocks[self._current_bblock.locator] = self._current_bblock
+
+    def _repack_writes(self):
         pass
+         # TODO: fixme
+#         '''Test if the buffer block has more data than is referenced by actual segments
+#         (this happens when a buffered write over-writes a file range written in
+#         a previous buffered write).  Re-pack the buffer block for efficiency
+#         and to avoid leaking information.
+#         '''
+#         segs = self._files.values()[0].segments
+
+#         bufferblock_segs = []
+#         i = 0
+#         tmp_segs = copy.copy(segs)
+#         while i < len(tmp_segs):
+#             # Go through each segment and identify segments that include the buffer block
+#             s = tmp_segs[i]
+#             if s[LOCATOR] < self.current_bblock.locator_list_entry.range_start and (s[LOCATOR] + s.range_size) > self.current_bblock.locator_list_entry.range_start:
+#                 # The segment straddles the previous block and the current buffer block.  Split the segment.
+#                 b1 = self.current_bblock.locator_list_entry.range_start - s[LOCATOR]
+#                 b2 = (s[LOCATOR] + s.range_size) - self.current_bblock.locator_list_entry.range_start
+#                 bb_seg = [self.current_bblock.locator_list_entry.range_start, b2, s.range_start+b1]
+#                 tmp_segs[i] = [s[LOCATOR], b1, s.range_start]
+#                 tmp_segs.insert(i+1, bb_seg)
+#                 bufferblock_segs.append(bb_seg)
+#                 i += 1
+#             elif s[LOCATOR] >= self.current_bblock.locator_list_entry.range_start:
+#                 # The segment's data is in the buffer block.
+#                 bufferblock_segs.append(s)
+#             i += 1
+
+#         # Now sum up the segments to get the total bytes
+#         # of the file referencing into the buffer block.
+#         write_total = sum([s.range_size for s in bufferblock_segs])
+
+#         if write_total < self.current_bblock.locator_list_entry.range_size:
+#             # There is more data in the buffer block than is actually accounted for by segments, so
+#             # re-pack into a new buffer by copying over to a new buffer block.
+#             new_bb = BufferBlock(self.current_bblock.locator,
+#                                  self.current_bblock.locator_list_entry.range_start,
+#                                  starting_size=write_total)
+#             for t in bufferblock_segs:
+#                 t_start = t[LOCATOR] - self.current_bblock.locator_list_entry.range_start
+#                 t_end = t_start + t.range_size
+#                 t[0] = self.current_bblock.locator_list_entry.range_start + new_bb.write_pointer
+#                 new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
+
+#             self.current_bblock = new_bb
+#             self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+#             self._data_locators[-1] = self.current_bblock.locator_list_entry
+#             self._files.values()[0].segments = tmp_segs
+
+
+    def writeto(self, offset, data, num_retries):
+        if len(data) == 0:
+            return
+
+        if offset > self.size():
+            raise ArgumentError("Offset is past the end of the file")
+
+        if len(data) > config.KEEP_BLOCK_SIZE:
+            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+
+        self._modified = True
+
+        if self._current_bblock is None:
+            self._init_bufferblock()
+
+        if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+            self._repack_writes()
+            if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+                self._init_bufferblock()
+
+        self._current_bblock.append(data)
+        replace_range(self._segments, offset, len(data), self._current_bblock.locator, self._current_bblock.write_pointer - len(data))
 
     def add_segment(self, blocks, pos, size):
+        self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self.segments[-1] if self.segments else Range(0, 0, 0)
-            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size)
-            r.block_size = lr.block_size
-            r.segment_offset = lr.segment_offset
-            self.segments.append(r)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0)
+            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
+            self._segments.append(r)
+
+    def size(self):
+        if self._segments:
+            n = self._segments[-1]
+            return n.range_start + n.range_size
+        else:
+            return 0
 
 
 class ArvadosFileReader(ArvadosFileReaderBase):
-    def __init__(self, arvadosfile, name, mode='rb'):
-        super(ArvadosFileReader, self).__init__(name)
+    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
+        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
         self.arvadosfile = arvadosfile
 
     def size(self):
-        n = self.segments[-1]
-        return n.range_start + n.range_size
+        return self.arvadosfile.size()
 
     @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
-        if size == 0:
-            return ''
-
-        data = self.arvadosfile.readfrom(self._filepos, size)
+        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
         self._filepos += len(data)
         return data
 
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readfrom(self, offset, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        return self.arvadosfile.readfrom(offset, size, num_retries)
+
+    def flush(self):
+        pass
 
 class ArvadosFileWriter(ArvadosFileReader):
-    def __init__(self, arvadosfile, name):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode='wb')
+    def __init__(self, arvadosfile, name, mode, num_retries=None):
+        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
 
-    def write(self, data):
-        self.arvadosfile.writeto(self._filepos, data)
-        self._filepos += len(data)
+    @ArvadosFileBase._before_close
+    @retry_method
+    def write(self, data, num_retries=None):
+        if self.mode[0] == "a":
+            self.arvadosfile.writeto(self.size(), data, num_retries)
+        else:
+            self.arvadosfile.writeto(self._filepos, data, num_retries)
+            self._filepos += len(data)
 
-    def writelines(self, seq):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def writelines(self, seq, num_retries=None):
         for s in seq:
             self.write(s)
 
@@ -290,3 +427,5 @@ class ArvadosFileWriter(ArvadosFileReader):
         if size is None:
             size = self._filepos
         self.arvadosfile.truncate(size)
+        if self._filepos > self.size():
+            self._filepos = self.size()
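
A minimal sketch of the BufferBlock behavior introduced above: the class
keeps a growable bytearray that doubles whenever an append would overflow,
and calculate_locator() hashes only the bytes actually written. The class
body is condensed from this diff; the sample data and the tiny
starting_size are illustrative only.

    import hashlib

    class BufferBlock(object):
        # Growable write buffer: doubles the backing bytearray whenever
        # an append would run past the end.
        def __init__(self, locator, starting_size=2**16):
            self.locator = locator
            self.buffer_block = bytearray(starting_size)
            self.buffer_view = memoryview(self.buffer_block)
            self.write_pointer = 0

        def append(self, data):
            while (self.write_pointer + len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)

        def calculate_locator(self):
            # Keep-style locator: md5 of the written bytes plus their length.
            return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(),
                              self.write_pointer)

    bb = BufferBlock("bufferblock0", starting_size=4)  # tiny, to force doubling
    bb.append("hello ")
    bb.append("world")
    print bb.calculate_locator()  # 5eb63bbbe01eeed093cb22bb8f5acdc3+11
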
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index b5a9ef9..7660162 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,10 +6,10 @@ import re
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
 from keep import *
-from .stream import StreamReader, normalize_stream
-from .ranges import Range
+from .stream import StreamReader, normalize_stream, locator_block_size
+from .ranges import Range, LocatorAndRange
 import config
 import errors
 import util
@@ -640,10 +640,11 @@ class ResumableCollectionWriter(CollectionWriter):
 
 
 class Collection(object):
-    def __init__(self):
+    def __init__(self, keep=None):
         self.items = {}
+        self.keep = keep
 
-    def find_or_create(self, path):
+    def find(self, path, create=False):
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -652,24 +653,56 @@ class Collection(object):
             item = self.items.get(p[0])
             if len(p) == 1:
                 # item must be a file
-                if item is None:
+                if item is None and create:
                     # create new file
-                    item = ArvadosFile(p[0], 'wb', [], [])
+                    item = ArvadosFile(keep=self.keep)
                     self.items[p[0]] = item
                 return item
             else:
-                if item is None:
+                if item is None and create:
                     # create new collection
                     item = Collection()
                     self.items[p[0]] = item
                 del p[0]
-                return item.find_or_create("/".join(p))
+                return item.find("/".join(p), create=create)
         else:
             return self
 
+    def open(self, path, mode):
+        mode = mode.replace("b", "")
+        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
+            raise ArgumentError("Bad mode '%s'" % mode)
+        create = (mode != "r")
 
-def import_manifest(manifest_text):
-    c = Collection()
+        f = self.find(path, create=create)
+        if f is None:
+            raise ArgumentError("File not found")
+        if not isinstance(f, ArvadosFile):
+            raise ArgumentError("Path must refer to a file.")
+
+        if mode[0] == "w":
+            f.truncate(0)
+
+        if mode == "r":
+            return ArvadosFileReader(f, path, mode)
+        else:
+            return ArvadosFileWriter(f, path, mode)
+
+    def modified(self):
+        for k,v in self.items.items():
+            if v.modified():
+                return True
+        return False
+
+    def set_unmodified(self):
+        for k,v in self.items.items():
+            v.set_unmodified()
+
+def import_manifest(manifest_text, keep=None):
+    c = Collection(keep=keep)
+
+    if manifest_text[-1] != "\n":
+        manifest_text += "\n"
 
     STREAM_NAME = 0
     BLOCKS = 1
@@ -681,6 +714,7 @@ def import_manifest(manifest_text):
     for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
         tok = n.group(1)
         sep = n.group(2)
+
         if state == STREAM_NAME:
             # starting a new stream
             stream_name = tok.replace('\\040', ' ')
@@ -705,7 +739,7 @@ def import_manifest(manifest_text):
                 pos = long(s.group(1))
                 size = long(s.group(2))
                 name = s.group(3).replace('\\040', ' ')
-                f = c.find_or_create("%s/%s" % (stream_name, name))
+                f = c.find("%s/%s" % (stream_name, name), create=True)
                 f.add_segment(blocks, pos, size)
             else:
                 # error!
@@ -715,6 +749,7 @@ def import_manifest(manifest_text):
             stream_name = None
             state = STREAM_NAME
 
+    c.set_unmodified()
     return c
 
 def export_manifest(item, stream_name="."):
@@ -722,15 +757,16 @@ def export_manifest(item, stream_name="."):
     if isinstance(item, Collection):
         stream = {}
         for k,v in item.items.items():
-            if isinstance(item, Collection):
+            if isinstance(v, Collection):
                 buf += export_manifest(v, stream_name)
-            else:
-                if isinstance(item, ArvadosFile):
-                    buf += str(item.segments)
-                    #stream[k] = [[s.locator, s[4], s[], s[]] for s in item.segments]
-    else:
-        buf += stream_name
-        buf += " "
-        buf += str(item.segments)
-        buf += "\n"
+            elif isinstance(v, ArvadosFile):
+                st = []
+                for s in v._segments:
+                    loc = s.locator
+                    if loc.startswith("bufferblock"):
+                        loc = v._bufferblocks[loc].calculate_locator()
+                    st.append(LocatorAndRange(loc, locator_block_size(loc),
+                                         s.segment_offset, s.range_size))
+                stream[k] = st
+        buf += ' '.join(normalize_stream(stream_name, stream)) + "\n"
     return buf
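
Rough usage sketch of the new Collection.open()/export_manifest() path
(hypothetical file name and contents; no Keep client is needed because
nothing is read back, and export_manifest() swaps each "bufferblock*"
placeholder for a real md5+size locator via calculate_locator()):

    from arvados.collection import Collection, export_manifest

    c = Collection()
    f = c.open("count.txt", "w")   # mode "w" creates the file and truncates it
    f.write("0123456789")          # lands in an in-memory BufferBlock
    print c.modified()             # True: there are uncommitted writes
    print export_manifest(c)
    # . 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt
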
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index acc1c50..8c377e2 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -1,11 +1,12 @@
 class Range(object):
-    def __init__(self, locator, range_start, range_size):
+    def __init__(self, locator, range_start, range_size, segment_offset=0):
         self.locator = locator
         self.range_start = range_start
         self.range_size = range_size
+        self.segment_offset = segment_offset
 
     def __repr__(self):
-        return "[\"%s\", %i, %i]" % (self.locator, self.range_size, self.range_start)
+        return "[\"%s\", %i, %i, %i]" % (self.locator, self.range_start, self.range_size, self.segment_offset)
 
 def first_block(data_locators, range_start, range_size, debug=False):
     block_start = 0L
@@ -93,21 +94,21 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
 
         if range_start >= block_start and range_end <= block_end:
             # range starts and ends in this block
-            resp.append(LocatorAndRange(dl.locator, block_size, range_start - block_start, range_size))
+            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), range_size))
         elif range_start >= block_start and range_end > block_end:
             # range starts in this block
-            resp.append(LocatorAndRange(dl.locator, block_size, range_start - block_start, block_end - range_start))
+            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset + (range_start - block_start), block_end - range_start))
         elif range_start < block_start and range_end > block_end:
             # range starts in a previous block and extends to further blocks
-            resp.append(LocatorAndRange(dl.locator, block_size, 0L, block_size))
+            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, block_size))
         elif range_start < block_start and range_end <= block_end:
             # range starts in a previous block and ends in this block
-            resp.append(LocatorAndRange(dl.locator, block_size, 0L, range_end - block_start))
+            resp.append(LocatorAndRange(dl.locator, block_size, dl.segment_offset, range_end - block_start))
         block_start = block_end
         i += 1
     return resp
 
-def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset, debug=False):
     '''
     Replace a file segment range with a new segment.
     data_locators: list of Range objects, assumes that segments are in order and contiguous
@@ -116,64 +117,71 @@ def replace_range(data_locators, range_start, range_size, new_locator, debug=Fal
     new_locator: locator for new segment to be inserted
     !!! data_locators will be updated in place !!!
     '''
-    if range_size == 0:
+    if new_range_size == 0:
         return
 
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
+    new_range_start = long(new_range_start)
+    new_range_size = long(new_range_size)
+    new_range_end = new_range_start + new_range_size
+
+    if len(data_locators) == 0:
+        data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
+        return
 
     last = data_locators[-1]
-    if (last.range_start+last.range_size) == range_start:
-        # extend last segment
-        last.range_size += range_size
+    if (last.range_start+last.range_size) == new_range_start:
+        if last.locator == new_locator:
+            # extend last segment
+            last.range_size += new_range_size
+        else:
+            data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
         return
 
-    i = first_block(data_locators, range_start, range_size, debug)
+    i = first_block(data_locators, new_range_start, new_range_size, debug)
     if i is None:
         return
 
     while i < len(data_locators):
-        locator, segment_size, segment_start = data_locators[i]
-        segment_end = segment_start + segment_size
+        dl = data_locators[i]
+        old_segment_start = dl.range_start
+        old_segment_end = old_segment_start + dl.range_size
         if debug:
-            print locator, "range_start", range_start, "segment_start", segment_start, "range_end", range_end, "segment_end", segment_end
-        if range_end <= segment_start:
+            print dl.locator, "range_start", new_range_start, "segment_start", old_segment_start, "range_end", new_range_end, "segment_end", old_segment_end
+        if new_range_end <= old_segment_start:
             # range ends before this segment starts, so don't look at any more locators
             break
 
-        #if range_start >= segment_end:
+        #if range_start >= old_segment_end:
             # range starts after this segment ends, so go to next segment
             # we should always start at the first segment due to the binary search above, so this test is redundant
             #next
 
-        if range_start >= segment_start and range_end <= segment_end:
-            # range starts and ends in this segment
-            # split segment into 3 pieces
-            if (range_start-segment_start) > 0:
-                data_locators[i] = [locator, (range_start-segment_start), segment_start]
-                data_locators.insert(i+1, [new_locator, range_size, range_start])
+        if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
+            # new range starts and ends in old segment
+            # split segment into up to 3 pieces
+            if (new_range_start-old_segment_start) > 0:
+                data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
+                data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
             else:
-                data_locators[i] = [new_locator, range_size, range_start]
+                data_locators[i] = Range(new_locator, new_range_start, new_range_size, new_segment_offset)
                 i -= 1
-            if (segment_end-range_end) > 0:
-                data_locators.insert(i+2, [(locator + (range_start-segment_start) + range_size), (segment_end-range_end), range_end])
+            if (old_segment_end-new_range_end) > 0:
+                data_locators.insert(i+2, Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_start-old_segment_start) + new_range_size))
             return
-        elif range_start >= segment_start and range_end > segment_end:
+        elif old_segment_start <= new_range_start and new_range_end > old_segment_end:
             # range starts in this segment
             # split segment into 2 pieces
-            data_locators[i] = [locator, (range_start-segment_start), segment_start]
-            data_locators.insert(i+1, [new_locator, range_size, range_start])
+            data_locators[i] = Range(dl.locator, old_segment_start, (new_range_start-old_segment_start), dl.segment_offset)
+            data_locators.insert(i+1, Range(new_locator, new_range_start, new_range_size, new_segment_offset))
             i += 1
-        elif range_start < segment_start and range_end > segment_end:
+        elif new_range_start < old_segment_start and new_range_end >= old_segment_end:
             # range starts in a previous segment and extends to further segments
             # delete this segment
             del data_locators[i]
             i -= 1
-        elif range_start < segment_start and range_end <= segment_end:
+        elif new_range_start < old_segment_start and new_range_end < old_segment_end:
             # range starts in a previous segment and ends in this segment
             # move the starting point of this segment up, and shrink it.
-            data_locators[i] = [locator+(range_end-segment_start), (segment_end-range_end), range_end]
+            data_locators[i] = Range(dl.locator, new_range_end, (old_segment_end-new_range_end), dl.segment_offset + (new_range_end-old_segment_start))
             return
-        segment_start = segment_end
         i += 1
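
Worked example of the new replace_range() splitting behavior (illustrative
locator names, mirroring the overwrite-in-the-middle case from the disabled
StreamWriter tests): overwriting bytes [3, 6) of a single 10-byte segment
splits it into three pieces, and the tail's segment_offset is advanced past
the overwritten bytes.

    from arvados.ranges import Range, replace_range

    segs = [Range("b1", 0L, 10L, 0)]      # one 10-byte segment in block "b1"
    replace_range(segs, 3L, 3L, "bufferblock0", 0)
    for r in segs:
        print r
    # ["b1", 0, 3, 0]            head of the old segment
    # ["bufferblock0", 3, 3, 0]  the newly written range
    # ["b1", 6, 4, 6]            tail; segment_offset skips the overwritten bytes
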
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index d28efcf..a7e3be3 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -13,6 +13,10 @@ from keep import *
 import config
 import errors
 
+def locator_block_size(loc):
+    s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', loc)
+    return long(s.group(1))
+
 def normalize_stream(s, stream):
     '''
     s is the stream name
@@ -31,7 +35,7 @@ def normalize_stream(s, stream):
             if b.locator not in blocks:
                 stream_tokens.append(b.locator)
                 blocks[b.locator] = streamoffset
-                streamoffset += b.block_size
+                streamoffset += locator_block_size(b.locator)
 
     # Add the empty block if the stream is otherwise empty.
     if len(stream_tokens) == 1:
@@ -154,23 +158,6 @@ class StreamReader(object):
         return ' '.join(manifest_text) + '\n'
 
 
-class BufferBlock(object):
-    def __init__(self, locator, streamoffset, starting_size=2**16):
-        self.locator = locator
-        self.buffer_block = bytearray(starting_size)
-        self.buffer_view = memoryview(self.buffer_block)
-        self.write_pointer = 0
-        self.locator_list_entry = [locator, 0, streamoffset]
-
-    def append(self, data):
-        while (self.write_pointer+len(data)) > len(self.buffer_block):
-            new_buffer_block = bytearray(len(self.buffer_block) * 2)
-            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
-            self.buffer_block = new_buffer_block
-            self.buffer_view = memoryview(self.buffer_block)
-        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
-        self.write_pointer += len(data)
-        self.locator_list_entry[1] = self.write_pointer
 
 
 # class StreamWriter(StreamReader):
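
locator_block_size() lets normalize_stream() recover a block's size from
the locator itself instead of carrying block_size on every Range. A quick
sketch of what the regex accepts (locator taken from the tests; the "+A..."
hint suffix is a hypothetical example):

    import re

    def locator_block_size(loc):
        # size is the first "+NNN" after the 32-hex md5; later "+hint"s are ignored
        s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', loc)
        return long(s.group(1))

    print locator_block_size("781e5e245d69b566979b86e28d23f2c7+10")        # 10
    print locator_block_size("781e5e245d69b566979b86e28d23f2c7+10+Afoo")   # 10
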
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 5ed8716..65e06fb 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -813,7 +813,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
 . 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
 """
-        print arvados.export_manifest(arvados.import_manifest(m1))
+
+        self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.import_manifest(m1)))
 
 if __name__ == '__main__':
     unittest.main()
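
The print-and-eyeball check above becomes a real assertion: three
single-file streams coalesce into one stream with one 127-byte entry
(43+41+43), because import_manifest() appends each stream's md5sum.txt
segments onto the same ArvadosFile. The same round trip, as a standalone
sketch:

    import arvados

    m1 = (". 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt\n"
          ". 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt\n"
          ". 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt\n")
    print arvados.export_manifest(arvados.import_manifest(m1))
    # . 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41
    #   8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
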
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 2f4fc70..dfff394 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -272,151 +272,6 @@ class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
 class StreamFileReadlinesTestCase(StreamFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
-
-# class StreamWriterTestCase(unittest.TestCase):
-#     class MockKeep(object):
-#         def __init__(self, blocks):
-#             self.blocks = blocks
-#         def get(self, locator, num_retries=0):
-#             return self.blocks[locator]
-#         def put(self, data):
-#             pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
-#             self.blocks[pdh] = str(data)
-#             return pdh
-
-#     def test_init(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         self.assertEqual("01234", stream.readfrom(0, 5))
-
-#     def test_append(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         self.assertEqual("56789", stream.readfrom(5, 8))
-#         stream.append("foo")
-#         self.assertEqual("56789foo", stream.readfrom(5, 8))
-
-
-# class StreamFileWriterTestCase(unittest.TestCase):
-#     def test_truncate(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("56789", writer.readfrom(5, 8))
-#         writer.truncate(8)
-#         self.assertEqual("567", writer.readfrom(5, 8))
-
-#     def test_append(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("56789", writer.readfrom(5, 8))
-#         writer.seek(10)
-#         writer.write("foo")
-#         self.assertEqual(writer.size(), 13)
-#         self.assertEqual("56789foo", writer.readfrom(5, 8))
-
-#     def test_write0(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("0123456789", writer.readfrom(0, 13))
-#         writer.seek(0)
-#         writer.write("foo")
-#         self.assertEqual(writer.size(), 10)
-#         self.assertEqual("foo3456789", writer.readfrom(0, 13))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 10:3:count.txt 3:7:count.txt\n", stream.manifest_text())
-
-#     def test_write1(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("0123456789", writer.readfrom(0, 13))
-#         writer.seek(3)
-#         writer.write("foo")
-#         self.assertEqual(writer.size(), 10)
-#         self.assertEqual("012foo6789", writer.readfrom(0, 13))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", stream.manifest_text())
-
-#     def test_write2(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("0123456789", writer.readfrom(0, 13))
-#         writer.seek(7)
-#         writer.write("foo")
-#         self.assertEqual(writer.size(), 10)
-#         self.assertEqual("0123456foo", writer.readfrom(0, 13))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:3:count.txt\n", stream.manifest_text())
-
-#     def test_write3(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("012345678901234", writer.readfrom(0, 15))
-#         writer.seek(7)
-#         writer.write("foobar")
-#         self.assertEqual(writer.size(), 20)
-#         self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", stream.manifest_text())
-
-#     def test_write4(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:4:count.txt', '0:4:count.txt', '0:4:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         self.assertEqual("012301230123", writer.readfrom(0, 15))
-#         writer.seek(2)
-#         writer.write("abcdefg")
-#         self.assertEqual(writer.size(), 12)
-#         self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", stream.manifest_text())
-
-#     def test_write_large(self):
-#         stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({}))
-#         writer = stream.files()["count.txt"]
-#         text = ''.join(["0123456789" for a in xrange(0, 100)])
-#         for b in xrange(0, 100000):
-#             writer.write(text)
-#         self.assertEqual(writer.size(), 100000000)
-#         stream.commit()
-#         self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
-
-#     def test_write_rewrite0(self):
-#         stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({}))
-#         writer = stream.files()["count.txt"]
-#         for b in xrange(0, 10):
-#             writer.seek(0, os.SEEK_SET)
-#             writer.write("0123456789")
-#         stream.commit()
-#         self.assertEqual(writer.size(), 10)
-#         self.assertEqual("0123456789", writer.readfrom(0, 20))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
-
-#     def test_write_rewrite1(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         for b in xrange(0, 10):
-#             writer.seek(10, os.SEEK_SET)
-#             writer.write("abcdefghij")
-#         stream.commit()
-#         self.assertEqual(writer.size(), 20)
-#         self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:10:count.txt 10:10:count.txt\n", stream.manifest_text())
-
-#     def test_write_rewrite2(self):
-#         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
-#                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-#         writer = stream.files()["count.txt"]
-#         for b in xrange(0, 10):
-#             writer.seek(5, os.SEEK_SET)
-#             writer.write("abcdefghij")
-#         stream.commit()
-#         self.assertEqual(writer.size(), 15)
-#         self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
-#         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", stream.manifest_text())
-
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive