[ARVADOS] updated: 26aa2f1a1c8c9883beac1538c318279190f91c8a

git at public.curoverse.com git at public.curoverse.com
Wed Nov 12 09:57:59 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py        |  29 ++++
 sdk/python/arvados/collection.py     | 109 +++++++++++---
 sdk/python/arvados/stream.py         | 119 ++++++++++-----
 sdk/python/tests/arvados_testutil.py |  16 ++
 sdk/python/tests/test_collections.py | 275 ++++++++++++++---------------------
 sdk/python/tests/test_stream.py      | 160 ++++++++++++++++++++
 6 files changed, 482 insertions(+), 226 deletions(-)
 create mode 100644 sdk/python/arvados/arvfile.py

       via  26aa2f1a1c8c9883beac1538c318279190f91c8a (commit)
       via  7bf8f6c701e28e574c137b0c942522e8f8ee4d8c (commit)
       via  7924077e3db2898de26c86599d9f311e02d6db46 (commit)
       via  c1d9150de2977f7eb85e7cb058d5c41ae6f06173 (commit)
      from  ee3e85858a60bffc2f132e2f95ba92d1deb78c2c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 26aa2f1a1c8c9883beac1538c318279190f91c8a
Merge: ee3e858 7bf8f6c
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 12 09:57:44 2014 -0500

    Merge branch '3603-pysdk-file-api-wip'
    
    Refs #3603.  Closes #4316.


commit 7bf8f6c701e28e574c137b0c942522e8f8ee4d8c
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 12 09:42:17 2014 -0500

    3603: PySDK Collection objects support file-like APIs.
    
    This commit adds an open() method to CollectionReader and
    CollectionWriter.  They mimic the built-in open(), returning objects
    that implement as much of the Python file API as I can reasonably
    manage.
    
    There are a couple of backwards-incompatible changes to
    StreamFileReader here:
    
    * seek() now defaults to SEEK_CUR rather than SEEK_SET behavior.
    * readlines() returns a list, and no longer supports decompression.
    
    These changes bring the object closer to a file-like object.  We
    reviewed our existing Python code, including known Crunch scripts from
    users.  Nobody was using seek(), and nobody was obviously relying on
    the decompression behavior of readlines().  So we believe this is
    reasonably safe.
    
    (Contrast name(), which lots of things are using, so we provide a
    backward compatibility shim for it.)

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
new file mode 100644
index 0000000..e8dac46
--- /dev/null
+++ b/sdk/python/arvados/arvfile.py
@@ -0,0 +1,29 @@
+import functools
+
+class ArvadosFileBase(object):
+    def __init__(self, name, mode):
+        self.name = name
+        self.mode = mode
+        self.closed = False
+
+    @staticmethod
+    def _before_close(orig_func):
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if self.closed:
+                raise ValueError("I/O operation on closed stream file")
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        try:
+            self.close()
+        except Exception:
+            if exc_type is None:
+                raise
+
+    def close(self):
+        self.closed = True
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 59dc49f..fa782a1 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -5,8 +5,9 @@ import re
 from collections import deque
 from stat import *
 
+from .arvfile import ArvadosFileBase
 from keep import *
-from stream import *
+from .stream import StreamReader, split
 import config
 import errors
 import util
@@ -196,10 +197,7 @@ class CollectionReader(CollectionBase):
         streams = {}
         for s in self.all_streams():
             for f in s.all_files():
-                filestream = s.name() + "/" + f.name()
-                r = filestream.rindex("/")
-                streamname = filestream[:r]
-                filename = filestream[r+1:]
+                streamname, filename = split(s.name() + "/" + f.name())
                 if streamname not in streams:
                     streams[streamname] = {}
                 if filename not in streams[streamname]:
@@ -215,6 +213,31 @@ class CollectionReader(CollectionBase):
             [StreamReader(stream, keep=self._my_keep()).manifest_text()
              for stream in self._streams])
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+        """
+        self._populate()
+        if filename is None:
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
+
     def all_streams(self):
         self._populate()
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
@@ -237,6 +260,29 @@ class CollectionReader(CollectionBase):
             return self._manifest_text
 
 
+class _WriterFile(ArvadosFileBase):
+    def __init__(self, coll_writer, name):
+        super(_WriterFile, self).__init__(name, 'wb')
+        self.dest = coll_writer
+
+    def close(self):
+        super(_WriterFile, self).close()
+        self.dest.finish_current_file()
+
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        for data in seq:
+            self.write(data)
+
+    @ArvadosFileBase._before_close
+    def flush(self):
+        self.dest.flush_data()
+
+
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
@@ -273,6 +319,7 @@ class CollectionWriter(CollectionBase):
         self._queued_file = None
         self._queued_dirents = deque()
         self._queued_trees = deque()
+        self._last_open = None
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type is None:
@@ -379,6 +426,35 @@ class CollectionWriter(CollectionBase):
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8f787b7..c263dd8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -5,6 +5,7 @@ import os
 import re
 import zlib
 
+from .arvfile import ArvadosFileBase
 from arvados.retry import retry_method
 from keep import *
 import config
@@ -89,25 +90,54 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp
 
+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # Older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
 
-class StreamFileReader(object):
     def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
         self._stream = stream
         self.segments = segments
-        self._name = name
         self._filepos = 0L
         self.num_retries = stream.num_retries
+        self._readline_cache = (None, None)
 
-    def name(self):
-        return self._name
+    def __iter__(self):
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self._name)
+        return re.sub('\.(bz2|gz)$', '', self.name)
 
     def stream_name(self):
         return self._stream.name()
 
-    def seek(self, pos):
+    @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
+            pos += self._filepos
+        elif whence == os.SEEK_END:
+            pos += self.size()
         self._filepos = min(max(pos, 0L), self.size())
 
     def tell(self):
@@ -117,6 +147,7 @@ class StreamFileReader(object):
         n = self.segments[-1]
         return n[OFFSET] + n[BLOCKSIZE]
 
+    @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -133,6 +164,7 @@ class StreamFileReader(object):
         self._filepos += len(data)
         return data
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -145,6 +177,7 @@ class StreamFileReader(object):
                                               num_retries=num_retries))
         return ''.join(data)
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
         while True:
@@ -153,43 +186,64 @@ class StreamFileReader(object):
                 break
             yield data
 
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
+            data = [cache_data]
+        else:
+            data = ['']
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
+                break
+            data.append(next_read)
+            data_size += len(next_read)
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
         for segment in self.readall(size, num_retries):
             data = decompress(segment)
-            if data and data != '':
+            if data:
                 yield data
 
+    @ArvadosFileBase._before_close
     @retry_method
     def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
-        if re.search('\.bz2$', self._name):
+        if self.name.endswith('.bz2'):
             dc = bz2.BZ2Decompressor()
             return self.decompress(dc.decompress, size,
                                    num_retries=num_retries)
-        elif re.search('\.gz$', self._name):
+        elif self.name.endswith('.gz'):
             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                    size, num_retries=num_retries)
         else:
             return self.readall(size, num_retries=num_retries)
 
+    @ArvadosFileBase._before_close
     @retry_method
-    def readlines(self, decompress=True, num_retries=None):
-        read_func = self.readall_decompressed if decompress else self.readall
-        data = ''
-        for newdata in read_func(num_retries=num_retries):
-            data += newdata
-            sol = 0
-            while True:
-                eol = data.find("\n", sol)
-                if eol < 0:
-                    break
-                yield data[sol:eol+1]
-                sol = eol+1
-            data = data[sol:]
-        if data != '':
-            yield data
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
+                break
+        return ''.join(data).splitlines(True)
 
     def as_manifest(self):
         manifest_text = ['.']
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 0e2800c..04ca6b5 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import errno
+import hashlib
 import httplib
 import httplib2
 import io
@@ -50,6 +51,21 @@ def mock_requestslib_responses(method, body, *codes, **headers):
     return mock.patch(method, side_effect=(
         fake_requests_response(code, body, **headers) for code in codes))
 
+class MockStreamReader(object):
+    def __init__(self, name='.', *data):
+        self._name = name
+        self._data = ''.join(data)
+        self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
+                                              len(d)) for d in data]
+        self.num_retries = 0
+
+    def name(self):
+        return self._name
+
+    def readfrom(self, start, size, num_retries=None):
+        return self._data[start:start + size]
+
+
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
 
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 30e7044..254a29f 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -3,14 +3,12 @@
 # ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
 
 import arvados
-import bz2
 import copy
 import hashlib
 import mock
 import os
 import pprint
 import re
-import subprocess
 import tempfile
 import unittest
 
@@ -124,25 +122,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                            [2, '.', 'ob.txt', 'ob'],
                            [0, '.', 'zero.txt', '']])
 
-    def _test_readline(self, what_in, what_out):
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.txt')
-        cw.write(what_in)
-        test1 = cw.finish()
-        cr = arvados.CollectionReader(test1, self.api_client)
-        got = []
-        for x in list(cr.all_files())[0].readlines():
-            got += [x]
-        self.assertEqual(got,
-                         what_out,
-                         "readlines did not split lines correctly: %s" % got)
-
-    def test_collection_readline(self):
-        self._test_readline("\na\nbcd\n\nefg\nz",
-                            ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
-        self._test_readline("ab\ncd\n",
-                            ["ab\n", "cd\n"])
-
     def test_collection_empty_file(self):
         cw = arvados.CollectionWriter(self.api_client)
         cw.start_new_file('zero.txt')
@@ -179,53 +158,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
             got_sizes += [f.size()]
         self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
 
-    def test_collection_bz2_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        compressed_data_in = bz2.compress(data_in)
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.bz2')
-        cw.write(compressed_data_in)
-        bz2_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(bz2_manifest, self.api_client)
-
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
-    def test_collection_gzip_decompression(self):
-        n_lines_in = 2**18
-        data_in = "abc\n"
-        for x in xrange(0, 18):
-            data_in += data_in
-        p = subprocess.Popen(["gzip", "-1cn"],
-                             stdout=subprocess.PIPE,
-                             stdin=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             shell=False, close_fds=True)
-        compressed_data_in, stderrdata = p.communicate(data_in)
-
-        cw = arvados.CollectionWriter(self.api_client)
-        cw.start_new_file('test.gz')
-        cw.write(compressed_data_in)
-        gzip_manifest = cw.manifest_text()
-
-        cr = arvados.CollectionReader(gzip_manifest, self.api_client)
-        got = 0
-        for x in list(cr.all_files())[0].readlines():
-            self.assertEqual(x, "abc\n", "decompression returned wrong data: %s" % x)
-            got += 1
-        self.assertEqual(got,
-                         n_lines_in,
-                         "decompression returned %d lines instead of %d" % (got, n_lines_in))
-
     def test_normalized_collection(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -370,79 +302,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
                                                                        ['c', 5, 0, 1]])
 
-    class MockStreamReader(object):
-        def __init__(self, content):
-            self.content = content
-            self.num_retries = 0
-
-        def readfrom(self, start, size, num_retries=0):
-            return self.content[start:start+size]
-
-    def test_file_stream(self):
-        content = 'abcdefghijklmnopqrstuvwxyz0123456789'
-        msr = self.MockStreamReader(content)
-        segments = [[0, 10, 0],
-                    [10, 15, 10],
-                    [25, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.name(), "test")
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[0:30])
-        self.assertEqual(sfr.readfrom(2, 30), content[2:30])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[2:10])
-        self.assertEqual(sfr.readfrom(0, 10), content[0:10])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[20:25])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[25:30])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-        segments = [[26, 10, 0],
-                    [0, 15, 10],
-                    [15, 5, 25]]
-
-        sfr = arvados.StreamFileReader(msr, segments, "test")
-
-        self.assertEqual(sfr.size(), 30)
-
-        self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
-        self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
-
-        self.assertEqual(sfr.readfrom(2, 8), content[28:36])
-        self.assertEqual(sfr.readfrom(0, 10), content[26:36])
-
-        self.assertEqual(sfr.tell(), 0)
-        self.assertEqual(sfr.read(5), content[26:31])
-        self.assertEqual(sfr.tell(), 5)
-        self.assertEqual(sfr.read(5), content[31:36])
-        self.assertEqual(sfr.tell(), 10)
-        self.assertEqual(sfr.read(5), content[0:5])
-        self.assertEqual(sfr.tell(), 15)
-        self.assertEqual(sfr.read(5), content[5:10])
-        self.assertEqual(sfr.tell(), 20)
-        self.assertEqual(sfr.read(5), content[10:15])
-        self.assertEqual(sfr.tell(), 25)
-        self.assertEqual(sfr.read(5), content[15:20])
-        self.assertEqual(sfr.tell(), 30)
-        self.assertEqual(sfr.read(5), '')
-        self.assertEqual(sfr.tell(), 30)
-
-
     class MockKeep(object):
         def __init__(self, content, num_retries=0):
             self.content = content
@@ -474,30 +333,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         self.assertEqual(sr.readfrom(25, 5), content[25:30])
         self.assertEqual(sr.readfrom(30, 5), '')
 
-    def test_file_reader(self):
-        keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
-                      'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
-                      'cccccccccccccccccccccccccccccccc+5': 'z0123'}
-        mk = self.MockKeep(keepblocks)
-
-        sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
-
-        content = 'abcdefghijpqrstuvwxy'
-
-        f = sr.files()["foo"]
-
-        # f.read() calls will be aligned on block boundaries (as a
-        # result of ticket #3663).
-
-        f.seek(0)
-        self.assertEqual(f.read(20), content[0:10])
-
-        f.seek(0)
-        self.assertEqual(f.read(6), content[0:6])
-        self.assertEqual(f.read(6), content[6:10])
-        self.assertEqual(f.read(6), content[10:16])
-        self.assertEqual(f.read(6), content[16:20])
-
     def test_extract_file(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
@@ -808,6 +643,44 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
                                           api_client=client)
         self.assertEqual('', reader.manifest_text())
 
+    def check_open_file(self, coll_file, stream_name, file_name, file_size):
+        self.assertFalse(coll_file.closed, "returned file is not open")
+        self.assertEqual(stream_name, coll_file.stream_name())
+        self.assertEqual(file_name, coll_file.name())
+        self.assertEqual(file_size, coll_file.size())
+
+    def test_open_collection_file_one_argument(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('./foo')
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_collection_file_two_arguments(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        cfile = reader.open('.', 'foo')
+        self.check_open_file(cfile, '.', 'foo', 3)
+
+    def test_open_deep_file(self):
+        coll_name = 'collection_with_files_in_subdir'
+        client = self.api_client_mock(200)
+        self.mock_get_collection(client, 200, coll_name)
+        reader = arvados.CollectionReader(
+            self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
+        cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
+        self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
+                             32)
+
+    def test_open_nonexistent_stream(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, './nonexistent', 'foo')
+
+    def test_open_nonexistent_file(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        self.assertRaises(ValueError, reader.open, '.', 'nonexistent')
+
 
 @tutil.skip_sleep
 class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
@@ -846,6 +719,78 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             writer.flush_data()
         self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
 
+    def test_one_open(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('out') as out_file:
+            self.assertEqual('.', writer.current_stream_name())
+            self.assertEqual('out', writer.current_file_name())
+            out_file.write('test data')
+            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+        self.assertTrue(out_file.closed, "writer file not closed after context")
+        self.assertRaises(ValueError, out_file.write, 'extra text')
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:9:out\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_open_writelines(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('six') as out_file:
+            out_file.writelines(['12', '34', '56'])
+            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:6:six\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_open_flush(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('flush_test') as out_file:
+            out_file.write('flush1')
+            data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
+            with self.mock_keep(data_loc1, 200) as keep_mock:
+                out_file.flush()
+            out_file.write('flush2')
+            data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+        with self.mock_keep(data_loc2, 200) as keep_mock:
+            self.assertEqual(". {} {} 0:12:flush_test\n".format(data_loc1,
+                                                                data_loc2),
+                             writer.manifest_text())
+
+    def test_two_opens_same_stream(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('.', '1') as out_file:
+            out_file.write('1st')
+        with writer.open('.', '2') as out_file:
+            out_file.write('2nd')
+        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+        with self.mock_keep(data_loc, 200) as keep_mock:
+            self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
+                             writer.manifest_text())
+
+    def test_two_opens_two_streams(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        with writer.open('file') as out_file:
+            out_file.write('file')
+            data_loc1 = hashlib.md5('file').hexdigest() + '+4'
+        with self.mock_keep(data_loc1, 200) as keep_mock:
+            with writer.open('./dir', 'indir') as out_file:
+                out_file.write('indir')
+                data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        with self.mock_keep(data_loc2, 200) as keep_mock:
+            expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
+                data_loc1, data_loc2)
+            self.assertEqual(expected, writer.manifest_text())
+
+    def test_dup_open_fails(self):
+        client = self.api_client_mock()
+        writer = arvados.CollectionWriter(client)
+        file1 = writer.open('one')
+        self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index db26aee..08a3d28 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,6 +1,10 @@
 #!/usr/bin/env python
 
+import bz2
+import gzip
+import io
 import mock
+import os
 import unittest
 
 import arvados
@@ -9,6 +13,162 @@ from arvados import StreamReader, StreamFileReader
 import arvados_testutil as tutil
 import run_test_server
 
class StreamFileReaderTestCase(unittest.TestCase):
    """Exercise StreamFileReader's file-like API over mock stream data."""

    def make_count_reader(self):
        # Three 5-byte blocks.  The segment list [pos, size, offset] skips
        # the first byte of each block, so the file content is '123456789'.
        stream = tutil.MockStreamReader('.', '01234', '34567', '67890')
        return StreamFileReader(stream, [[1, 3, 0], [6, 3, 3], [11, 3, 6]],
                                'count.txt')

    def test_read_returns_first_block(self):
        # read() calls will be aligned on block boundaries - see #3663.
        sfile = self.make_count_reader()
        self.assertEqual('123', sfile.read(10))

    def test_small_read(self):
        sfile = self.make_count_reader()
        self.assertEqual('12', sfile.read(2))

    def test_successive_reads(self):
        # Each read returns one block's worth until EOF yields ''.
        sfile = self.make_count_reader()
        for expect in ['123', '456', '789', '']:
            self.assertEqual(expect, sfile.read(10))

    def test_readfrom_spans_blocks(self):
        sfile = self.make_count_reader()
        self.assertEqual('6789', sfile.readfrom(5, 12))

    def test_small_readfrom_spanning_blocks(self):
        sfile = self.make_count_reader()
        self.assertEqual('2345', sfile.readfrom(1, 4))

    def test_readall(self):
        sfile = self.make_count_reader()
        self.assertEqual('123456789', ''.join(sfile.readall()))

    def test_one_arg_seek(self):
        # seek() without a whence argument must behave like SEEK_CUR.
        self.test_relative_seek([])

    # NOTE: default args are immutable tuples (never use a mutable default).
    def test_absolute_seek(self, args=(os.SEEK_SET,)):
        sfile = self.make_count_reader()
        sfile.seek(6, *args)
        self.assertEqual('78', sfile.read(2))
        sfile.seek(4, *args)
        self.assertEqual('56', sfile.read(2))

    def test_relative_seek(self, args=(os.SEEK_CUR,)):
        sfile = self.make_count_reader()
        self.assertEqual('12', sfile.read(2))
        sfile.seek(2, *args)
        self.assertEqual('56', sfile.read(2))

    def test_end_seek(self):
        sfile = self.make_count_reader()
        sfile.seek(-6, os.SEEK_END)
        self.assertEqual('45', sfile.read(2))

    def test_seek_min_zero(self):
        # Seeking before the start clamps the position to 0.
        sfile = self.make_count_reader()
        sfile.seek(-2, os.SEEK_SET)
        self.assertEqual(0, sfile.tell())

    def test_seek_max_size(self):
        # Seeking past the end clamps the position to the file size.
        sfile = self.make_count_reader()
        sfile.seek(2, os.SEEK_END)
        self.assertEqual(9, sfile.tell())

    def test_size(self):
        self.assertEqual(9, self.make_count_reader().size())

    def test_tell_after_block_read(self):
        # A read capped at the block boundary advances tell() accordingly.
        sfile = self.make_count_reader()
        sfile.read(5)
        self.assertEqual(3, sfile.tell())

    def test_tell_after_small_read(self):
        sfile = self.make_count_reader()
        sfile.read(1)
        self.assertEqual(1, sfile.tell())

    def test_no_read_after_close(self):
        sfile = self.make_count_reader()
        sfile.close()
        self.assertRaises(ValueError, sfile.read, 2)

    def test_context(self):
        with self.make_count_reader() as sfile:
            self.assertFalse(sfile.closed, "reader is closed inside context")
            self.assertEqual('12', sfile.read(2))
        self.assertTrue(sfile.closed, "reader is open after context")

    def make_newlines_reader(self):
        # Line-oriented fixture; 'three' is split across the two blocks.
        stream = tutil.MockStreamReader('.', 'one\ntwo\n\nth', 'ree\nfour\n\n')
        return StreamFileReader(stream, [[0, 11, 0], [11, 10, 11]], 'count.txt')

    def check_lines(self, actual):
        self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'],
                         actual)

    def test_readline(self):
        reader = self.make_newlines_reader()
        actual = []
        while True:
            data = reader.readline()
            if not data:
                break
            actual.append(data)
        self.check_lines(actual)

    def test_readlines(self):
        self.check_lines(self.make_newlines_reader().readlines())

    def test_iteration(self):
        self.check_lines(list(iter(self.make_newlines_reader())))

    def test_readline_size(self):
        # A size argument may truncate a line mid-way, like file.readline.
        reader = self.make_newlines_reader()
        self.assertEqual('on', reader.readline(2))
        self.assertEqual('e\n', reader.readline(4))
        self.assertEqual('two\n', reader.readline(6))
        self.assertEqual('\n', reader.readline(8))
        self.assertEqual('thre', reader.readline(4))

    def test_readlines_sizehint(self):
        # sizehint stops reading after ~8 bytes of complete lines.
        result = self.make_newlines_reader().readlines(8)
        self.assertEqual(['one\n', 'two\n'], result[:2])
        self.assertNotIn('three\n', result)

    def test_name_attribute(self):
        # Test both .name and .name() (for backward compatibility)
        stream = tutil.MockStreamReader()
        sfile = StreamFileReader(stream, [[0, 0, 0]], 'nametest')
        self.assertEqual('nametest', sfile.name)
        self.assertEqual('nametest', sfile.name())

    def check_decompression(self, compress_ext, compress_func):
        # Decompression is selected by file extension, so name the file
        # 'test.<ext>' and expect the original text back.
        test_text = 'decompression\ntest\n'
        test_data = compress_func(test_text)
        stream = tutil.MockStreamReader('.', test_data)
        reader = StreamFileReader(stream, [[0, len(test_data), 0]],
                                  'test.' + compress_ext)
        self.assertEqual(test_text, ''.join(reader.readall_decompressed()))

    @staticmethod
    def gzip_compress(data):
        # gzip has no one-shot compress helper here; go through GzipFile.
        compressed_data = io.BytesIO()
        with gzip.GzipFile(fileobj=compressed_data, mode='wb') as gzip_file:
            gzip_file.write(data)
        return compressed_data.getvalue()

    def test_no_decompression(self):
        self.check_decompression('log', lambda s: s)

    def test_gzip_decompression(self):
        self.check_decompression('gz', self.gzip_compress)

    def test_bz2_decompression(self):
        self.check_decompression('bz2', bz2.compress)
+
+
 class StreamRetryTestMixin(object):
     # Define reader_for(coll_name, **kwargs)
     # and read_for_test(reader, size, **kwargs).

commit 7924077e3db2898de26c86599d9f311e02d6db46
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 12 09:38:26 2014 -0500

    3603: Clean up PySDK imports.
    
    Sort; remove unused imports.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a88f36d..59dc49f 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1,20 +1,6 @@
-import gflags
 import logging
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
 
 from collections import deque
 from stat import *
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 9b90cea..8f787b7 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,20 +1,9 @@
-import gflags
+import bz2
+import collections
+import hashlib
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
 import zlib
-import fcntl
-import time
-import threading
-import collections
 
 from arvados.retry import retry_method
 from keep import *
@@ -193,7 +182,7 @@ class StreamFileReader(object):
             data += newdata
             sol = 0
             while True:
-                eol = string.find(data, "\n", sol)
+                eol = data.find("\n", sol)
                 if eol < 0:
                     break
                 yield data[sol:eol+1]

commit c1d9150de2977f7eb85e7cb058d5c41ae6f06173
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Oct 24 11:23:09 2014 -0400

    3603: Fix context methods for PySDK Collection objects.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index e6ab424..a88f36d 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -69,9 +69,9 @@ def normalize_stream(s, stream):
 
 class CollectionBase(object):
     def __enter__(self):
-        pass
+        return self
 
-    def __exit__(self):
+    def __exit__(self, exc_type, exc_value, traceback):
         pass
 
     def _my_keep(self):
@@ -288,8 +288,9 @@ class CollectionWriter(CollectionBase):
         self._queued_dirents = deque()
         self._queued_trees = deque()
 
-    def __exit__(self):
-        self.finish()
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.finish()
 
     def do_queued_work(self):
         # The work queue consists of three pieces:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list