[ARVADOS] updated: 409a1a9ca8d2481e58f3f1b3c7485b8feff850ea
git at public.curoverse.com
Mon Nov 10 17:29:08 EST 2014
Summary of changes:
sdk/python/arvados/collection.py | 3 ++
sdk/python/arvados/stream.py | 55 +++++++++++++------------
sdk/python/tests/test_collections.py | 79 +++++-------------------------------
sdk/python/tests/test_stream.py | 51 ++++++++++++++++++++---
4 files changed, 87 insertions(+), 101 deletions(-)
       via  409a1a9ca8d2481e58f3f1b3c7485b8feff850ea (commit)
      from  038b1bc4fe952b0bf95b566542973e55c4a6ecc7 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 409a1a9ca8d2481e58f3f1b3c7485b8feff850ea
Author: Brett Smith <brett at curoverse.com>
Date: Mon Nov 10 17:29:03 2014 -0500
3603: Improve PySDK file API compatibility.
Per code review.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 9755555..b003cfd 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -278,6 +278,9 @@ class _WriterFile(ArvadosFileBase):
for data in seq:
self.write(data)
+ def flush(self):
+ self.dest.flush_data()
+
class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
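For readers following along, here is a minimal usage sketch of the new flush()
method on files returned by CollectionWriter.open(), modeled on the
test_open_flush case added further down in this commit. It assumes a configured
API client and live Keep service; everything other than the SDK calls shown in
the diff is illustrative.

    import arvados

    client = arvados.api('v1')           # assumes ARVADOS_API_HOST/TOKEN are set
    writer = arvados.CollectionWriter(client)
    with writer.open('flush_test') as out_file:
        out_file.write('flushtext')
        out_file.flush()                 # new: forwards to flush_data(), so the
                                         # buffered bytes are committed to Keep now
    print(writer.manifest_text())
    # ". <md5 of 'flushtext'>+9 0:9:flush_test\n"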
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 6a81634..c263dd8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -117,12 +117,14 @@ class StreamFileReader(ArvadosFileBase):
self.segments = segments
self._filepos = 0L
self.num_retries = stream.num_retries
- self._readline_cache = (-1, None, None)
+ self._readline_cache = (None, None)
def __iter__(self):
- # If we've already started reading the file, don't decompress;
- # that implicitly seeks to the beginning, which we don't want here.
- return self.readlines(decompress=self.tell() == 0)
+ while True:
+ data = self.readline()
+ if not data:
+ break
+ yield data
def decompressed_name(self):
return re.sub('\.(bz2|gz)$', '', self.name)
@@ -131,11 +133,10 @@ class StreamFileReader(ArvadosFileBase):
return self._stream.name()
@ArvadosFileBase._before_close
- def seek(self, pos, rel=os.SEEK_SET):
- """Note that the default is SEEK_SET, not Python's usual SEEK_CUR."""
- if rel == os.SEEK_CUR:
+ def seek(self, pos, whence=os.SEEK_CUR):
+ if whence == os.SEEK_CUR:
pos += self._filepos
- elif rel == os.SEEK_END:
+ elif whence == os.SEEK_END:
pos += self.size()
self._filepos = min(max(pos, 0L), self.size())
@@ -187,27 +188,26 @@ class StreamFileReader(ArvadosFileBase):
@ArvadosFileBase._before_close
@retry_method
- def readline(self, read_iter=None, num_retries=None):
- cache_pos, cache_iter, cache_data = self._readline_cache
- if (((read_iter is None) or (read_iter is cache_iter)) and
- (self.tell() == cache_pos)):
- read_iter = cache_iter
+ def readline(self, size=float('inf'), num_retries=None):
+ cache_pos, cache_data = self._readline_cache
+ if self.tell() == cache_pos:
data = [cache_data]
else:
- if read_iter is None:
- read_iter = self.readall_decompressed(num_retries=num_retries)
data = ['']
- while '\n' not in data[-1]:
- try:
- data.append(next(read_iter))
- except StopIteration:
+ data_size = len(data[-1])
+ while (data_size < size) and ('\n' not in data[-1]):
+ next_read = self.read(2 ** 20, num_retries=num_retries)
+ if not next_read:
break
+ data.append(next_read)
+ data_size += len(next_read)
data = ''.join(data)
try:
nextline_index = data.index('\n') + 1
except ValueError:
nextline_index = len(data)
- self._readline_cache = (self.tell(), read_iter, data[nextline_index:])
+ nextline_index = min(nextline_index, size)
+ self._readline_cache = (self.tell(), data[nextline_index:])
return data[:nextline_index]
@ArvadosFileBase._before_close
@@ -235,14 +235,15 @@ class StreamFileReader(ArvadosFileBase):
@ArvadosFileBase._before_close
@retry_method
- def readlines(self, decompress=True, num_retries=None):
- read_func = self.readall_decompressed if decompress else self.readall
- read_iter = read_func(num_retries=num_retries)
- while True:
- data = self.readline(read_iter, num_retries=num_retries)
- if not data:
+ def readlines(self, sizehint=float('inf'), num_retries=None):
+ data = []
+ data_size = 0
+ for s in self.readall(num_retries=num_retries):
+ data.append(s)
+ data_size += len(s)
+ if data_size >= sizehint:
break
- yield data
+ return ''.join(data).splitlines(True)
def as_manifest(self):
manifest_text = ['.']
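To summarize the StreamFileReader changes above: seek() now takes a whence
argument defaulting to os.SEEK_CUR, readline() accepts an optional size limit,
readlines() accepts a sizehint, and iteration is defined in terms of readline().
A minimal sketch of the resulting API, assuming `sfile` is a StreamFileReader
over the nine-byte content '123456789' (mirroring the make_count_reader helper
in test_stream.py):

    import os

    # Sketch only: `sfile` stands for a StreamFileReader built elsewhere.
    sfile.read(2)                # -> '12'; the file position is now 2
    sfile.seek(2)                # whence defaults to os.SEEK_CUR: skips '34'
    sfile.read(2)                # -> '56'
    sfile.seek(0, os.SEEK_SET)   # absolute seeks still work when requested
    sfile.seek(-2, os.SEEK_END)  # as do end-relative seeks
    # On a reader over newline-delimited text (make_newlines_reader in the
    # tests), the new size arguments and iteration look like:
    #     reader.readline(4)     # returns at most 4 bytes
    #     reader.readlines(8)    # stops buffering after roughly 8 bytes
    #     for line in reader: ...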
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cbbd0d5..045f93a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -3,14 +3,12 @@
# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
import arvados
-import bz2
import copy
import hashlib
import mock
import os
import pprint
import re
-import subprocess
import tempfile
import unittest
@@ -124,25 +122,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
[2, '.', 'ob.txt', 'ob'],
[0, '.', 'zero.txt', '']])
- def _test_readline(self, what_in, what_out):
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.txt')
- cw.write(what_in)
- test1 = cw.finish()
- cr = arvados.CollectionReader(test1, self.api_client)
- got = []
- for x in list(cr.all_files())[0].readlines():
- got += [x]
- self.assertEqual(got,
- what_out,
- "readlines did not split lines correctly: %s" % got)
-
- def test_collection_readline(self):
- self._test_readline("\na\nbcd\n\nefg\nz",
- ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
- self._test_readline("ab\ncd\n",
- ["ab\n", "cd\n"])
-
def test_collection_empty_file(self):
cw = arvados.CollectionWriter(self.api_client)
cw.start_new_file('zero.txt')
@@ -179,53 +158,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
got_sizes += [f.size()]
self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
- def test_collection_bz2_decompression(self):
- n_lines_in = 2**18
- data_in = "abc\n"
- for x in xrange(0, 18):
- data_in += data_in
- compressed_data_in = bz2.compress(data_in)
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.bz2')
- cw.write(compressed_data_in)
- bz2_manifest = cw.manifest_text()
-
- cr = arvados.CollectionReader(bz2_manifest, self.api_client)
-
- got = 0
- for x in list(cr.all_files())[0].readlines():
- self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
- got += 1
- self.assertEqual(got,
- n_lines_in,
- "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
- def test_collection_gzip_decompression(self):
- n_lines_in = 2**18
- data_in = "abc\n"
- for x in xrange(0, 18):
- data_in += data_in
- p = subprocess.Popen(["gzip", "-1cn"],
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE,
- shell=False, close_fds=True)
- compressed_data_in, stderrdata = p.communicate(data_in)
-
- cw = arvados.CollectionWriter(self.api_client)
- cw.start_new_file('test.gz')
- cw.write(compressed_data_in)
- gzip_manifest = cw.manifest_text()
-
- cr = arvados.CollectionReader(gzip_manifest, self.api_client)
- got = 0
- for x in list(cr.all_files())[0].readlines():
- self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
- got += 1
- self.assertEqual(got,
- n_lines_in,
- "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -804,6 +736,17 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
self.assertEqual(". {} 0:6:six\n".format(data_loc),
writer.manifest_text())
+ def test_open_flush(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('flush_test') as out_file:
+ out_file.write('flushtext')
+ data_loc = hashlib.md5('flushtext').hexdigest() + '+9'
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ out_file.flush()
+ self.assertEqual(". {} 0:9:flush_test\n".format(data_loc),
+ writer.manifest_text())
+
def test_two_opens_same_stream(self):
client = self.api_client_mock()
writer = arvados.CollectionWriter(client)
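A note on the expected locator in test_open_flush above: a Keep locator in a
manifest is the MD5 hex digest of the block followed by '+' and the block's
length in bytes, which is how the test arrives at the '+9' suffix for
'flushtext':

    import hashlib

    data = 'flushtext'
    data_loc = hashlib.md5(data).hexdigest() + '+' + str(len(data))
    # len('flushtext') == 9, so the manifest line becomes
    # ". %s 0:9:flush_test\n" % data_loc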
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index e272845..fb65925 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,7 +1,10 @@
#!/usr/bin/env python
-import os
+import bz2
+import gzip
+import io
import mock
+import os
import unittest
import arvados
@@ -43,8 +46,7 @@ class StreamFileReaderTestCase(unittest.TestCase):
self.assertEqual('123456789', ''.join(sfile.readall()))
def test_one_arg_seek(self):
- # Our default has been SEEK_SET since time immemorial.
- self.test_absolute_seek([])
+ self.test_relative_seek([])
def test_absolute_seek(self, args=[os.SEEK_SET]):
sfile = self.make_count_reader()
@@ -53,10 +55,10 @@ class StreamFileReaderTestCase(unittest.TestCase):
sfile.seek(4, *args)
self.assertEqual('56', sfile.read(2))
- def test_relative_seek(self):
+ def test_relative_seek(self, args=[os.SEEK_CUR]):
sfile = self.make_count_reader()
self.assertEqual('12', sfile.read(2))
- sfile.seek(2, os.SEEK_CUR)
+ sfile.seek(2, *args)
self.assertEqual('56', sfile.read(2))
def test_end_seek(self):
@@ -117,11 +119,24 @@ class StreamFileReaderTestCase(unittest.TestCase):
self.check_lines(actual)
def test_readlines(self):
- self.check_lines(list(self.make_newlines_reader().readlines()))
+ self.check_lines(self.make_newlines_reader().readlines())
def test_iteration(self):
self.check_lines(list(iter(self.make_newlines_reader())))
+ def test_readline_size(self):
+ reader = self.make_newlines_reader()
+ self.assertEqual('on', reader.readline(2))
+ self.assertEqual('e\n', reader.readline(4))
+ self.assertEqual('two\n', reader.readline(6))
+ self.assertEqual('\n', reader.readline(8))
+ self.assertEqual('thre', reader.readline(4))
+
+ def test_readlines_sizehint(self):
+ result = self.make_newlines_reader().readlines(8)
+ self.assertEqual(['one\n', 'two\n'], result[:2])
+ self.assertNotIn('three\n', result)
+
def test_name_attribute(self):
# Test both .name and .name() (for backward compatibility)
stream = tutil.MockStreamReader()
@@ -129,6 +144,30 @@ class StreamFileReaderTestCase(unittest.TestCase):
self.assertEqual('nametest', sfile.name)
self.assertEqual('nametest', sfile.name())
+ def check_decompression(self, compress_ext, compress_func):
+ test_text = 'decompression\ntest\n'
+ test_data = compress_func(test_text)
+ stream = tutil.MockStreamReader('.', test_data)
+ reader = StreamFileReader(stream, [[0, len(test_data), 0]],
+ 'test.' + compress_ext)
+ self.assertEqual(test_text, ''.join(reader.readall_decompressed()))
+
+ @staticmethod
+ def gzip_compress(data):
+ compressed_data = io.BytesIO()
+ with gzip.GzipFile(fileobj=compressed_data, mode='wb') as gzip_file:
+ gzip_file.write(data)
+ return compressed_data.getvalue()
+
+ def test_no_decompression(self):
+ self.check_decompression('log', lambda s: s)
+
+ def test_gzip_decompression(self):
+ self.check_decompression('gz', self.gzip_compress)
+
+ def test_bz2_decompression(self):
+ self.check_decompression('bz2', bz2.compress)
+
class StreamRetryTestMixin(object):
# Define reader_for(coll_name, **kwargs)
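To see why test_readlines_sizehint above gets 'one\n' and 'two\n' but never
'three\n': the new readlines() buffers whole chunks from readall() until at
least sizehint bytes have accumulated, then splits with splitlines(True). A
standalone sketch of that strategy, with hypothetical one-line chunks chosen
for clarity:

    def readlines_sketch(chunks, sizehint=float('inf')):
        # Mirror of the patch's approach: buffer whole chunks until the
        # size hint is met, then split while keeping line endings.
        data = []
        data_size = 0
        for s in chunks:
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    print(readlines_sketch(['one\n', 'two\n', '\nthree\n'], 8))
    # ['one\n', 'two\n'] -- the loop stops at 8 bytes, so the chunk
    # containing 'three\n' is never pulled from the underlying reader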
-----------------------------------------------------------------------
hooks/post-receive
--