[ARVADOS] updated: b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c
git at public.curoverse.com
Wed Dec 17 10:03:34 EST 2014
Summary of changes:
sdk/python/arvados/__init__.py | 3 +-
sdk/python/arvados/arvfile.py | 236 +++++++++++++++++++
sdk/python/arvados/collection.py | 44 +---
sdk/python/arvados/ranges.py | 149 ++++++++++++
sdk/python/arvados/stream.py | 427 ++++-------------------------------
sdk/python/tests/arvados_testutil.py | 4 +-
sdk/python/tests/test_stream.py | 2 +-
7 files changed, 439 insertions(+), 426 deletions(-)
create mode 100644 sdk/python/arvados/ranges.py
via b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c (commit)
from 71959d0d8434b7af5de9a5b8260f0ebe11ec7238 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Dec 17 10:04:08 2014 -0500
3198: Refactor stream and file classes and functions a little bit for readability.
diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 4cae20d..19a7ad8 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -22,6 +22,7 @@ from api import *
from collection import *
from keep import *
from stream import *
+from arvfile import *
import errors
import util
@@ -131,5 +132,3 @@ class job_setup:
                                   body={'success':True}
                                   ).execute()
            exit(0)
-
-
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e8dac46..ef7a6c8 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,4 +1,21 @@
import functools
+import os
+import re
+import zlib
+import bz2
+from .ranges import *
+from arvados.retry import retry_method
+import arvados   # needed at call time by as_manifest()
+import config    # for config.EMPTY_BLOCK_LOCATOR in StreamFileWriter.truncate()
+
+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
class ArvadosFileBase(object):
    def __init__(self, name, mode):
@@ -27,3 +44,222 @@ class ArvadosFileBase(object):
    def close(self):
        self.closed = True
+
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # The older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
+
+    def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
+        self._stream = stream
+        self.segments = segments
+        self._filepos = 0L
+        self.num_retries = stream.num_retries
+        self._readline_cache = (None, None)
+
+    def __iter__(self):
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data
+
+    def decompressed_name(self):
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
+
+    def stream_name(self):
+        return self._stream.name()
+
+    @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
+            pos += self._filepos
+        elif whence == os.SEEK_END:
+            pos += self.size()
+        self._filepos = min(max(pos, 0L), self._size())
+
+    def tell(self):
+        return self._filepos
+
+    def _size(self):
+        n = self.segments[-1]
+        return n[OFFSET] + n[BLOCKSIZE]
+
+    def size(self):
+        return self._size()
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def read(self, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        if size == 0:
+            return ''
+
+        data = ''
+        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
+        if available_chunks:
+            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
+            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                          num_retries=num_retries)
+
+        self._filepos += len(data)
+        return data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at 'start'"""
+        if size == 0:
+            return ''
+
+        data = []
+        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
+            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                               num_retries=num_retries))
+        return ''.join(data)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall(self, size=2**20, num_retries=None):
+        while True:
+            data = self.read(size, num_retries=num_retries)
+            if data == '':
+                break
+            yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
+            data = [cache_data]
+        else:
+            data = ['']
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
+                break
+            data.append(next_read)
+            data_size += len(next_read)
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries):
+            data = decompress(segment)
+            if data:
+                yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall_decompressed(self, size=2**20, num_retries=None):
+        self.seek(0)
+        if self.name.endswith('.bz2'):
+            dc = bz2.BZ2Decompressor()
+            return self.decompress(dc.decompress, size,
+                                   num_retries=num_retries)
+        elif self.name.endswith('.gz'):
+            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+                                   size, num_retries=num_retries)
+        else:
+            return self.readall(size, num_retries=num_retries)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
+                break
+        return ''.join(data).splitlines(True)
+
+    def as_manifest(self):
+        manifest_text = ['.']
+        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+
+
+class StreamFileWriter(StreamFileReader):
+    def __init__(self, stream, segments, name):
+        super(StreamFileWriter, self).__init__(stream, segments, name)
+        self.mode = 'wb'
+
+    # wrap superclass methods in mutex
+    def _proxy_method(name):
+        method = getattr(StreamFileReader, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            with self._stream.mutex:
+                return method(self, *args, **kwargs)
+        return wrapper
+
+    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
+        locals()[_method_name] = _proxy_method(_method_name)
+
+    def truncate(self, size=None):
+        with self._stream.mutex:
+            if size is None:
+                size = self._filepos
+
+            segs = locators_and_ranges(self.segments, 0, size)
+
+            newstream = []
+            self.segments = []
+            streamoffset = 0L
+            fileoffset = 0L
+
+            for seg in segs:
+                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
+                    newstream.append([locator, blocksize, streamoffset])
+                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+                    streamoffset += blocksize
+                    fileoffset += segmentsize
+            if len(newstream) == 0:
+                newstream.append(config.EMPTY_BLOCK_LOCATOR)
+                self.segments.append([0, 0, 0])
+            self._stream._data_locators = newstream
+            if self._filepos > fileoffset:
+                self._filepos = fileoffset
+
+    def _writeto(self, offset, data):
+        # Append the data to the underlying stream, then map it into this
+        # file's segment table at 'offset'.  Does not move the file pointer;
+        # write() and writelines() advance self._filepos themselves.
+        self._stream._append(data)
+        replace_range(self.segments, offset, len(data), self._stream._size()-len(data))
+
+    def writeto(self, offset, data):
+        with self._stream.mutex:
+            self._writeto(offset, data)
+
+    def write(self, data):
+        with self._stream.mutex:
+            self._writeto(self._filepos, data)
+            self._filepos += len(data)
+
+    def writelines(self, seq):
+        with self._stream.mutex:
+            for s in seq:
+                self._writeto(self._filepos, s)
+                self._filepos += len(s)
+
+    def flush(self):
+        pass
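
For illustration (not part of this commit), a standalone sketch of the
_NameAttribute trick above: a str subclass that is also callable, so both
the modern .name attribute style and the legacy .name() method style work.

    class _NameAttribute(str):
        def __call__(self):
            return self

    name = _NameAttribute('data.txt')
    print(name)    # data.txt -- plain .name attribute access
    print(name())  # data.txt -- legacy .name() method call
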
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 5ab7e77..24572ed 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,55 +6,15 @@ import re
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase
+from .arvfile import ArvadosFileBase, split
from keep import *
-from .stream import StreamReader, split
+from .stream import StreamReader, normalize_stream
import config
import errors
import util
_logger = logging.getLogger('arvados.collection')
-def normalize_stream(s, stream):
- stream_tokens = [s]
- sortedfiles = list(stream.keys())
- sortedfiles.sort()
-
- blocks = {}
- streamoffset = 0L
- for f in sortedfiles:
- for b in stream[f]:
- if b[arvados.LOCATOR] not in blocks:
- stream_tokens.append(b[arvados.LOCATOR])
- blocks[b[arvados.LOCATOR]] = streamoffset
- streamoffset += b[arvados.BLOCKSIZE]
-
- if len(stream_tokens) == 1:
- stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
- for f in sortedfiles:
- current_span = None
- fout = f.replace(' ', '\\040')
- for segment in stream[f]:
- segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
- if current_span is None:
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
- else:
- if segmentoffset == current_span[1]:
- current_span[1] += segment[arvados.SEGMENTSIZE]
- else:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
- if current_span is not None:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
- if not stream[f]:
- stream_tokens.append("0:0:{0}".format(fout))
-
- return stream_tokens
-
-
class CollectionBase(object):
    def __enter__(self):
        return self
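
For illustration (not part of this commit), the split() helper that
collection.py now imports from arvfile behaves like this:

    from arvados.arvfile import split

    print(split('foo/bar/baz.txt'))  # ('foo/bar', 'baz.txt')
    print(split('baz.txt'))          # ('.', 'baz.txt')
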
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
new file mode 100644
index 0000000..fe9c04b
--- /dev/null
+++ b/sdk/python/arvados/ranges.py
@@ -0,0 +1,149 @@
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def first_block(data_locators, range_start, range_size, debug=False):
+    block_start = 0L
+
+    # range_start/block_start is the inclusive lower bound
+    # range_end/block_end is the exclusive upper bound
+
+    hi = len(data_locators)
+    lo = 0
+    i = int((hi + lo) / 2)
+    block_size = data_locators[i][BLOCKSIZE]
+    block_start = data_locators[i][OFFSET]
+    block_end = block_start + block_size
+    if debug: print '---'
+
+    # perform a binary search for the first block
+    # assumes that all of the blocks are contiguous, so range_start is guaranteed
+    # to either fall into the range of a block or be outside the block range entirely
+    while not (range_start >= block_start and range_start < block_end):
+        if lo == i:
+            # must be out of range, fail
+            return None
+        if range_start > block_start:
+            lo = i
+        else:
+            hi = i
+        i = int((hi + lo) / 2)
+        if debug: print lo, i, hi
+        block_size = data_locators[i][BLOCKSIZE]
+        block_start = data_locators[i][OFFSET]
+        block_end = block_start + block_size
+
+    return i
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+    '''
+    Get blocks that are covered by the range
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+    range_start: start of range
+    range_size: size of range
+    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+    '''
+    if range_size == 0:
+        return []
+    resp = []
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return []
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        # Note: no (range_start >= block_end) case is needed here; the binary
+        # search above guarantees that we start at the first overlapping block.
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            resp.append([locator, block_size, range_start - block_start, range_size])
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            resp.append([locator, block_size, 0L, block_size])
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            resp.append([locator, block_size, 0L, range_end - block_start])
+        block_start = block_end
+        i += 1
+    return resp
+
+def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    '''
+    Replace a range with a new block.
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+    range_start: start of range
+    range_size: size of range
+    new_locator: locator for new block to be inserted
+    !!! data_locators will be updated in place !!!
+    Note: only appending at the end of the existing blocks is implemented so
+    far; the overwrite/split cases below are placeholders (see comments).
+    '''
+    if range_size == 0:
+        return
+
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    last = data_locators[-1]
+    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
+        # append new block
+        data_locators.append([new_locator, range_size, range_start])
+        return
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        # Note: no (range_start >= block_end) case is needed here; the binary
+        # search above guarantees that we start at the first overlapping block.
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            # TODO: split the block into 3 pieces
+            pass
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            # TODO: split the block into 2 pieces
+            pass
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            # TODO: zero out this block
+            pass
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            # TODO: split into 2 pieces
+            pass
+        block_start = block_end
+        i += 1
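
For illustration (not part of this commit), a hypothetical call showing
what the new locators_and_ranges() returns; the locator strings and sizes
are invented:

    from arvados.ranges import locators_and_ranges

    # Three contiguous 10-byte blocks, as [locator, block_size, block_start].
    data_locators = [['locA', 10, 0], ['locB', 10, 10], ['locC', 10, 20]]

    # A 12-byte range starting at offset 5 covers the tail of the first
    # block and the head of the second:
    print(locators_and_ranges(data_locators, 5, 12))
    # [['locA', 10, 5, 5], ['locB', 10, 0, 7]]
    # (offsets print as longs, e.g. 5L, under Python 2)
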
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3f64615..8623ab9 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,332 +1,59 @@
-import bz2
import collections
import hashlib
import os
import re
-import zlib
import threading
import functools
-from .arvfile import ArvadosFileBase
+from .ranges import *
+from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
from arvados.retry import retry_method
from keep import *
import config
import errors
-LOCATOR = 0
-BLOCKSIZE = 1
-OFFSET = 2
-SEGMENTSIZE = 3
-
-def first_block(data_locators, range_start, range_size, debug=False):
- block_start = 0L
-
- # range_start/block_start is the inclusive lower bound
- # range_end/block_end is the exclusive upper bound
-
- hi = len(data_locators)
- lo = 0
- i = int((hi + lo) / 2)
- block_size = data_locators[i][BLOCKSIZE]
- block_start = data_locators[i][OFFSET]
- block_end = block_start + block_size
- if debug: print '---'
-
- # perform a binary search for the first block
- # assumes that all of the blocks are contigious, so range_start is guaranteed
- # to either fall into the range of a block or be outside the block range entirely
- while not (range_start >= block_start and range_start < block_end):
- if lo == i:
- # must be out of range, fail
- return None
- if range_start > block_start:
- lo = i
- else:
- hi = i
- i = int((hi + lo) / 2)
- if debug: print lo, i, hi
- block_size = data_locators[i][BLOCKSIZE]
- block_start = data_locators[i][OFFSET]
- block_end = block_start + block_size
-
- return i
-
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
- '''
- Get blocks that are covered by the range
- data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
- range_start: start of range
- range_size: size of range
- returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+def normalize_stream(s, stream):
    '''
- if range_size == 0:
- return []
- resp = []
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
-
- i = first_block(data_locators, range_start, range_size, debug)
- if i is None:
- return []
-
- while i < len(data_locators):
- locator, block_size, block_start = data_locators[i]
- block_end = block_start + block_size
- if debug:
- print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
- if range_end <= block_start:
- # range ends before this block starts, so don't look at any more locators
- break
-
- #if range_start >= block_end:
- # range starts after this block ends, so go to next block
- # we should always start at the first block due to the binary above, so this test is redundant
- #next
-
- if range_start >= block_start and range_end <= block_end:
- # range starts and ends in this block
- resp.append([locator, block_size, range_start - block_start, range_size])
- elif range_start >= block_start and range_end > block_end:
- # range starts in this block
- resp.append([locator, block_size, range_start - block_start, block_end - range_start])
- elif range_start < block_start and range_end > block_end:
- # range starts in a previous block and extends to further blocks
- resp.append([locator, block_size, 0L, block_size])
- elif range_start < block_start and range_end <= block_end:
- # range starts in a previous block and ends in this block
- resp.append([locator, block_size, 0L, range_end - block_start])
- block_start = block_end
- i += 1
- return resp
-
-def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    s is the stream name
+    stream is a dict mapping file names to lists of segments, where each
+    segment is [locator, block size, segment offset, segment size]
    '''
- Replace a range with a new block.
- data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
- range_start: start of range
- range_size: size of range
- new_locator: locator for new block to be inserted
- !!! data_locators will be updated in place !!!
- '''
- if range_size == 0:
- return
-
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
-
- last = data_locators[-1]
- if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
- # append new block
- data_locators.append([new_locator, range_size, range_start])
- return
-
- i = first_block(data_locators, range_start, range_size, debug)
- if i is None:
- return
-
- while i < len(data_locators):
- locator, block_size, block_start = data_locators[i]
- block_end = block_start + block_size
- if debug:
- print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
- if range_end <= block_start:
- # range ends before this block starts, so don't look at any more locators
- break
-
- #if range_start >= block_end:
- # range starts after this block ends, so go to next block
- # we should always start at the first block due to the binary above, so this test is redundant
- #next
-
- if range_start >= block_start and range_end <= block_end:
- # range starts and ends in this block
- # split block into 3 pieces
- #resp.append([locator, block_size, range_start - block_start, range_size])
- pass
- elif range_start >= block_start and range_end > block_end:
- # range starts in this block
- # split block into 2 pieces
- #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
- pass
- elif range_start < block_start and range_end > block_end:
- # range starts in a previous block and extends to further blocks
- # zero out this block
- #resp.append([locator, block_size, 0L, block_size])
- pass
- elif range_start < block_start and range_end <= block_end:
- # range starts in a previous block and ends in this block
- # split into 2 pieces
- #resp.append([locator, block_size, 0L, range_end - block_start])
- pass
- block_start = block_end
- i += 1
-
-
-def split(path):
- """split(path) -> streamname, filename
-
- Separate the stream name and file name in a /-separated stream path.
- If no stream name is available, assume '.'.
- """
- try:
- stream_name, file_name = path.rsplit('/', 1)
- except ValueError: # No / in string
- stream_name, file_name = '.', path
- return stream_name, file_name
-
-class StreamFileReader(ArvadosFileBase):
- class _NameAttribute(str):
- # The Python file API provides a plain .name attribute.
- # Older SDK provided a name() method.
- # This class provides both, for maximum compatibility.
- def __call__(self):
- return self
-
-
- def __init__(self, stream, segments, name):
- super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
- self._stream = stream
- self.segments = segments
- self._filepos = 0L
- self.num_retries = stream.num_retries
- self._readline_cache = (None, None)
-
- def __iter__(self):
- while True:
- data = self.readline()
- if not data:
- break
- yield data
-
- def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self.name)
-
- def stream_name(self):
- return self._stream.name()
-
- @ArvadosFileBase._before_close
- def seek(self, pos, whence=os.SEEK_CUR):
- if whence == os.SEEK_CUR:
- pos += self._filepos
- elif whence == os.SEEK_END:
- pos += self.size()
- self._filepos = min(max(pos, 0L), self._size())
-
- def tell(self):
- return self._filepos
-
- def _size(self):
- n = self.segments[-1]
- return n[OFFSET] + n[BLOCKSIZE]
-
- def size(self):
- return self._size()
-
- @ArvadosFileBase._before_close
- @retry_method
- def read(self, size, num_retries=None):
- """Read up to 'size' bytes from the stream, starting at the current file position"""
- if size == 0:
- return ''
-
- data = ''
- available_chunks = locators_and_ranges(self.segments, self._filepos, size)
- if available_chunks:
- locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
- data = self._stream._readfrom(locator+segmentoffset, segmentsize,
- num_retries=num_retries)
-
- self._filepos += len(data)
- return data
-
- @ArvadosFileBase._before_close
- @retry_method
- def readfrom(self, start, size, num_retries=None):
- """Read up to 'size' bytes from the stream, starting at 'start'"""
- if size == 0:
- return ''
-
- data = []
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
- num_retries=num_retries))
- return ''.join(data)
+    stream_tokens = [s]
+    sortedfiles = list(stream.keys())
+    sortedfiles.sort()
+
+    blocks = {}
+    streamoffset = 0L
+    for f in sortedfiles:
+        for b in stream[f]:
+            if b[LOCATOR] not in blocks:
+                stream_tokens.append(b[LOCATOR])
+                blocks[b[LOCATOR]] = streamoffset
+                streamoffset += b[BLOCKSIZE]
+
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
+    for f in sortedfiles:
+        current_span = None
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[LOCATOR]] + segment[OFFSET]
+            if current_span is None:
+                current_span = [segmentoffset, segmentoffset + segment[SEGMENTSIZE]]
+            else:
+                if segmentoffset == current_span[1]:
+                    current_span[1] += segment[SEGMENTSIZE]
+                else:
+                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    current_span = [segmentoffset, segmentoffset + segment[SEGMENTSIZE]]
- @ArvadosFileBase._before_close
- @retry_method
- def readall(self, size=2**20, num_retries=None):
- while True:
- data = self.read(size, num_retries=num_retries)
- if data == '':
- break
- yield data
-
- @ArvadosFileBase._before_close
- @retry_method
- def readline(self, size=float('inf'), num_retries=None):
- cache_pos, cache_data = self._readline_cache
- if self.tell() == cache_pos:
- data = [cache_data]
- else:
- data = ['']
- data_size = len(data[-1])
- while (data_size < size) and ('\n' not in data[-1]):
- next_read = self.read(2 ** 20, num_retries=num_retries)
- if not next_read:
- break
- data.append(next_read)
- data_size += len(next_read)
- data = ''.join(data)
- try:
- nextline_index = data.index('\n') + 1
- except ValueError:
- nextline_index = len(data)
- nextline_index = min(nextline_index, size)
- self._readline_cache = (self.tell(), data[nextline_index:])
- return data[:nextline_index]
-
- @ArvadosFileBase._before_close
- @retry_method
- def decompress(self, decompress, size, num_retries=None):
- for segment in self.readall(size, num_retries):
- data = decompress(segment)
- if data:
- yield data
+        if current_span is not None:
+            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- @ArvadosFileBase._before_close
- @retry_method
- def readall_decompressed(self, size=2**20, num_retries=None):
- self.seek(0)
- if self.name.endswith('.bz2'):
- dc = bz2.BZ2Decompressor()
- return self.decompress(dc.decompress, size,
- num_retries=num_retries)
- elif self.name.endswith('.gz'):
- dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
- size, num_retries=num_retries)
- else:
- return self.readall(size, num_retries=num_retries)
+        if not stream[f]:
+            stream_tokens.append("0:0:{0}".format(fout))
- @ArvadosFileBase._before_close
- @retry_method
- def readlines(self, sizehint=float('inf'), num_retries=None):
- data = []
- data_size = 0
- for s in self.readall(num_retries=num_retries):
- data.append(s)
- data_size += len(s)
- if data_size >= sizehint:
- break
- return ''.join(data).splitlines(True)
-
- def as_manifest(self):
- manifest_text = ['.']
- manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
- manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
- return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+    return stream_tokens
class StreamReader(object):
@@ -387,13 +114,15 @@ class StreamReader(object):
    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)
+    @retry_method
    def _keepget(self, locator, num_retries=None):
        return self._keep.get(locator, num_retries=num_retries)
    @retry_method
    def readfrom(self, start, size, num_retries=None):
-        self._readfrom(start, size, num_retries=num_retries)
+        return self._readfrom(start, size, num_retries=num_retries)
+    @retry_method
    def _readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
@@ -413,11 +142,12 @@ class StreamReader(object):
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
-        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name.replace(' ', '\\040'))
                                         for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
+
class BufferBlock(object):
    def __init__(self, locator, streamoffset):
        self.locator = locator
@@ -431,6 +161,7 @@ class BufferBlock(object):
        self.write_pointer += len(data)
        self.locator_list_entry[1] = self.write_pointer
+
class StreamWriter(StreamReader):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
@@ -454,9 +185,10 @@ class StreamWriter(StreamReader):
                return method(self, *args, **kwargs)
        return wrapper
-    for _method_name in ['name', 'files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
+    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
        locals()[_method_name] = _proxy_method(_method_name)
+    @retry_method
    def _keepget(self, locator, num_retries=None):
        if locator in self.bufferblocks:
            bb = self.bufferblocks[locator]
@@ -476,68 +208,3 @@ class StreamWriter(StreamReader):
    def append(self, data):
        with self.mutex:
            self._append(data)
-
-class StreamFileWriter(StreamFileReader):
- def __init__(self, stream, segments, name):
- super(StreamFileWriter, self).__init__(stream, segments, name)
- self.mode = 'wb'
-
- # wrap superclass methods in mutex
- def _proxy_method(name):
- method = getattr(StreamFileReader, name)
- @functools.wraps(method, ('__name__', '__doc__'))
- def wrapper(self, *args, **kwargs):
- with self._stream.mutex:
- return method(self, *args, **kwargs)
- return wrapper
-
- for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
- locals()[_method_name] = _proxy_method(_method_name)
-
- def truncate(self, size=None):
- with self._stream.mutex:
- if size is None:
- size = self._filepos
-
- segs = locators_and_ranges(self.segments, 0, size)
-
- newstream = []
- self.segments = []
- streamoffset = 0L
- fileoffset = 0L
-
- for seg in segs:
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
- newstream.append([locator, blocksize, streamoffset])
- self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
- streamoffset += blocksize
- fileoffset += segmentsize
- if len(newstream) == 0:
- newstream.append(config.EMPTY_BLOCK_LOCATOR)
- self.segments.append([0, 0, 0])
- self._stream._data_locators = newstream
- if self._filepos > fileoffset:
- self._filepos = fileoffset
-
- def _writeto(self, offset, data):
- self._stream._append(data)
- replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
- self._filepos += len(data)
-
- def writeto(self, offset, data):
- with self._stream.mutex:
- self._writeto(offset, data)
-
- def write(self, data):
- with self._stream.mutex:
- self._writeto(self._filepos, data)
- self._filepos += len(data)
-
- def writelines(self, seq):
- with self._stream.mutex:
- for s in seq:
- self._writeto(self._filepos, s)
- self._filepos += len(s)
-
- def flush(self):
- pass
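
For illustration (not part of this commit), a minimal standalone sketch of
the _proxy_method pattern that StreamWriter and StreamFileWriter use to
wrap inherited methods in a mutex; the class and method names here are
invented:

    import functools
    import threading

    class Reader(object):
        def read(self):
            """Return the payload."""
            return 'data'

    class LockedReader(Reader):
        def __init__(self):
            self.mutex = threading.Lock()

        # Build a wrapper that holds self.mutex around the superclass method.
        def _proxy_method(name):
            method = getattr(Reader, name)
            @functools.wraps(method, ('__name__', '__doc__'))
            def wrapper(self, *args, **kwargs):
                with self.mutex:
                    return method(self, *args, **kwargs)
            return wrapper

        for _method_name in ['read']:
            locals()[_method_name] = _proxy_method(_method_name)

    print(LockedReader().read())  # 'data', fetched while holding the lock
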
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 04ca6b5..aa7e632 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -63,8 +63,10 @@ class MockStreamReader(object):
        return self._name
    def readfrom(self, start, size, num_retries=None):
-        return self._data[start:start + size]
+        return self._readfrom(start, size, num_retries=num_retries)
+    def _readfrom(self, start, size, num_retries=None):
+        return self._data[start:start + size]
class ArvadosBaseTestCase(unittest.TestCase):
    # This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index f07ca6c..baafc32 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -315,7 +315,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
        writer.seek(10)
        writer.write("foo")
        self.assertEqual("56789foo", writer.readfrom(5, 8))
-        #print stream.manifest_text()
+        #print arvados.normalize_stream(".", {"count.txt": stream.locators_and_ranges(0, stream.size())})
if __name__ == '__main__':
    unittest.main()
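
For illustration (not part of this commit), a hypothetical call like the
one in the updated comment above; the locator and sizes are invented, and
each segment is [locator, blocksize, segment offset, segment size]:

    from arvados.stream import normalize_stream

    stream = {'count.txt': [['locA', 10, 0, 10]]}
    print(' '.join(normalize_stream('.', stream)))
    # . locA 0:10:count.txt
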
-----------------------------------------------------------------------
hooks/post-receive