[ARVADOS] updated: 409a1a9ca8d2481e58f3f1b3c7485b8feff850ea

git at public.curoverse.com git at public.curoverse.com
Mon Nov 10 17:29:08 EST 2014


Summary of changes:
 sdk/python/arvados/collection.py     |  3 ++
 sdk/python/arvados/stream.py         | 55 +++++++++++++------------
 sdk/python/tests/test_collections.py | 79 +++++-------------------------------
 sdk/python/tests/test_stream.py      | 51 ++++++++++++++++++++---
 4 files changed, 87 insertions(+), 101 deletions(-)

       via  409a1a9ca8d2481e58f3f1b3c7485b8feff850ea (commit)
      from  038b1bc4fe952b0bf95b566542973e55c4a6ecc7 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 409a1a9ca8d2481e58f3f1b3c7485b8feff850ea
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Nov 10 17:29:03 2014 -0500

    3603: Improve PySDK file API compatibility.
    
    Per code review.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 9755555..b003cfd 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -278,6 +278,9 @@ class _WriterFile(ArvadosFileBase):
         for data in seq:
             self.write(data)
 
+    def flush(self):
+        self.dest.flush_data()
+
 
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 6a81634..c263dd8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -117,12 +117,14 @@ class StreamFileReader(ArvadosFileBase):
         self.segments = segments
         self._filepos = 0L
         self.num_retries = stream.num_retries
-        self._readline_cache = (-1, None, None)
+        self._readline_cache = (None, None)
 
     def __iter__(self):
-        # If we've already started reading the file, don't decompress;
-        # that implicitly seeks to the beginning, which we don't want here.
-        return self.readlines(decompress=self.tell() == 0)
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data
 
     def decompressed_name(self):
         return re.sub('\.(bz2|gz)$', '', self.name)
@@ -131,11 +133,10 @@ class StreamFileReader(ArvadosFileBase):
         return self._stream.name()
 
     @ArvadosFileBase._before_close
-    def seek(self, pos, rel=os.SEEK_SET):
-        """Note that the default is SEEK_SET, not Python's usual SEEK_CUR."""
-        if rel == os.SEEK_CUR:
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
             pos += self._filepos
-        elif rel == os.SEEK_END:
+        elif whence == os.SEEK_END:
             pos += self.size()
         self._filepos = min(max(pos, 0L), self.size())
 
@@ -187,27 +188,26 @@ class StreamFileReader(ArvadosFileBase):
 
     @ArvadosFileBase._before_close
     @retry_method
-    def readline(self, read_iter=None, num_retries=None):
-        cache_pos, cache_iter, cache_data = self._readline_cache
-        if (((read_iter is None) or (read_iter is cache_iter)) and
-              (self.tell() == cache_pos)):
-            read_iter = cache_iter
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
             data = [cache_data]
         else:
-            if read_iter is None:
-                read_iter = self.readall_decompressed(num_retries=num_retries)
             data = ['']
-        while '\n' not in data[-1]:
-            try:
-                data.append(next(read_iter))
-            except StopIteration:
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
                 break
+            data.append(next_read)
+            data_size += len(next_read)
         data = ''.join(data)
         try:
             nextline_index = data.index('\n') + 1
         except ValueError:
             nextline_index = len(data)
-        self._readline_cache = (self.tell(), read_iter, data[nextline_index:])
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
     @ArvadosFileBase._before_close
@@ -235,14 +235,15 @@ class StreamFileReader(ArvadosFileBase):
 
     @ArvadosFileBase._before_close
     @retry_method
-    def readlines(self, decompress=True, num_retries=None):
-        read_func = self.readall_decompressed if decompress else self.readall
-        read_iter = read_func(num_retries=num_retries)
-        while True:
-            data = self.readline(read_iter, num_retries=num_retries)
-            if not data:
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
                 break
-            yield data
+        return ''.join(data).splitlines(True)
 
     def as_manifest(self):
         manifest_text = ['.']
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index cbbd0d5..045f93a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -3,14 +3,12 @@
 # ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
 
 import arvados
-import bz2
 import copy
 import hashlib
 import mock
 import os
 import pprint
 import re
-import subprocess
 import tempfile
 import unittest
 
@@ -124,25 +122,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                            [2, '.', 'ob.txt', 'ob'],
                            [0, '.', 'zero.txt', '']])
 
-    def _test_readline(self, what_in, what_out):
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.txt')
-        cw.write(what_in)
-        test1 = cw.finish()
-        cr = arvados.CollectionReader(test1, self.api_client)
-        got = []
-        for x in list(cr.all_files())[0].readlines():
-            got += [x]
-        self.assertEqual(got,
-                         what_out,
-                         "readlines did not split lines correctly: %s" % got)
-
-    def test_collection_readline(self):
-        self._test_readline("\na\nbcd\n\nefg\nz",
-                            ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
-        self._test_readline("ab\ncd\n",
-                            ["ab\n", "cd\n"])
-
     def test_collection_empty_file(self):
         cw = arvados.CollectionWriter(self.api_client)
         cw.start_new_file('zero.txt')
@@ -179,53 +158,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
             got_sizes += [f.size()]
         self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
 
-    def test_collection_bz2_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        compressed_data_in = bz2.compress(data_in)
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.bz2')
-        cw.write(compressed_data_in)
-        bz2_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(bz2_manifest, self.api_client)
-
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
-    def test_collection_gzip_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        p = subprocess.Popen(["gzip", "-1cn"],
-                             stdout=subprocess.PIPE,
-                             stdin=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             shell=False, close_fds=True)
-        compressed_data_in, stderrdata = p.communicate(data_in)
-
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.gz')
-        cw.write(compressed_data_in)
-        gzip_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(gzip_manifest, self.api_client)
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
     def test_normalized_collection(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -804,6 +736,17 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             self.assertEqual(". {} 0:6:six\n".format(data_loc),
                              writer.manifest_text())
 
+    def test_open_flush(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('flush_test') as out_file:
+            out_file.write('flushtext')
+            data_loc = hashlib.md5('flushtext').hexdigest() + '+9'
+            with self.mock_keep(data_loc, 200) as keep_mock:
+                out_file.flush()
+            self.assertEqual(". {} 0:9:flush_test\n".format(data_loc),
+                             writer.manifest_text())
+
     def test_two_opens_same_stream(self):
         client = self.api_client_mock()
         writer = arvados.CollectionWriter(client)
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index e272845..fb65925 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,7 +1,10 @@
 #!/usr/bin/env python
 
-import os
+import bz2
+import gzip
+import io
 import mock
+import os
 import unittest
 
 import arvados
@@ -43,8 +46,7 @@ class StreamFileReaderTestCase(unittest.TestCase):
         self.assertEqual('123456789', ''.join(sfile.readall()))
 
     def test_one_arg_seek(self):
-        # Our default has been SEEK_SET since time immemorial.
-        self.test_absolute_seek([])
+        self.test_relative_seek([])
 
     def test_absolute_seek(self, args=[os.SEEK_SET]):
         sfile = self.make_count_reader()
@@ -53,10 +55,10 @@ class StreamFileReaderTestCase(unittest.TestCase):
         sfile.seek(4, *args)
         self.assertEqual('56', sfile.read(2))
 
-    def test_relative_seek(self):
+    def test_relative_seek(self, args=[os.SEEK_CUR]):
         sfile = self.make_count_reader()
         self.assertEqual('12', sfile.read(2))
-        sfile.seek(2, os.SEEK_CUR)
+        sfile.seek(2, *args)
         self.assertEqual('56', sfile.read(2))
 
     def test_end_seek(self):
@@ -117,11 +119,24 @@ class StreamFileReaderTestCase(unittest.TestCase):
         self.check_lines(actual)
 
     def test_readlines(self):
-        self.check_lines(list(self.make_newlines_reader().readlines()))
+        self.check_lines(self.make_newlines_reader().readlines())
 
     def test_iteration(self):
         self.check_lines(list(iter(self.make_newlines_reader())))
 
+    def test_readline_size(self):
+        reader = self.make_newlines_reader()
+        self.assertEqual('on', reader.readline(2))
+        self.assertEqual('e\n', reader.readline(4))
+        self.assertEqual('two\n', reader.readline(6))
+        self.assertEqual('\n', reader.readline(8))
+        self.assertEqual('thre', reader.readline(4))
+
+    def test_readlines_sizehint(self):
+        result = self.make_newlines_reader().readlines(8)
+        self.assertEqual(['one\n', 'two\n'], result[:2])
+        self.assertNotIn('three\n', result)
+
     def test_name_attribute(self):
         # Test both .name and .name() (for backward compatibility)
         stream = tutil.MockStreamReader()
@@ -129,6 +144,30 @@ class StreamFileReaderTestCase(unittest.TestCase):
         self.assertEqual('nametest', sfile.name)
         self.assertEqual('nametest', sfile.name())
 
+    def check_decompression(self, compress_ext, compress_func):
+        test_text = 'decompression\ntest\n'
+        test_data = compress_func(test_text)
+        stream = tutil.MockStreamReader('.', test_data)
+        reader = StreamFileReader(stream, [[0, len(test_data), 0]],
+                                  'test.' + compress_ext)
+        self.assertEqual(test_text, ''.join(reader.readall_decompressed()))
+
+    @staticmethod
+    def gzip_compress(data):
+        compressed_data = io.BytesIO()
+        with gzip.GzipFile(fileobj=compressed_data, mode='wb') as gzip_file:
+            gzip_file.write(data)
+        return compressed_data.getvalue()
+
+    def test_no_decompression(self):
+        self.check_decompression('log', lambda s: s)
+
+    def test_gzip_decompression(self):
+        self.check_decompression('gz', self.gzip_compress)
+
+    def test_bz2_decompression(self):
+        self.check_decompression('bz2', bz2.compress)
+
 
 class StreamRetryTestMixin(object):
     # Define reader_for(coll_name, **kwargs)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list