[ARVADOS] created: 038b1bc4fe952b0bf95b566542973e55c4a6ecc7

git at public.curoverse.com git at public.curoverse.com
Tue Nov 4 09:09:31 EST 2014


        at  038b1bc4fe952b0bf95b566542973e55c4a6ecc7 (commit)


commit 038b1bc4fe952b0bf95b566542973e55c4a6ecc7
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Nov 4 09:09:16 2014 -0500

    3603: PySDK Collection objects support file-like APIs.
    
    This commit adds an open() method to CollectionReader and
    CollectionWriter.  They mimic the built-in open(), returning objects
    that implement as much of the Python file API as I can reasonably
    manage.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
new file mode 100644
index 0000000..ab695e9
--- /dev/null
+++ b/sdk/python/arvados/arvfile.py
@@ -0,0 +1,32 @@
+import functools
+
+class ArvadosFileBase(object):
+    def __init__(self, name, mode):
+        self.name = name
+        self.mode = mode
+        self.closed = False
+
+    @staticmethod
+    def _before_close(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if self.closed:
+                raise ValueError("I/O operation on closed stream file")
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        try:
+            self.close()
+        except Exception:
+            if exc_type is None:
+                raise
+
+    def close(self):
+        self.closed = True
+
+    def isatty(self):
+        return False
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f7e2bf0..9755555 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -5,8 +5,9 @@ import re
 from collections import deque
 from stat import *
 
+from .arvfile import ArvadosFileBase
 from keep import *
-from stream import *
+from .stream import StreamReader, split
 import config
 import errors
 import util
@@ -196,10 +197,7 @@ class CollectionReader(CollectionBase):
         streams = {}
         for s in self.all_streams():
             for f in s.all_files():
-                filestream = s.name() + "/" + f.name()
-                r = filestream.rindex("/")
-                streamname = filestream[:r]
-                filename = filestream[r+1:]
+                streamname, filename = split(s.name() + "/" + f.name())
                 if streamname not in streams:
                     streams[streamname] = {}
                 if filename not in streams[streamname]:
@@ -215,6 +213,31 @@ class CollectionReader(CollectionBase):
             [StreamReader(stream, keep=self._my_keep()).manifest_text()
              for stream in self._streams])
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+        """
+        self._populate()
+        if filename is None:
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
+
     def all_streams(self):
         self._populate()
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
@@ -237,6 +260,25 @@ class CollectionReader(CollectionBase):
             return self._manifest_text
 
 
+class _WriterFile(ArvadosFileBase):
+    def __init__(self, coll_writer, name):
+        super(_WriterFile, self).__init__(name, 'wb')
+        self.dest = coll_writer
+
+    def close(self):
+        super(_WriterFile, self).close()
+        self.dest.finish_current_file()
+
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        for data in seq:
+            self.write(data)
+
+
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
@@ -273,6 +315,7 @@ class CollectionWriter(CollectionBase):
         self._queued_file = None
         self._queued_dirents = deque()
         self._queued_trees = deque()
+        self._last_open = None
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type is None:
@@ -379,6 +422,35 @@ class CollectionWriter(CollectionBase):
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8f787b7..6a81634 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -5,6 +5,7 @@ import os
 import re
 import zlib
 
+from .arvfile import ArvadosFileBase
 from arvados.retry import retry_method
 from keep import *
 import config
@@ -89,25 +90,53 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp
 
+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # Older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
 
-class StreamFileReader(object):
     def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
         self._stream = stream
         self.segments = segments
-        self._name = name
         self._filepos = 0L
         self.num_retries = stream.num_retries
+        self._readline_cache = (-1, None, None)
 
-    def name(self):
-        return self._name
+    def __iter__(self):
+        # If we've already started reading the file, don't decompress;
+        # that implicitly seeks to the beginning, which we don't want here.
+        return self.readlines(decompress=self.tell() == 0)
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self._name)
+        return re.sub('\.(bz2|gz)$', '', self.name)
 
     def stream_name(self):
         return self._stream.name()
 
-    def seek(self, pos):
+    @ArvadosFileBase._before_close
+    def seek(self, pos, rel=os.SEEK_SET):
+        """Note that the default is SEEK_SET, not Python's usual SEEK_CUR."""
+        if rel == os.SEEK_CUR:
+            pos += self._filepos
+        elif rel == os.SEEK_END:
+            pos += self.size()
         self._filepos = min(max(pos, 0L), self.size())
 
     def tell(self):
@@ -117,6 +146,7 @@ class StreamFileReader(object):
         n = self.segments[-1]
         return n[OFFSET] + n[BLOCKSIZE]
 
+    @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -133,6 +163,7 @@ class StreamFileReader(object):
         self._filepos += len(data)
         return data
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -145,6 +176,7 @@ class StreamFileReader(object):
                                               num_retries=num_retries))
         return ''.join(data)
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
         while True:
@@ -153,42 +185,63 @@ class StreamFileReader(object):
                 break
             yield data
 
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, read_iter=None, num_retries=None):
+        cache_pos, cache_iter, cache_data = self._readline_cache
+        if (((read_iter is None) or (read_iter is cache_iter)) and
+              (self.tell() == cache_pos)):
+            read_iter = cache_iter
+            data = [cache_data]
+        else:
+            if read_iter is None:
+                read_iter = self.readall_decompressed(num_retries=num_retries)
+            data = ['']
+        while '\n' not in data[-1]:
+            try:
+                data.append(next(read_iter))
+            except StopIteration:
+                break
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        self._readline_cache = (self.tell(), read_iter, data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
         for segment in self.readall(size, num_retries):
             data = decompress(segment)
-            if data and data != '':
+            if data:
                 yield data
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
-        if re.search('\.bz2$', self._name):
+        if self.name.endswith('.bz2'):
             dc = bz2.BZ2Decompressor()
             return self.decompress(dc.decompress, size,
                                    num_retries=num_retries)
-        elif re.search('\.gz$', self._name):
+        elif self.name.endswith('.gz'):
             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                    size, num_retries=num_retries)
         else:
             return self.readall(size, num_retries=num_retries)
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readlines(self, decompress=True, num_retries=None):
         read_func = self.readall_decompressed if decompress else self.readall
-        data = ''
-        for newdata in read_func(num_retries=num_retries):
-            data += newdata
-            sol = 0
-            while True:
-                eol = data.find("\n", sol)
-                if eol < 0:
-                    break
-                yield data[sol:eol+1]
-                sol = eol+1
-            data = data[sol:]
-        if data != '':
+        read_iter = read_func(num_retries=num_retries)
+        while True:
+            data = self.readline(read_iter, num_retries=num_retries)
+            if not data:
+                break
             yield data
 
     def as_manifest(self):
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 0dbf9bc..9655f25 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import errno
+import hashlib
 import httplib
 import httplib2
 import mock
@@ -24,6 +25,21 @@ def mock_responses(body, *codes, **headers):
     return mock.patch('httplib2.Http.request', side_effect=(
             (fake_httplib2_response(code, **headers), body) for code in codes))
 
+class MockStreamReader(object):
+    def __init__(self, name='.', *data):
+        self._name = name
+        self._data = ''.join(data)
+        self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
+                                              len(d)) for d in data]
+        self.num_retries = 0
+
+    def name(self):
+        return self._name
+
+    def readfrom(self, start, size, num_retries=None):
+        return self._data[start:start + size]
+
+
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
 
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index c4c7ca2..cbbd0d5 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -370,79 +370,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
                                                                        ['c', 5, 0, 1]])
 
-    class MockStreamReader(object):
-        def __init__(self, content):
-            self.content = content
-            self.num_retries = 0
-
-        def readfrom(self, start, size, num_retries=0):
-            return self.content[start:start+size]
-
-    def test_file_stream(self):
-        content = 'abcdefghijklmnopqrstuvwxyz0123456789'
-        msr = self.MockStreamReader(content)
-        segments = [[0, 10, 0],
-                    [10, 15, 10],
-                    [25, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.name(), "test")
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[0:30])
-        self.assertEqual(sfr.readfrom(2, 30), content[2:30])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[2:10])
-        self.assertEqual(sfr.readfrom(0, 10), content[0:10])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[20:25])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[25:30])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-        segments = [[26, 10, 0],
-                    [0, 15, 10],
-                    [15, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
-        self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[28:36])
-        self.assertEqual(sfr.readfrom(0, 10), content[26:36])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[26:31])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[31:36])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-
     class MockKeep(object):
         def __init__(self, content, num_retries=0):
             self.content = content
@@ -474,30 +401,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(sr.readfrom(25, 5), content[25:30])
         self.assertEqual(sr.readfrom(30, 5), '')
 
-    def test_file_reader(self):
-        keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
-                      'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
-                      'cccccccccccccccccccccccccccccccc+5': 'z0123'}
-        mk = self.MockKeep(keepblocks)
-
-        sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
-
-        content = 'abcdefghijpqrstuvwxy'
-
-        f = sr.files()["foo"]
-
-        # f.read() calls will be aligned on block boundaries (as a
-        # result of ticket #3663).
-
-        f.seek(0)
-        self.assertEqual(f.read(20), content[0:10])
-
-        f.seek(0)
-        self.assertEqual(f.read(6), content[0:6])
-        self.assertEqual(f.read(6), content[6:10])
-        self.assertEqual(f.read(6), content[10:16])
-        self.assertEqual(f.read(6), content[16:20])
-
     def test_extract_file(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
@@ -801,6 +704,44 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
             [[f.size(), f.stream_name(), f.name()]
              for f in reader.all_streams()[0].all_files()])
 
+    def check_open_file(self, coll_file, stream_name, file_name, file_size):
+        self.assertFalse(coll_file.closed, "returned file is not open")
+        self.assertEqual(stream_name, coll_file.stream_name())
+        self.assertEqual(file_name, coll_file.name())
+        self.assertEqual(file_size, coll_file.size())
+
+    def test_open_collection_file_one_argument(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('./foo')
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_collection_file_two_arguments(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('.', 'foo')
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_deep_file(self):
+        coll_name = 'collection_with_files_in_subdir'
+        client = self.api_client_mock(200)
+        self.mock_get_collection(client, 200, coll_name)
+        reader = arvados.CollectionReader(
+            self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
+        cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
+        self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
+                             32)
+
+    def test_open_nonexistent_stream(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, './nonexistent', 'foo')
+
+    def test_open_nonexistent_file(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, '.', 'nonexistent')
+
 
 @tutil.skip_sleep
 class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
@@ -839,6 +780,63 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             writer.flush_data()
         self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
 
+    def test_one_open(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('out') as out_file:
+            self.assertEqual('.', writer.current_stream_name())
+            self.assertEqual('out', writer.current_file_name())
+            out_file.write('test data')
+            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+        self.assertTrue(out_file.closed, "writer file not closed after context")
+        self.assertRaises(ValueError, out_file.write, 'extra text')
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:9:out\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_open_writelines(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('six') as out_file:
+            out_file.writelines(['12', '34', '56'])
+            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:6:six\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_two_opens_same_stream(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('.', '1') as out_file:
+            out_file.write('1st')
+        with writer.open('.', '2') as out_file:
+            out_file.write('2nd')
+        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_two_opens_two_streams(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('file') as out_file:
+            out_file.write('file')
+            data_loc1 = hashlib.md5('file').hexdigest() + '+4'
+        with self.mock_keep(data_loc1, 200) as keep_mock:
+            with writer.open('./dir', 'indir') as out_file:
+                out_file.write('indir')
+                data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        with self.mock_keep(data_loc2, 200) as keep_mock:
+            expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
+                data_loc1, data_loc2)
+            self.assertEqual(expected, writer.manifest_text())
+
+    def test_dup_open_fails(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        file1 = writer.open('one')
+        self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 3970d67..e272845 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+import os
 import mock
 import unittest
 
@@ -9,6 +10,126 @@ from arvados import StreamReader, StreamFileReader
 import arvados_testutil as tutil
 import run_test_server
 
+class StreamFileReaderTestCase(unittest.TestCase):
+    def make_count_reader(self):
+        stream = tutil.MockStreamReader('.', '01234', '34567', '67890')
+        return StreamFileReader(stream, [[1, 3, 0], [6, 3, 3], [11, 3, 6]],
+                                'count.txt')
+
+    def test_read_returns_first_block(self):
+        # read() calls will be aligned on block boundaries - see #3663.
+        sfile = self.make_count_reader()
+        self.assertEqual('123', sfile.read(10))
+
+    def test_small_read(self):
+        sfile = self.make_count_reader()
+        self.assertEqual('12', sfile.read(2))
+
+    def test_successive_reads(self):
+        sfile = self.make_count_reader()
+        for expect in ['123', '456', '789', '']:
+            self.assertEqual(expect, sfile.read(10))
+
+    def test_readfrom_spans_blocks(self):
+        sfile = self.make_count_reader()
+        self.assertEqual('6789', sfile.readfrom(5, 12))
+
+    def test_small_readfrom_spanning_blocks(self):
+        sfile = self.make_count_reader()
+        self.assertEqual('2345', sfile.readfrom(1, 4))
+
+    def test_readall(self):
+        sfile = self.make_count_reader()
+        self.assertEqual('123456789', ''.join(sfile.readall()))
+
+    def test_one_arg_seek(self):
+        # Our default has been SEEK_SET since time immemorial.
+        self.test_absolute_seek([])
+
+    def test_absolute_seek(self, args=[os.SEEK_SET]):
+        sfile = self.make_count_reader()
+        sfile.seek(6, *args)
+        self.assertEqual('78', sfile.read(2))
+        sfile.seek(4, *args)
+        self.assertEqual('56', sfile.read(2))
+
+    def test_relative_seek(self):
+        sfile = self.make_count_reader()
+        self.assertEqual('12', sfile.read(2))
+        sfile.seek(2, os.SEEK_CUR)
+        self.assertEqual('56', sfile.read(2))
+
+    def test_end_seek(self):
+        sfile = self.make_count_reader()
+        sfile.seek(-6, os.SEEK_END)
+        self.assertEqual('45', sfile.read(2))
+
+    def test_seek_min_zero(self):
+        sfile = self.make_count_reader()
+        sfile.seek(-2, os.SEEK_SET)
+        self.assertEqual(0, sfile.tell())
+
+    def test_seek_max_size(self):
+        sfile = self.make_count_reader()
+        sfile.seek(2, os.SEEK_END)
+        self.assertEqual(9, sfile.tell())
+
+    def test_size(self):
+        self.assertEqual(9, self.make_count_reader().size())
+
+    def test_tell_after_block_read(self):
+        sfile = self.make_count_reader()
+        sfile.read(5)
+        self.assertEqual(3, sfile.tell())
+
+    def test_tell_after_small_read(self):
+        sfile = self.make_count_reader()
+        sfile.read(1)
+        self.assertEqual(1, sfile.tell())
+
+    def test_no_read_after_close(self):
+        sfile = self.make_count_reader()
+        sfile.close()
+        self.assertRaises(ValueError, sfile.read, 2)
+
+    def test_context(self):
+        with self.make_count_reader() as sfile:
+            self.assertFalse(sfile.closed, "reader is closed inside context")
+            self.assertEqual('12', sfile.read(2))
+        self.assertTrue(sfile.closed, "reader is open after context")
+
+    def make_newlines_reader(self):
+        stream = tutil.MockStreamReader('.', 'one\ntwo\n\nth', 'ree\nfour\n\n')
+        return StreamFileReader(stream, [[0, 11, 0], [11, 10, 11]], 'count.txt')
+
+    def check_lines(self, actual):
+        self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'],
+                         actual)
+
+    def test_readline(self):
+        reader = self.make_newlines_reader()
+        actual = []
+        while True:
+            data = reader.readline()
+            if not data:
+                break
+            actual.append(data)
+        self.check_lines(actual)
+
+    def test_readlines(self):
+        self.check_lines(list(self.make_newlines_reader().readlines()))
+
+    def test_iteration(self):
+        self.check_lines(list(iter(self.make_newlines_reader())))
+
+    def test_name_attribute(self):
+        # Test both .name and .name() (for backward compatibility)
+        stream = tutil.MockStreamReader()
+        sfile = StreamFileReader(stream, [[0, 0, 0]], 'nametest')
+        self.assertEqual('nametest', sfile.name)
+        self.assertEqual('nametest', sfile.name())
+
+
 class StreamRetryTestMixin(object):
     # Define reader_for(coll_name, **kwargs)
     # and read_for_test(reader, size, **kwargs).

commit 9166af7ca932880e7577501b8a4a3a8c14b45640
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Oct 24 11:16:05 2014 -0400

    3603: Clean up PySDK imports.
    
    Sort; remove unused imports.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index bf09285..f7e2bf0 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1,22 +1,6 @@
-import gflags
-import httplib
-import httplib2
 import logging
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
 
 from collections import deque
 from stat import *
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index f5975b9..8f787b7 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,22 +1,9 @@
-import gflags
-import httplib
-import httplib2
+import bz2
+import collections
+import hashlib
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
 import zlib
-import fcntl
-import time
-import threading
-import collections
 
 from arvados.retry import retry_method
 from keep import *
@@ -195,7 +182,7 @@ class StreamFileReader(object):
             data += newdata
             sol = 0
             while True:
-                eol = string.find(data, "\n", sol)
+                eol = data.find("\n", sol)
                 if eol < 0:
                     break
                 yield data[sol:eol+1]

commit 52a2fc66619f3796853304e300d3d9df5f27fa32
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Oct 24 11:23:09 2014 -0400

    3603: Fix context methods for PySDK Collection objects.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3a90d6d..bf09285 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -71,9 +71,9 @@ def normalize_stream(s, stream):
 
 class CollectionBase(object):
     def __enter__(self):
-        pass
+        return self
 
-    def __exit__(self):
+    def __exit__(self, exc_type, exc_value, traceback):
         pass
 
     def _my_keep(self):
@@ -290,8 +290,9 @@ class CollectionWriter(CollectionBase):
         self._queued_dirents = deque()
         self._queued_trees = deque()
 
-    def __exit__(self):
-        self.finish()
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.finish()
 
     def do_queued_work(self):
         # The work queue consists of three pieces:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list