[ARVADOS] updated: 600378f933f028ba497cac86a978fa71401d209b
git at public.curoverse.com
git at public.curoverse.com
Mon Feb 17 11:51:15 EST 2014
Summary of changes:
sdk/python/arvados/collection.py | 81 ++++++++++---------
sdk/python/arvados/stream.py | 112 ++++++++++++--------------
sdk/python/test_collections.py | 165 ++++++++++++++++++++++++++++++++-----
3 files changed, 236 insertions(+), 122 deletions(-)
via 600378f933f028ba497cac86a978fa71401d209b (commit)
from 43e09c03c00f14d923f7335aa3d632d7e4785fbb (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 600378f933f028ba497cac86a978fa71401d209b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 17 11:52:21 2014 -0500
Tests pass
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 6727dce..d790580 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -23,6 +23,43 @@ from stream import *
import config
import errors
+def normalize_stream(s, stream):
+ stream_tokens = [s]
+ sortedfiles = list(stream.keys())
+ sortedfiles.sort()
+
+ blocks = {}
+ streamoffset = 0L
+ for f in sortedfiles:
+ for b in stream[f]:
+ if b[arvados.LOCATOR] not in blocks:
+ stream_tokens.append(b[arvados.LOCATOR])
+ blocks[b[arvados.LOCATOR]] = streamoffset
+ streamoffset += b[arvados.BLOCKSIZE]
+
+ for f in sortedfiles:
+ current_span = None
+ fout = f.replace(' ', '\\040')
+ for segment in stream[f]:
+ segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+ if current_span == None:
+ current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+ else:
+ if segmentoffset == current_span[1]:
+ current_span[1] += segment[arvados.SEGMENTSIZE]
+ else:
+ stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+ current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+
+ if current_span != None:
+ stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+
+ if len(stream[f]) == 0:
+ stream_tokens.append("0:0:{0}".format(fout))
+
+ return stream_tokens
+
+
def normalize(collection):
streams = {}
for s in collection.all_streams():
@@ -42,50 +79,18 @@ def normalize(collection):
sortedstreams = list(streams.keys())
sortedstreams.sort()
for s in sortedstreams:
- stream = streams[s]
- stream_tokens = [s]
-
- sortedfiles = list(stream.keys())
- sortedfiles.sort()
-
- blocks = {}
- streamoffset = 0L
- for f in sortedfiles:
- for b in stream[f]:
- if b[arvados.LOCATOR] not in blocks:
- stream_tokens.append(b[arvados.LOCATOR])
- blocks[b[arvados.LOCATOR]] = streamoffset
- streamoffset += b[arvados.BLOCKSIZE]
-
- for f in sortedfiles:
- current_span = None
- fout = f.replace(' ', '\\040')
- for segment in stream[f]:
- segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
- if current_span == None:
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
- else:
- if segmentoffset == current_span[1]:
- current_span[1] += segment[arvados.SEGMENTSIZE]
- else:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
- if current_span != None:
- stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
- normalized_streams.append(stream_tokens)
+ normalized_streams.append(normalize_stream(s, streams[s]))
return normalized_streams
class CollectionReader(object):
def __init__(self, manifest_locator_or_text):
- if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
- self._manifest_text = manifest_locator_or_text
- self._manifest_locator = None
- else:
+ if re.search(r'^[a-f0-9]{32}\+\d+(\+\S)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
+ else:
+ self._manifest_text = manifest_locator_or_text
+ self._manifest_locator = None
self._streams = None
def __enter__(self):
@@ -115,7 +120,7 @@ class CollectionReader(object):
# now regenerate the manifest text based on the normalized stream
- #print "normalizing", self._manifest_text
+ #print "normalizing", self._manifest_text
self._manifest_text = ''
for stream in self._streams:
self._manifest_text += stream[0].replace(' ', '\\040')
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index ac91e2a..6ee12e1 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -17,6 +17,7 @@ import zlib
import fcntl
import time
import threading
+import collections
from keep import *
import config
@@ -43,6 +44,9 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
range_end = range_start + range_size
block_start = 0L
+ # range_start/block_start is the inclusive lower bound
+ # range_end/block_end is the exclusive upper bound
+
hi = len(data_locators)
lo = 0
i = int((hi + lo) / 2)
@@ -50,9 +54,14 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
block_start = data_locators[i][OFFSET]
block_end = block_start + block_size
if debug: print '---'
- while not (range_start >= block_start and range_start <= block_end):
+
+ # perform a binary search for the first block
+ # assumes that all of the blocks are contiguous, so range_start is guaranteed
+ # to either fall into the range of a block or be outside the block range entirely
+ while not (range_start >= block_start and range_start < block_end):
if lo == i:
- break
+ # must be out of range, fail
+ return []
if range_start > block_start:
lo = i
else:
@@ -71,10 +80,13 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
if range_end <= block_start:
# range ends before this block starts, so don't look at any more locators
break
- if range_start >= block_end:
+
+ #if range_start >= block_end:
# range starts after this block ends, so go to next block
- next
- elif range_start >= block_start and range_end <= block_end:
+ # we should always start at the first block due to the binary search above, so this test is redundant
+ #next
+
+ if range_start >= block_start and range_end <= block_end:
# range starts and ends in this block
resp.append([locator, block_size, range_start - block_start, range_size])
elif range_start >= block_start and range_end > block_end:
@@ -110,7 +122,7 @@ class StreamFileReader(object):
def seek(self, pos):
self._filepos = min(max(pos, 0L), self.size())
- def tell(self, pos):
+ def tell(self):
return self._filepos
def size(self):
@@ -124,8 +136,7 @@ class StreamFileReader(object):
data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
- self._stream.seek(locator+segmentoffset)
- data += self._stream.read(segmentsize)
+ data += self._stream.readfrom(locator+segmentoffset, segmentsize)
self._filepos += len(data)
return data
@@ -134,10 +145,10 @@ class StreamFileReader(object):
if size == 0:
return ''
- data = []
+ data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data.join()
+ return data
def readall(self, size=2**20):
while True:
@@ -146,26 +157,20 @@ class StreamFileReader(object):
break
yield data
- def bunzip2(self, size):
- decompressor = bz2.BZ2Decompressor()
- for segment in self.readall(size):
- data = decompressor.decompress(segment)
- if data and data != '':
- yield data
-
- def gunzip(self, size):
- decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+ def decompress(self, decompress, size):
for segment in self.readall(size):
- data = decompressor.decompress(decompressor.unconsumed_tail + segment)
+ data = decompress(segment)
if data and data != '':
yield data
def readall_decompressed(self, size=2**20):
self.seek(0)
if re.search('\.bz2$', self._name):
- return self.bunzip2(size)
+ dc = bz2.BZ2Decompressor()
+ return self.decompress(lambda segment: dc.decompress(segment), size)
elif re.search('\.gz$', self._name):
- return self.gunzip(size)
+ dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
else:
return self.readall(size)
@@ -173,7 +178,6 @@ class StreamFileReader(object):
if decompress:
datasource = self.readall_decompressed()
else:
- self._stream.seek(self._pos + self._filepos)
datasource = self.readall()
data = ''
for newdata in datasource:
@@ -189,19 +193,22 @@ class StreamFileReader(object):
if data != '':
yield data
-
class StreamReader(object):
- def __init__(self, tokens):
- self._tokens = tokens
- self._pos = 0L
-
+ def __init__(self, tokens, keep=None, debug=False):
self._stream_name = None
- self.data_locators = []
- self.files = {}
+ self._data_locators = []
+ self._files = collections.OrderedDict()
+ if keep != None:
+ self._keep = keep
+ else:
+ self._keep = Keep.global_client_object()
+
streamoffset = 0L
- for tok in self._tokens:
+ # parse stream
+ for tok in tokens:
+ if debug: print 'tok', tok
if self._stream_name == None:
self._stream_name = tok.replace('\\040', ' ')
continue
@@ -209,7 +216,7 @@ class StreamReader(object):
s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
if s:
blocksize = long(s.group(1))
- self.data_locators.append([tok, blocksize, streamoffset])
+ self._data_locators.append([tok, blocksize, streamoffset])
streamoffset += blocksize
continue
@@ -218,53 +225,36 @@ class StreamReader(object):
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- if name not in self.files:
- self.files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+ if name not in self._files:
+ self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
else:
- n = self.files[name]
+ n = self._files[name]
n.segments.append([pos, size, n.size()])
continue
raise errors.SyntaxError("Invalid manifest format")
-
- def tokens(self):
- return self._tokens
def name(self):
return self._stream_name
- def all_files(self):
- return self.files.values()
-
- def seek(self, pos):
- """Set the position of the next read operation."""
- self._pos = pos
+ def files(self):
+ return self._files
- def tell(self):
- return self._pos
+ def all_files(self):
+ return self._files.values()
def size(self):
- n = self.data_locators[-1]
- return n[self.OFFSET] + n[self.BLOCKSIZE]
+ n = self._data_locators[-1]
+ return n[OFFSET] + n[BLOCKSIZE]
def locators_and_ranges(self, range_start, range_size):
- return locators_and_ranges(self.data_locators, range_start, range_size)
-
- def read(self, size):
- """Read up to 'size' bytes from the stream, starting at the current file position"""
- if size == 0:
- return ''
- data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, self._pos, size):
- data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- self._pos += len(data)
- return data
+ return locators_and_ranges(self._data_locators, range_start, range_size)
def readfrom(self, start, size):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, start, size):
- data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
+ data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
return data
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 08589dd..50687b0 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -51,19 +51,15 @@ class LocalCollectionReaderTest(unittest.TestCase):
[3, '.', 'foo.txt', 'foo'],
[3, './baz', 'baz.txt', 'baz']]
self.assertEqual(got,
- expected,
- 'resulting file list is not what I expected')
+ expected)
stream0 = cr.all_streams()[0]
- self.assertEqual(stream0.read(0),
+ self.assertEqual(stream0.readfrom(0, 0),
'',
'reading zero bytes should have returned empty string')
- self.assertEqual(stream0.read(2**26),
+ self.assertEqual(stream0.readfrom(0, 2**26),
'foobar',
'reading entire stream failed')
- self.assertEqual(stream0.read(2**26),
- None,
- 'reading past end of stream should have returned None')
- self.assertEqual(stream0.read(0),
+ self.assertEqual(stream0.readfrom(2**26, 0),
'',
'reading zero bytes should have returned empty string')
@@ -91,24 +87,19 @@ class LocalCollectionManifestSubsetTest(unittest.TestCase):
arvados.Keep.put("bar"))),
[[2, '.', 'ar.txt', 'ar'],
[2, '.', 'fo.txt', 'fo'],
- [1, '.', 'ob.txt', 'o'],
- [1, '.', 'ob.txt', 'b'],
- [0, '.', 'zero.txt', ''],])
+ [2, '.', 'ob.txt', 'ob'],
+ [0, '.', 'zero.txt', '']])
+
def _runTest(self, collection, expected):
cr = arvados.CollectionReader(collection)
- manifest_subsets = []
for s in cr.all_streams():
- for f in s.all_files():
- manifest_subsets += [f.as_manifest()]
- expect_i = 0
- for m in manifest_subsets:
- cr = arvados.CollectionReader(m)
- for f in cr.all_files():
- got = [f.size(), f.stream_name(), f.name(), "".join(f.readall(2**26))]
- self.assertEqual(got,
- expected[expect_i],
- 'all_files|as_manifest did not preserve manifest contents: got %s expected %s' % (got, expected[expect_i]))
- expect_i += 1
+ for ex in expected:
+ if ex[0] == s:
+ f = s.files()[ex[2]]
+ got = [f.size(), f.stream_name(), f.name(), "".join(f.readall(2**26))]
+ self.assertEqual(got,
+ ex,
+ 'all_files|as_manifest did not preserve manifest contents: got %s expected %s' % (got, ex))
class LocalCollectionReadlineTest(unittest.TestCase):
def setUp(self):
@@ -138,6 +129,10 @@ class LocalCollectionEmptyFileTest(unittest.TestCase):
cw = arvados.CollectionWriter()
cw.start_new_file('zero.txt')
cw.write('')
+
+ print 'stuff'
+
+ self.assertEqual(cw.manifest_text(), ". 0:0:zero.txt\n")
self.check_manifest_file_sizes(cw.manifest_text(), [0])
cw = arvados.CollectionWriter()
cw.start_new_file('zero.txt')
@@ -148,6 +143,7 @@ class LocalCollectionEmptyFileTest(unittest.TestCase):
cw.start_new_file('zero.txt')
cw.write('')
self.check_manifest_file_sizes(cw.manifest_text(), [1,0,0])
+
def check_manifest_file_sizes(self, manifest_text, expect_sizes):
cr = arvados.CollectionReader(manifest_text)
got_sizes = []
@@ -272,6 +268,23 @@ class LocatorsAndRangesTest(unittest.TestCase):
self.assertEqual(arvados.locators_and_ranges(blocks2, 52, 2), [['f', 10, 2, 2]])
self.assertEqual(arvados.locators_and_ranges(blocks2, 62, 2), [])
self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), [])
+
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 0, 2), [['a', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 10, 2), [['b', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 20, 2), [['c', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 30, 2), [['d', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 40, 2), [['e', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 50, 2), [['f', 10, 0, 2]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 60, 2), [])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, -2, 2), [])
+
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 9, 2), [['a', 10, 9, 1], ['b', 10, 0, 1]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 19, 2), [['b', 10, 9, 1], ['c', 10, 0, 1]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 29, 2), [['c', 10, 9, 1], ['d', 10, 0, 1]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 39, 2), [['d', 10, 9, 1], ['e', 10, 0, 1]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [['e', 10, 9, 1], ['f', 10, 0, 1]])
+ self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [['f', 10, 9, 1]])
+
blocks3 = [['a', 10, 0],
['b', 10, 10],
@@ -332,3 +345,109 @@ class LocatorsAndRangesTest(unittest.TestCase):
self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
['c', 5, 0, 1]])
+class FileStreamTest(unittest.TestCase):
+ class MockStreamReader(object):
+ def __init__(self, content):
+ self.content = content
+
+ def readfrom(self, start, size):
+ return self.content[start:start+size]
+
+ def runTest(self):
+ content = 'abcdefghijklmnopqrstuvwxyz0123456789'
+ msr = FileStreamTest.MockStreamReader(content)
+ segments = [[0, 10, 0],
+ [10, 15, 10],
+ [25, 5, 25]]
+
+ sfr = arvados.StreamFileReader(msr, segments, "test")
+
+ self.assertEqual(sfr.name(), "test")
+ self.assertEqual(sfr.size(), 30)
+
+ self.assertEqual(sfr.readfrom(0, 30), content[0:30])
+ self.assertEqual(sfr.readfrom(2, 30), content[2:30])
+
+ self.assertEqual(sfr.readfrom(2, 8), content[2:10])
+ self.assertEqual(sfr.readfrom(0, 10), content[0:10])
+
+ self.assertEqual(sfr.tell(), 0)
+ self.assertEqual(sfr.read(5), content[0:5])
+ self.assertEqual(sfr.tell(), 5)
+ self.assertEqual(sfr.read(5), content[5:10])
+ self.assertEqual(sfr.tell(), 10)
+ self.assertEqual(sfr.read(5), content[10:15])
+ self.assertEqual(sfr.tell(), 15)
+ self.assertEqual(sfr.read(5), content[15:20])
+ self.assertEqual(sfr.tell(), 20)
+ self.assertEqual(sfr.read(5), content[20:25])
+ self.assertEqual(sfr.tell(), 25)
+ self.assertEqual(sfr.read(5), content[25:30])
+ self.assertEqual(sfr.tell(), 30)
+ self.assertEqual(sfr.read(5), '')
+ self.assertEqual(sfr.tell(), 30)
+
+ segments = [[26, 10, 0],
+ [0, 15, 10],
+ [15, 5, 25]]
+
+ sfr = arvados.StreamFileReader(msr, segments, "test")
+
+ self.assertEqual(sfr.size(), 30)
+
+ self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
+ self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
+
+ self.assertEqual(sfr.readfrom(2, 8), content[28:36])
+ self.assertEqual(sfr.readfrom(0, 10), content[26:36])
+
+ self.assertEqual(sfr.tell(), 0)
+ self.assertEqual(sfr.read(5), content[26:31])
+ self.assertEqual(sfr.tell(), 5)
+ self.assertEqual(sfr.read(5), content[31:36])
+ self.assertEqual(sfr.tell(), 10)
+ self.assertEqual(sfr.read(5), content[0:5])
+ self.assertEqual(sfr.tell(), 15)
+ self.assertEqual(sfr.read(5), content[5:10])
+ self.assertEqual(sfr.tell(), 20)
+ self.assertEqual(sfr.read(5), content[10:15])
+ self.assertEqual(sfr.tell(), 25)
+ self.assertEqual(sfr.read(5), content[15:20])
+ self.assertEqual(sfr.tell(), 30)
+ self.assertEqual(sfr.read(5), '')
+ self.assertEqual(sfr.tell(), 30)
+
+
+class StreamReaderTest(unittest.TestCase):
+
+ class MockKeep(object):
+ def __init__(self, content):
+ self.content = content
+
+ def get(self, locator):
+ return self.content[locator]
+
+ def runTest(self):
+ keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
+ 'cccccccccccccccccccccccccccccccc+5': 'z0123'}
+ mk = StreamReaderTest.MockKeep(keepblocks)
+
+ sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk)
+
+ content = 'abcdefghijklmnopqrstuvwxyz0123456789'
+
+ self.assertEqual(sr.readfrom(0, 30), content[0:30])
+ self.assertEqual(sr.readfrom(2, 30), content[2:30])
+
+ self.assertEqual(sr.readfrom(2, 8), content[2:10])
+ self.assertEqual(sr.readfrom(0, 10), content[0:10])
+
+ self.assertEqual(sr.readfrom(0, 5), content[0:5])
+ self.assertEqual(sr.readfrom(5, 5), content[5:10])
+ self.assertEqual(sr.readfrom(10, 5), content[10:15])
+ self.assertEqual(sr.readfrom(15, 5), content[15:20])
+ self.assertEqual(sr.readfrom(20, 5), content[20:25])
+ self.assertEqual(sr.readfrom(25, 5), content[25:30])
+ self.assertEqual(sr.readfrom(30, 5), '')
+
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list