[ARVADOS] updated: b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c

git at public.curoverse.com git at public.curoverse.com
Wed Dec 17 10:03:34 EST 2014


Summary of changes:
 sdk/python/arvados/__init__.py       |   3 +-
 sdk/python/arvados/arvfile.py        | 236 +++++++++++++++++++
 sdk/python/arvados/collection.py     |  44 +---
 sdk/python/arvados/ranges.py         | 149 ++++++++++++
 sdk/python/arvados/stream.py         | 427 ++++-------------------------------
 sdk/python/tests/arvados_testutil.py |   4 +-
 sdk/python/tests/test_stream.py      |   2 +-
 7 files changed, 439 insertions(+), 426 deletions(-)
 create mode 100644 sdk/python/arvados/ranges.py

       via  b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c (commit)
      from  71959d0d8434b7af5de9a5b8260f0ebe11ec7238 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Dec 17 10:04:08 2014 -0500

    3198: Refactor stream and file classes and functions a little bit for readability.

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 4cae20d..19a7ad8 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -22,6 +22,7 @@ from api import *
 from collection import *
 from keep import *
 from stream import *
+from arvfile import *
 import errors
 import util
 
@@ -131,5 +132,3 @@ class job_setup:
                                        body={'success':True}
                                        ).execute()
             exit(0)
-
-
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e8dac46..ef7a6c8 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,4 +1,21 @@
 import functools
+import os
+import zlib
+import bz2
+from .ranges import *
+from arvados.retry import retry_method
+
+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
 
 class ArvadosFileBase(object):
     def __init__(self, name, mode):
@@ -27,3 +44,222 @@ class ArvadosFileBase(object):
 
     def close(self):
         self.closed = True
+
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # Older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
+
+    def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
+        self._stream = stream
+        self.segments = segments
+        self._filepos = 0L
+        self.num_retries = stream.num_retries
+        self._readline_cache = (None, None)
+
+    def __iter__(self):
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data
+
+    def decompressed_name(self):
+        return re.sub('\.(bz2|gz)$', '', self.name)
+
+    def stream_name(self):
+        return self._stream.name()
+
+    @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
+            pos += self._filepos
+        elif whence == os.SEEK_END:
+            pos += self.size()
+        self._filepos = min(max(pos, 0L), self._size())
+
+    def tell(self):
+        return self._filepos
+
+    def _size(self):
+        n = self.segments[-1]
+        return n[OFFSET] + n[BLOCKSIZE]
+
+    def size(self):
+        return self._size()
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def read(self, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        if size == 0:
+            return ''
+
+        data = ''
+        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
+        if available_chunks:
+            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
+            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                         num_retries=num_retries)
+
+        self._filepos += len(data)
+        return data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at 'start'"""
+        if size == 0:
+            return ''
+
+        data = []
+        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
+            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                              num_retries=num_retries))
+        return ''.join(data)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall(self, size=2**20, num_retries=None):
+        while True:
+            data = self.read(size, num_retries=num_retries)
+            if data == '':
+                break
+            yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
+            data = [cache_data]
+        else:
+            data = ['']
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
+                break
+            data.append(next_read)
+            data_size += len(next_read)
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries):
+            data = decompress(segment)
+            if data:
+                yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall_decompressed(self, size=2**20, num_retries=None):
+        self.seek(0)
+        if self.name.endswith('.bz2'):
+            dc = bz2.BZ2Decompressor()
+            return self.decompress(dc.decompress, size,
+                                   num_retries=num_retries)
+        elif self.name.endswith('.gz'):
+            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+                                   size, num_retries=num_retries)
+        else:
+            return self.readall(size, num_retries=num_retries)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
+                break
+        return ''.join(data).splitlines(True)
+
+    def as_manifest(self):
+        manifest_text = ['.']
+        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+
+
+class StreamFileWriter(StreamFileReader):
+    def __init__(self, stream, segments, name):
+        super(StreamFileWriter, self).__init__(stream, segments, name)
+        self.mode = 'wb'
+
+    # wrap superclass methods in mutex
+    def _proxy_method(name):
+        method = getattr(StreamFileReader, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            with self._stream.mutex:
+                return method(self, *args, **kwargs)
+        return wrapper
+
+    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
+        locals()[_method_name] = _proxy_method(_method_name)
+
+    def truncate(self, size=None):
+        with self._stream.mutex:
+            if size is None:
+                size = self._filepos
+
+            segs = locators_and_ranges(self.segments, 0, size)
+
+            newstream = []
+            self.segments = []
+            streamoffset = 0L
+            fileoffset = 0L
+
+            for seg in segs:
+                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
+                    newstream.append([locator, blocksize, streamoffset])
+                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+                    streamoffset += blocksize
+                    fileoffset += segmentsize
+            if len(newstream) == 0:
+                newstream.append(config.EMPTY_BLOCK_LOCATOR)
+                self.segments.append([0, 0, 0])
+            self._stream._data_locators = newstream
+            if self._filepos > fileoffset:
+                self._filepos = fileoffset
+
+    def _writeto(self, offset, data):
+        self._stream._append(data)
+        replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
+        self._filepos += len(data)
+
+    def writeto(self, offset, data):
+        with self._stream.mutex:
+            self._writeto(offset, data)
+
+    def write(self, data):
+        with self._stream.mutex:
+            self._writeto(self._filepos, data)
+            self._filepos += len(data)
+
+    def writelines(self, seq):
+        with self._stream.mutex:
+            for s in seq:
+                self._writeto(self._filepos, s)
+                self._filepos += len(s)
+
+    def flush(self):
+        pass
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 5ab7e77..24572ed 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,55 +6,15 @@ import re
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase
+from .arvfile import ArvadosFileBase, split
 from keep import *
-from .stream import StreamReader, split
+from .stream import StreamReader, normalize_stream
 import config
 import errors
 import util
 
 _logger = logging.getLogger('arvados.collection')
 
-def normalize_stream(s, stream):
-    stream_tokens = [s]
-    sortedfiles = list(stream.keys())
-    sortedfiles.sort()
-
-    blocks = {}
-    streamoffset = 0L
-    for f in sortedfiles:
-        for b in stream[f]:
-            if b[arvados.LOCATOR] not in blocks:
-                stream_tokens.append(b[arvados.LOCATOR])
-                blocks[b[arvados.LOCATOR]] = streamoffset
-                streamoffset += b[arvados.BLOCKSIZE]
-
-    if len(stream_tokens) == 1:
-        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
-    for f in sortedfiles:
-        current_span = None
-        fout = f.replace(' ', '\\040')
-        for segment in stream[f]:
-            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span is None:
-                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-            else:
-                if segmentoffset == current_span[1]:
-                    current_span[1] += segment[arvados.SEGMENTSIZE]
-                else:
-                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
-        if current_span is not None:
-            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
-        if not stream[f]:
-            stream_tokens.append("0:0:{0}".format(fout))
-
-    return stream_tokens
-
-
 class CollectionBase(object):
     def __enter__(self):
         return self
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
new file mode 100644
index 0000000..fe9c04b
--- /dev/null
+++ b/sdk/python/arvados/ranges.py
@@ -0,0 +1,149 @@
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def first_block(data_locators, range_start, range_size, debug=False):
+    block_start = 0L
+
+    # range_start/block_start is the inclusive lower bound
+    # range_end/block_end is the exclusive upper bound
+
+    hi = len(data_locators)
+    lo = 0
+    i = int((hi + lo) / 2)
+    block_size = data_locators[i][BLOCKSIZE]
+    block_start = data_locators[i][OFFSET]
+    block_end = block_start + block_size
+    if debug: print '---'
+
+    # perform a binary search for the first block
+    # assumes that all of the blocks are contigious, so range_start is guaranteed
+    # to either fall into the range of a block or be outside the block range entirely
+    while not (range_start >= block_start and range_start < block_end):
+        if lo == i:
+            # must be out of range, fail
+            return None
+        if range_start > block_start:
+            lo = i
+        else:
+            hi = i
+        i = int((hi + lo) / 2)
+        if debug: print lo, i, hi
+        block_size = data_locators[i][BLOCKSIZE]
+        block_start = data_locators[i][OFFSET]
+        block_end = block_start + block_size
+
+    return i
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+    '''
+    Get blocks that are covered by the range
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
+    range_start: start of range
+    range_size: size of range
+    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+    '''
+    if range_size == 0:
+        return []
+    resp = []
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return []
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        #if range_start >= block_end:
+            # range starts after this block ends, so go to next block
+            # we should always start at the first block due to the binary above, so this test is redundant
+            #next
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            resp.append([locator, block_size, range_start - block_start, range_size])
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            resp.append([locator, block_size, 0L, block_size])
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            resp.append([locator, block_size, 0L, range_end - block_start])
+        block_start = block_end
+        i += 1
+    return resp
+
+def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    '''
+    Replace a range with a new block.
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
+    range_start: start of range
+    range_size: size of range
+    new_locator: locator for new block to be inserted
+    !!! data_locators will be updated in place !!!
+    '''
+    if range_size == 0:
+        return
+
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    last = data_locators[-1]
+    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
+        # append new block
+        data_locators.append([new_locator, range_size, range_start])
+        return
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        #if range_start >= block_end:
+            # range starts after this block ends, so go to next block
+            # we should always start at the first block due to the binary above, so this test is redundant
+            #next
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            # split block into 3 pieces
+            #resp.append([locator, block_size, range_start - block_start, range_size])
+            pass
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            # split block into 2 pieces
+            #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+            pass
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            # zero out this block
+            #resp.append([locator, block_size, 0L, block_size])
+            pass
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            # split into 2 pieces
+            #resp.append([locator, block_size, 0L, range_end - block_start])
+            pass
+        block_start = block_end
+        i += 1
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3f64615..8623ab9 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,332 +1,59 @@
-import bz2
 import collections
 import hashlib
 import os
 import re
-import zlib
 import threading
 import functools
 
-from .arvfile import ArvadosFileBase
+from .ranges import *
+from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
 from arvados.retry import retry_method
 from keep import *
 import config
 import errors
 
-LOCATOR = 0
-BLOCKSIZE = 1
-OFFSET = 2
-SEGMENTSIZE = 3
-
-def first_block(data_locators, range_start, range_size, debug=False):
-    block_start = 0L
-
-    # range_start/block_start is the inclusive lower bound
-    # range_end/block_end is the exclusive upper bound
-
-    hi = len(data_locators)
-    lo = 0
-    i = int((hi + lo) / 2)
-    block_size = data_locators[i][BLOCKSIZE]
-    block_start = data_locators[i][OFFSET]
-    block_end = block_start + block_size
-    if debug: print '---'
-
-    # perform a binary search for the first block
-    # assumes that all of the blocks are contigious, so range_start is guaranteed
-    # to either fall into the range of a block or be outside the block range entirely
-    while not (range_start >= block_start and range_start < block_end):
-        if lo == i:
-            # must be out of range, fail
-            return None
-        if range_start > block_start:
-            lo = i
-        else:
-            hi = i
-        i = int((hi + lo) / 2)
-        if debug: print lo, i, hi
-        block_size = data_locators[i][BLOCKSIZE]
-        block_start = data_locators[i][OFFSET]
-        block_end = block_start + block_size
-
-    return i
-
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
-    '''
-    Get blocks that are covered by the range
-    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+def normalize_stream(s, stream):
     '''
-    if range_size == 0:
-        return []
-    resp = []
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
-
-    i = first_block(data_locators, range_start, range_size, debug)
-    if i is None:
-        return []
-
-    while i < len(data_locators):
-        locator, block_size, block_start = data_locators[i]
-        block_end = block_start + block_size
-        if debug:
-            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
-        if range_end <= block_start:
-            # range ends before this block starts, so don't look at any more locators
-            break
-
-        #if range_start >= block_end:
-            # range starts after this block ends, so go to next block
-            # we should always start at the first block due to the binary above, so this test is redundant
-            #next
-
-        if range_start >= block_start and range_end <= block_end:
-            # range starts and ends in this block
-            resp.append([locator, block_size, range_start - block_start, range_size])
-        elif range_start >= block_start and range_end > block_end:
-            # range starts in this block
-            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
-        elif range_start < block_start and range_end > block_end:
-            # range starts in a previous block and extends to further blocks
-            resp.append([locator, block_size, 0L, block_size])
-        elif range_start < block_start and range_end <= block_end:
-            # range starts in a previous block and ends in this block
-            resp.append([locator, block_size, 0L, range_end - block_start])
-        block_start = block_end
-        i += 1
-    return resp
-
-def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    s is the stream name
+    stream is a StreamReader object
     '''
-    Replace a range with a new block.
-    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    new_locator: locator for new block to be inserted
-    !!! data_locators will be updated in place !!!
-    '''
-    if range_size == 0:
-        return
-
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
-
-    last = data_locators[-1]
-    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
-        # append new block
-        data_locators.append([new_locator, range_size, range_start])
-        return
-
-    i = first_block(data_locators, range_start, range_size, debug)
-    if i is None:
-        return
-
-    while i < len(data_locators):
-        locator, block_size, block_start = data_locators[i]
-        block_end = block_start + block_size
-        if debug:
-            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
-        if range_end <= block_start:
-            # range ends before this block starts, so don't look at any more locators
-            break
-
-        #if range_start >= block_end:
-            # range starts after this block ends, so go to next block
-            # we should always start at the first block due to the binary above, so this test is redundant
-            #next
-
-        if range_start >= block_start and range_end <= block_end:
-            # range starts and ends in this block
-            # split block into 3 pieces
-            #resp.append([locator, block_size, range_start - block_start, range_size])
-            pass
-        elif range_start >= block_start and range_end > block_end:
-            # range starts in this block
-            # split block into 2 pieces
-            #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
-            pass
-        elif range_start < block_start and range_end > block_end:
-            # range starts in a previous block and extends to further blocks
-            # zero out this block
-            #resp.append([locator, block_size, 0L, block_size])
-            pass
-        elif range_start < block_start and range_end <= block_end:
-            # range starts in a previous block and ends in this block
-            # split into 2 pieces
-            #resp.append([locator, block_size, 0L, range_end - block_start])
-            pass
-        block_start = block_end
-        i += 1
-
-
-def split(path):
-    """split(path) -> streamname, filename
-
-    Separate the stream name and file name in a /-separated stream path.
-    If no stream name is available, assume '.'.
-    """
-    try:
-        stream_name, file_name = path.rsplit('/', 1)
-    except ValueError:  # No / in string
-        stream_name, file_name = '.', path
-    return stream_name, file_name
-
-class StreamFileReader(ArvadosFileBase):
-    class _NameAttribute(str):
-        # The Python file API provides a plain .name attribute.
-        # Older SDK provided a name() method.
-        # This class provides both, for maximum compatibility.
-        def __call__(self):
-            return self
-
-
-    def __init__(self, stream, segments, name):
-        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
-        self._stream = stream
-        self.segments = segments
-        self._filepos = 0L
-        self.num_retries = stream.num_retries
-        self._readline_cache = (None, None)
-
-    def __iter__(self):
-        while True:
-            data = self.readline()
-            if not data:
-                break
-            yield data
-
-    def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
-
-    def stream_name(self):
-        return self._stream.name()
-
-    @ArvadosFileBase._before_close
-    def seek(self, pos, whence=os.SEEK_CUR):
-        if whence == os.SEEK_CUR:
-            pos += self._filepos
-        elif whence == os.SEEK_END:
-            pos += self.size()
-        self._filepos = min(max(pos, 0L), self._size())
-
-    def tell(self):
-        return self._filepos
-
-    def _size(self):
-        n = self.segments[-1]
-        return n[OFFSET] + n[BLOCKSIZE]
-
-    def size(self):
-        return self._size()
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def read(self, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
-        if size == 0:
-            return ''
-
-        data = ''
-        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
-        if available_chunks:
-            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
-            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
-                                         num_retries=num_retries)
-
-        self._filepos += len(data)
-        return data
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readfrom(self, start, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at 'start'"""
-        if size == 0:
-            return ''
-
-        data = []
-        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
-            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
-                                              num_retries=num_retries))
-        return ''.join(data)
+    stream_tokens = [s]
+    sortedfiles = list(stream.keys())
+    sortedfiles.sort()
+
+    blocks = {}
+    streamoffset = 0L
+    for f in sortedfiles:
+        for b in stream[f]:
+            if b[arvados.LOCATOR] not in blocks:
+                stream_tokens.append(b[arvados.LOCATOR])
+                blocks[b[arvados.LOCATOR]] = streamoffset
+                streamoffset += b[arvados.BLOCKSIZE]
+
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
+    for f in sortedfiles:
+        current_span = None
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+            if current_span is None:
+                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+            else:
+                if segmentoffset == current_span[1]:
+                    current_span[1] += segment[arvados.SEGMENTSIZE]
+                else:
+                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readall(self, size=2**20, num_retries=None):
-        while True:
-            data = self.read(size, num_retries=num_retries)
-            if data == '':
-                break
-            yield data
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readline(self, size=float('inf'), num_retries=None):
-        cache_pos, cache_data = self._readline_cache
-        if self.tell() == cache_pos:
-            data = [cache_data]
-        else:
-            data = ['']
-        data_size = len(data[-1])
-        while (data_size < size) and ('\n' not in data[-1]):
-            next_read = self.read(2 ** 20, num_retries=num_retries)
-            if not next_read:
-                break
-            data.append(next_read)
-            data_size += len(next_read)
-        data = ''.join(data)
-        try:
-            nextline_index = data.index('\n') + 1
-        except ValueError:
-            nextline_index = len(data)
-        nextline_index = min(nextline_index, size)
-        self._readline_cache = (self.tell(), data[nextline_index:])
-        return data[:nextline_index]
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def decompress(self, decompress, size, num_retries=None):
-        for segment in self.readall(size, num_retries):
-            data = decompress(segment)
-            if data:
-                yield data
+        if current_span is not None:
+            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readall_decompressed(self, size=2**20, num_retries=None):
-        self.seek(0)
-        if self.name.endswith('.bz2'):
-            dc = bz2.BZ2Decompressor()
-            return self.decompress(dc.decompress, size,
-                                   num_retries=num_retries)
-        elif self.name.endswith('.gz'):
-            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
-            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
-                                   size, num_retries=num_retries)
-        else:
-            return self.readall(size, num_retries=num_retries)
+        if not stream[f]:
+            stream_tokens.append("0:0:{0}".format(fout))
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readlines(self, sizehint=float('inf'), num_retries=None):
-        data = []
-        data_size = 0
-        for s in self.readall(num_retries=num_retries):
-            data.append(s)
-            data_size += len(s)
-            if data_size >= sizehint:
-                break
-        return ''.join(data).splitlines(True)
-
-    def as_manifest(self):
-        manifest_text = ['.']
-        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
-        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
-        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+    return stream_tokens
 
 
 class StreamReader(object):
@@ -387,13 +114,15 @@ class StreamReader(object):
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
+    @retry_method
     def _keepget(self, locator, num_retries=None):
         return self._keep.get(locator, num_retries=num_retries)
 
     @retry_method
     def readfrom(self, start, size, num_retries=None):
-        self._readfrom(start, size, num_retries=num_retries)
+        return self._readfrom(start, size, num_retries=num_retries)
 
+    @retry_method
     def _readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
@@ -413,11 +142,12 @@ class StreamReader(object):
                 manifest_text.append(m.group(0))
         else:
             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
-        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name.replace(' ', '\\040'))
                                         for seg in f.segments])
                               for f in self._files.values()])
         return ' '.join(manifest_text) + '\n'
 
+
 class BufferBlock(object):
     def __init__(self, locator, streamoffset):
         self.locator = locator
@@ -431,6 +161,7 @@ class BufferBlock(object):
         self.write_pointer += len(data)
         self.locator_list_entry[1] = self.write_pointer
 
+
 class StreamWriter(StreamReader):
     def __init__(self, tokens, keep=None, debug=False, _empty=False,
                  num_retries=0):
@@ -454,9 +185,10 @@ class StreamWriter(StreamReader):
                 return method(self, *args, **kwargs)
         return wrapper
 
-    for _method_name in ['name', 'files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
+    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
         locals()[_method_name] = _proxy_method(_method_name)
 
+    @retry_method
     def _keepget(self, locator, num_retries=None):
         if locator in self.bufferblocks:
             bb = self.bufferblocks[locator]
@@ -476,68 +208,3 @@ class StreamWriter(StreamReader):
     def append(self, data):
         with self.mutex:
             self._append(data)
-
-class StreamFileWriter(StreamFileReader):
-    def __init__(self, stream, segments, name):
-        super(StreamFileWriter, self).__init__(stream, segments, name)
-        self.mode = 'wb'
-
-    # wrap superclass methods in mutex
-    def _proxy_method(name):
-        method = getattr(StreamFileReader, name)
-        @functools.wraps(method, ('__name__', '__doc__'))
-        def wrapper(self, *args, **kwargs):
-            with self._stream.mutex:
-                return method(self, *args, **kwargs)
-        return wrapper
-
-    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
-        locals()[_method_name] = _proxy_method(_method_name)
-
-    def truncate(self, size=None):
-        with self._stream.mutex:
-            if size is None:
-                size = self._filepos
-
-            segs = locators_and_ranges(self.segments, 0, size)
-
-            newstream = []
-            self.segments = []
-            streamoffset = 0L
-            fileoffset = 0L
-
-            for seg in segs:
-                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
-                    newstream.append([locator, blocksize, streamoffset])
-                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
-                    streamoffset += blocksize
-                    fileoffset += segmentsize
-            if len(newstream) == 0:
-                newstream.append(config.EMPTY_BLOCK_LOCATOR)
-                self.segments.append([0, 0, 0])
-            self._stream._data_locators = newstream
-            if self._filepos > fileoffset:
-                self._filepos = fileoffset
-
-    def _writeto(self, offset, data):
-        self._stream._append(data)
-        replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
-        self._filepos += len(data)
-
-    def writeto(self, offset, data):
-        with self._stream.mutex:
-            self._writeto(offset, data)
-
-    def write(self, data):
-        with self._stream.mutex:
-            self._writeto(self._filepos, data)
-            self._filepos += len(data)
-
-    def writelines(self, seq):
-        with self._stream.mutex:
-            for s in seq:
-                self._writeto(self._filepos, s)
-                self._filepos += len(s)
-
-    def flush(self):
-        pass
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 04ca6b5..aa7e632 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -63,8 +63,10 @@ class MockStreamReader(object):
         return self._name
 
     def readfrom(self, start, size, num_retries=None):
-        return self._data[start:start + size]
+        self._readfrom(start, size, num_retries=num_retries)
 
+    def _readfrom(self, start, size, num_retries=None):
+        return self._data[start:start + size]
 
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index f07ca6c..baafc32 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -315,7 +315,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         writer.seek(10)
         writer.write("foo")
         self.assertEqual("56789foo", writer.readfrom(5, 8))
-        #print stream.manifest_text()
+        #print arvados.normalize_stream(".", {"count.txt": stream.locators_and_ranges(0, stream.size())})
 
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list