[ARVADOS] created: 038b1bc4fe952b0bf95b566542973e55c4a6ecc7
git at public.curoverse.com
git at public.curoverse.com
Tue Nov 4 09:09:31 EST 2014
at 038b1bc4fe952b0bf95b566542973e55c4a6ecc7 (commit)
commit 038b1bc4fe952b0bf95b566542973e55c4a6ecc7
Author: Brett Smith <brett at curoverse.com>
Date: Tue Nov 4 09:09:16 2014 -0500
3603: PySDK Collection objects support file-like APIs.
This commit adds an open() method to CollectionReader and
CollectionWriter. They mimic the built-in open(), returning objects
that implement as much of the Python file API as I can reasonably
manage.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
new file mode 100644
index 0000000..ab695e9
--- /dev/null
+++ b/sdk/python/arvados/arvfile.py
@@ -0,0 +1,32 @@
+import functools
+
+class ArvadosFileBase(object):
+ def __init__(self, name, mode):
+ self.name = name
+ self.mode = mode
+ self.closed = False
+
+ @staticmethod
+ def _before_close(orig_func):
+ @functools.wraps(orig_func)
+ def wrapper(self, *args, **kwargs):
+ if self.closed:
+ raise ValueError("I/O operation on closed stream file")
+ return orig_func(self, *args, **kwargs)
+ return wrapper
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ try:
+ self.close()
+ except Exception:
+ if exc_type is None:
+ raise
+
+ def close(self):
+ self.closed = True
+
+ def isatty(self):
+ return False
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f7e2bf0..9755555 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -5,8 +5,9 @@ import re
from collections import deque
from stat import *
+from .arvfile import ArvadosFileBase
from keep import *
-from stream import *
+from .stream import StreamReader, split
import config
import errors
import util
@@ -196,10 +197,7 @@ class CollectionReader(CollectionBase):
streams = {}
for s in self.all_streams():
for f in s.all_files():
- filestream = s.name() + "/" + f.name()
- r = filestream.rindex("/")
- streamname = filestream[:r]
- filename = filestream[r+1:]
+ streamname, filename = split(s.name() + "/" + f.name())
if streamname not in streams:
streams[streamname] = {}
if filename not in streams[streamname]:
@@ -215,6 +213,31 @@ class CollectionReader(CollectionBase):
[StreamReader(stream, keep=self._my_keep()).manifest_text()
for stream in self._streams])
+ def open(self, streampath, filename=None):
+ """open(streampath[, filename]) -> file-like object
+
+ Pass in the path of a file to read from the Collection, either as a
+ single string or as two separate stream name and file name arguments.
+ This method returns a file-like object to read that file.
+ """
+ self._populate()
+ if filename is None:
+ streampath, filename = split(streampath)
+ keep_client = self._my_keep()
+ for stream_s in self._streams:
+ stream = StreamReader(stream_s, keep_client,
+ num_retries=self.num_retries)
+ if stream.name() == streampath:
+ break
+ else:
+ raise ValueError("stream '{}' not found in Collection".
+ format(streampath))
+ try:
+ return stream.files()[filename]
+ except KeyError:
+ raise ValueError("file '{}' not found in Collection stream '{}'".
+ format(filename, streampath))
+
def all_streams(self):
self._populate()
return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
@@ -237,6 +260,25 @@ class CollectionReader(CollectionBase):
return self._manifest_text
+class _WriterFile(ArvadosFileBase):
+ def __init__(self, coll_writer, name):
+ super(_WriterFile, self).__init__(name, 'wb')
+ self.dest = coll_writer
+
+ def close(self):
+ super(_WriterFile, self).close()
+ self.dest.finish_current_file()
+
+ @ArvadosFileBase._before_close
+ def write(self, data):
+ self.dest.write(data)
+
+ @ArvadosFileBase._before_close
+ def writelines(self, seq):
+ for data in seq:
+ self.write(data)
+
+
class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
@@ -273,6 +315,7 @@ class CollectionWriter(CollectionBase):
self._queued_file = None
self._queued_dirents = deque()
self._queued_trees = deque()
+ self._last_open = None
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
@@ -379,6 +422,35 @@ class CollectionWriter(CollectionBase):
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
self.flush_data()
+ def open(self, streampath, filename=None):
+ """open(streampath[, filename]) -> file-like object
+
+ Pass in the path of a file to write to the Collection, either as a
+ single string or as two separate stream name and file name arguments.
+ This method returns a file-like object you can write to add it to the
+ Collection.
+
+ You may only have one file object from the Collection open at a time,
+ so be sure to close the object when you're done. Using the object in
+ a with statement makes that easy::
+
+ with cwriter.open('./doc/page1.txt') as outfile:
+ outfile.write(page1_data)
+ with cwriter.open('./doc/page2.txt') as outfile:
+ outfile.write(page2_data)
+ """
+ if filename is None:
+ streampath, filename = split(streampath)
+ if self._last_open and not self._last_open.closed:
+ raise errors.AssertionError(
+ "can't open '{}' when '{}' is still open".format(
+ filename, self._last_open.name))
+ if streampath != self.current_stream_name():
+ self.start_new_stream(streampath)
+ self.set_current_file_name(filename)
+ self._last_open = _WriterFile(self, filename)
+ return self._last_open
+
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
if data_buffer:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8f787b7..6a81634 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -5,6 +5,7 @@ import os
import re
import zlib
+from .arvfile import ArvadosFileBase
from arvados.retry import retry_method
from keep import *
import config
@@ -89,25 +90,53 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
i += 1
return resp
+def split(path):
+ """split(path) -> streamname, filename
+
+ Separate the stream name and file name in a /-separated stream path.
+ If no stream name is available, assume '.'.
+ """
+ try:
+ stream_name, file_name = path.rsplit('/', 1)
+ except ValueError: # No / in string
+ stream_name, file_name = '.', path
+ return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+ class _NameAttribute(str):
+ # The Python file API provides a plain .name attribute.
+ # Older SDK provided a name() method.
+ # This class provides both, for maximum compatibility.
+ def __call__(self):
+ return self
+
-class StreamFileReader(object):
def __init__(self, stream, segments, name):
+ super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
self._stream = stream
self.segments = segments
- self._name = name
self._filepos = 0L
self.num_retries = stream.num_retries
+ self._readline_cache = (-1, None, None)
- def name(self):
- return self._name
+ def __iter__(self):
+ # If we've already started reading the file, don't decompress;
+ # that implicitly seeks to the beginning, which we don't want here.
+ return self.readlines(decompress=self.tell() == 0)
def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self._name)
+ return re.sub('\.(bz2|gz)$', '', self.name)
def stream_name(self):
return self._stream.name()
- def seek(self, pos):
+ @ArvadosFileBase._before_close
+ def seek(self, pos, rel=os.SEEK_SET):
+ """Note that the default is SEEK_SET, not Python's usual SEEK_CUR."""
+ if rel == os.SEEK_CUR:
+ pos += self._filepos
+ elif rel == os.SEEK_END:
+ pos += self.size()
self._filepos = min(max(pos, 0L), self.size())
def tell(self):
@@ -117,6 +146,7 @@ class StreamFileReader(object):
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
+ @ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -133,6 +163,7 @@ class StreamFileReader(object):
self._filepos += len(data)
return data
+ @ArvadosFileBase._before_close
@retry_method
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -145,6 +176,7 @@ class StreamFileReader(object):
num_retries=num_retries))
return ''.join(data)
+ @ArvadosFileBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
while True:
@@ -153,42 +185,63 @@ class StreamFileReader(object):
break
yield data
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readline(self, read_iter=None, num_retries=None):
+ cache_pos, cache_iter, cache_data = self._readline_cache
+ if (((read_iter is None) or (read_iter is cache_iter)) and
+ (self.tell() == cache_pos)):
+ read_iter = cache_iter
+ data = [cache_data]
+ else:
+ if read_iter is None:
+ read_iter = self.readall_decompressed(num_retries=num_retries)
+ data = ['']
+ while '\n' not in data[-1]:
+ try:
+ data.append(next(read_iter))
+ except StopIteration:
+ break
+ data = ''.join(data)
+ try:
+ nextline_index = data.index('\n') + 1
+ except ValueError:
+ nextline_index = len(data)
+ self._readline_cache = (self.tell(), read_iter, data[nextline_index:])
+ return data[:nextline_index]
+
+ @ArvadosFileBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
for segment in self.readall(size, num_retries):
data = decompress(segment)
- if data and data != '':
+ if data:
yield data
+ @ArvadosFileBase._before_close
@retry_method
def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
- if re.search('\.bz2$', self._name):
+ if self.name.endswith('.bz2'):
dc = bz2.BZ2Decompressor()
return self.decompress(dc.decompress, size,
num_retries=num_retries)
- elif re.search('\.gz$', self._name):
+ elif self.name.endswith('.gz'):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
size, num_retries=num_retries)
else:
return self.readall(size, num_retries=num_retries)
+ @ArvadosFileBase._before_close
@retry_method
def readlines(self, decompress=True, num_retries=None):
read_func = self.readall_decompressed if decompress else self.readall
- data = ''
- for newdata in read_func(num_retries=num_retries):
- data += newdata
- sol = 0
- while True:
- eol = data.find("\n", sol)
- if eol < 0:
- break
- yield data[sol:eol+1]
- sol = eol+1
- data = data[sol:]
- if data != '':
+ read_iter = read_func(num_retries=num_retries)
+ while True:
+ data = self.readline(read_iter, num_retries=num_retries)
+ if not data:
+ break
yield data
def as_manifest(self):
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 0dbf9bc..9655f25 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import errno
+import hashlib
import httplib
import httplib2
import mock
@@ -24,6 +25,21 @@ def mock_responses(body, *codes, **headers):
return mock.patch('httplib2.Http.request', side_effect=(
(fake_httplib2_response(code, **headers), body) for code in codes))
+class MockStreamReader(object):
+ def __init__(self, name='.', *data):
+ self._name = name
+ self._data = ''.join(data)
+ self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
+ len(d)) for d in data]
+ self.num_retries = 0
+
+ def name(self):
+ return self._name
+
+ def readfrom(self, start, size, num_retries=None):
+ return self._data[start:start + size]
+
+
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index c4c7ca2..cbbd0d5 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -370,79 +370,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
['c', 5, 0, 1]])
- class MockStreamReader(object):
- def __init__(self, content):
- self.content = content
- self.num_retries = 0
-
- def readfrom(self, start, size, num_retries=0):
- return self.content[start:start+size]
-
- def test_file_stream(self):
- content = 'abcdefghijklmnopqrstuvwxyz0123456789'
- msr = self.MockStreamReader(content)
- segments = [[0, 10, 0],
- [10, 15, 10],
- [25, 5, 25]]
-
- sfr = arvados.StreamFileReader(msr, segments, "test")
-
- self.assertEqual(sfr.name(), "test")
- self.assertEqual(sfr.size(), 30)
-
- self.assertEqual(sfr.readfrom(0, 30), content[0:30])
- self.assertEqual(sfr.readfrom(2, 30), content[2:30])
-
- self.assertEqual(sfr.readfrom(2, 8), content[2:10])
- self.assertEqual(sfr.readfrom(0, 10), content[0:10])
-
- self.assertEqual(sfr.tell(), 0)
- self.assertEqual(sfr.read(5), content[0:5])
- self.assertEqual(sfr.tell(), 5)
- self.assertEqual(sfr.read(5), content[5:10])
- self.assertEqual(sfr.tell(), 10)
- self.assertEqual(sfr.read(5), content[10:15])
- self.assertEqual(sfr.tell(), 15)
- self.assertEqual(sfr.read(5), content[15:20])
- self.assertEqual(sfr.tell(), 20)
- self.assertEqual(sfr.read(5), content[20:25])
- self.assertEqual(sfr.tell(), 25)
- self.assertEqual(sfr.read(5), content[25:30])
- self.assertEqual(sfr.tell(), 30)
- self.assertEqual(sfr.read(5), '')
- self.assertEqual(sfr.tell(), 30)
-
- segments = [[26, 10, 0],
- [0, 15, 10],
- [15, 5, 25]]
-
- sfr = arvados.StreamFileReader(msr, segments, "test")
-
- self.assertEqual(sfr.size(), 30)
-
- self.assertEqual(sfr.readfrom(0, 30), content[26:36] + content[0:20])
- self.assertEqual(sfr.readfrom(2, 30), content[28:36] + content[0:20])
-
- self.assertEqual(sfr.readfrom(2, 8), content[28:36])
- self.assertEqual(sfr.readfrom(0, 10), content[26:36])
-
- self.assertEqual(sfr.tell(), 0)
- self.assertEqual(sfr.read(5), content[26:31])
- self.assertEqual(sfr.tell(), 5)
- self.assertEqual(sfr.read(5), content[31:36])
- self.assertEqual(sfr.tell(), 10)
- self.assertEqual(sfr.read(5), content[0:5])
- self.assertEqual(sfr.tell(), 15)
- self.assertEqual(sfr.read(5), content[5:10])
- self.assertEqual(sfr.tell(), 20)
- self.assertEqual(sfr.read(5), content[10:15])
- self.assertEqual(sfr.tell(), 25)
- self.assertEqual(sfr.read(5), content[15:20])
- self.assertEqual(sfr.tell(), 30)
- self.assertEqual(sfr.read(5), '')
- self.assertEqual(sfr.tell(), 30)
-
-
class MockKeep(object):
def __init__(self, content, num_retries=0):
self.content = content
@@ -474,30 +401,6 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
self.assertEqual(sr.readfrom(25, 5), content[25:30])
self.assertEqual(sr.readfrom(30, 5), '')
- def test_file_reader(self):
- keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
- 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
- 'cccccccccccccccccccccccccccccccc+5': 'z0123'}
- mk = self.MockKeep(keepblocks)
-
- sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:10:foo", "15:10:foo"], mk)
-
- content = 'abcdefghijpqrstuvwxy'
-
- f = sr.files()["foo"]
-
- # f.read() calls will be aligned on block boundaries (as a
- # result of ticket #3663).
-
- f.seek(0)
- self.assertEqual(f.read(20), content[0:10])
-
- f.seek(0)
- self.assertEqual(f.read(6), content[0:6])
- self.assertEqual(f.read(6), content[6:10])
- self.assertEqual(f.read(6), content[10:16])
- self.assertEqual(f.read(6), content[16:20])
-
def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
@@ -801,6 +704,44 @@ class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
[[f.size(), f.stream_name(), f.name()]
for f in reader.all_streams()[0].all_files()])
+ def check_open_file(self, coll_file, stream_name, file_name, file_size):
+ self.assertFalse(coll_file.closed, "returned file is not open")
+ self.assertEqual(stream_name, coll_file.stream_name())
+ self.assertEqual(file_name, coll_file.name())
+ self.assertEqual(file_size, coll_file.size())
+
+ def test_open_collection_file_one_argument(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ cfile = reader.open('./foo')
+ self.check_open_file(cfile, '.', 'foo', 3)
+
+ def test_open_collection_file_two_arguments(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ cfile = reader.open('.', 'foo')
+ self.check_open_file(cfile, '.', 'foo', 3)
+
+ def test_open_deep_file(self):
+ coll_name = 'collection_with_files_in_subdir'
+ client = self.api_client_mock(200)
+ self.mock_get_collection(client, 200, coll_name)
+ reader = arvados.CollectionReader(
+ self.API_COLLECTIONS[coll_name]['uuid'], api_client=client)
+ cfile = reader.open('./subdir2/subdir3/file2_in_subdir3.txt')
+ self.check_open_file(cfile, './subdir2/subdir3', 'file2_in_subdir3.txt',
+ 32)
+
+ def test_open_nonexistent_stream(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ self.assertRaises(ValueError, reader.open, './nonexistent', 'foo')
+
+ def test_open_nonexistent_file(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ self.assertRaises(ValueError, reader.open, '.', 'nonexistent')
+
@tutil.skip_sleep
class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
@@ -839,6 +780,63 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
writer.flush_data()
self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+ def test_one_open(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('out') as out_file:
+ self.assertEqual('.', writer.current_stream_name())
+ self.assertEqual('out', writer.current_file_name())
+ out_file.write('test data')
+ data_loc = hashlib.md5('test data').hexdigest() + '+9'
+ self.assertTrue(out_file.closed, "writer file not closed after context")
+ self.assertRaises(ValueError, out_file.write, 'extra text')
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:9:out\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_open_writelines(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('six') as out_file:
+ out_file.writelines(['12', '34', '56'])
+ data_loc = hashlib.md5('123456').hexdigest() + '+6'
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:6:six\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_two_opens_same_stream(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('.', '1') as out_file:
+ out_file.write('1st')
+ with writer.open('.', '2') as out_file:
+ out_file.write('2nd')
+ data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+ with self.mock_keep(data_loc, 200) as keep_mock:
+ self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
+ writer.manifest_text())
+
+ def test_two_opens_two_streams(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ with writer.open('file') as out_file:
+ out_file.write('file')
+ data_loc1 = hashlib.md5('file').hexdigest() + '+4'
+ with self.mock_keep(data_loc1, 200) as keep_mock:
+ with writer.open('./dir', 'indir') as out_file:
+ out_file.write('indir')
+ data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+ with self.mock_keep(data_loc2, 200) as keep_mock:
+ expected = ". {} 0:4:file\n./dir {} 0:5:indir\n".format(
+ data_loc1, data_loc2)
+ self.assertEqual(expected, writer.manifest_text())
+
+ def test_dup_open_fails(self):
+ client = self.api_client_mock()
+ writer = arvados.CollectionWriter(client)
+ file1 = writer.open('one')
+ self.assertRaises(arvados.errors.AssertionError, writer.open, 'two')
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 3970d67..e272845 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+import os
import mock
import unittest
@@ -9,6 +10,126 @@ from arvados import StreamReader, StreamFileReader
import arvados_testutil as tutil
import run_test_server
+class StreamFileReaderTestCase(unittest.TestCase):
+ def make_count_reader(self):
+ stream = tutil.MockStreamReader('.', '01234', '34567', '67890')
+ return StreamFileReader(stream, [[1, 3, 0], [6, 3, 3], [11, 3, 6]],
+ 'count.txt')
+
+ def test_read_returns_first_block(self):
+ # read() calls will be aligned on block boundaries - see #3663.
+ sfile = self.make_count_reader()
+ self.assertEqual('123', sfile.read(10))
+
+ def test_small_read(self):
+ sfile = self.make_count_reader()
+ self.assertEqual('12', sfile.read(2))
+
+ def test_successive_reads(self):
+ sfile = self.make_count_reader()
+ for expect in ['123', '456', '789', '']:
+ self.assertEqual(expect, sfile.read(10))
+
+ def test_readfrom_spans_blocks(self):
+ sfile = self.make_count_reader()
+ self.assertEqual('6789', sfile.readfrom(5, 12))
+
+ def test_small_readfrom_spanning_blocks(self):
+ sfile = self.make_count_reader()
+ self.assertEqual('2345', sfile.readfrom(1, 4))
+
+ def test_readall(self):
+ sfile = self.make_count_reader()
+ self.assertEqual('123456789', ''.join(sfile.readall()))
+
+ def test_one_arg_seek(self):
+ # Our default has been SEEK_SET since time immemorial.
+ self.test_absolute_seek([])
+
+ def test_absolute_seek(self, args=[os.SEEK_SET]):
+ sfile = self.make_count_reader()
+ sfile.seek(6, *args)
+ self.assertEqual('78', sfile.read(2))
+ sfile.seek(4, *args)
+ self.assertEqual('56', sfile.read(2))
+
+ def test_relative_seek(self):
+ sfile = self.make_count_reader()
+ self.assertEqual('12', sfile.read(2))
+ sfile.seek(2, os.SEEK_CUR)
+ self.assertEqual('56', sfile.read(2))
+
+ def test_end_seek(self):
+ sfile = self.make_count_reader()
+ sfile.seek(-6, os.SEEK_END)
+ self.assertEqual('45', sfile.read(2))
+
+ def test_seek_min_zero(self):
+ sfile = self.make_count_reader()
+ sfile.seek(-2, os.SEEK_SET)
+ self.assertEqual(0, sfile.tell())
+
+ def test_seek_max_size(self):
+ sfile = self.make_count_reader()
+ sfile.seek(2, os.SEEK_END)
+ self.assertEqual(9, sfile.tell())
+
+ def test_size(self):
+ self.assertEqual(9, self.make_count_reader().size())
+
+ def test_tell_after_block_read(self):
+ sfile = self.make_count_reader()
+ sfile.read(5)
+ self.assertEqual(3, sfile.tell())
+
+ def test_tell_after_small_read(self):
+ sfile = self.make_count_reader()
+ sfile.read(1)
+ self.assertEqual(1, sfile.tell())
+
+ def test_no_read_after_close(self):
+ sfile = self.make_count_reader()
+ sfile.close()
+ self.assertRaises(ValueError, sfile.read, 2)
+
+ def test_context(self):
+ with self.make_count_reader() as sfile:
+ self.assertFalse(sfile.closed, "reader is closed inside context")
+ self.assertEqual('12', sfile.read(2))
+ self.assertTrue(sfile.closed, "reader is open after context")
+
+ def make_newlines_reader(self):
+ stream = tutil.MockStreamReader('.', 'one\ntwo\n\nth', 'ree\nfour\n\n')
+ return StreamFileReader(stream, [[0, 11, 0], [11, 10, 11]], 'count.txt')
+
+ def check_lines(self, actual):
+ self.assertEqual(['one\n', 'two\n', '\n', 'three\n', 'four\n', '\n'],
+ actual)
+
+ def test_readline(self):
+ reader = self.make_newlines_reader()
+ actual = []
+ while True:
+ data = reader.readline()
+ if not data:
+ break
+ actual.append(data)
+ self.check_lines(actual)
+
+ def test_readlines(self):
+ self.check_lines(list(self.make_newlines_reader().readlines()))
+
+ def test_iteration(self):
+ self.check_lines(list(iter(self.make_newlines_reader())))
+
+ def test_name_attribute(self):
+ # Test both .name and .name() (for backward compatibility)
+ stream = tutil.MockStreamReader()
+ sfile = StreamFileReader(stream, [[0, 0, 0]], 'nametest')
+ self.assertEqual('nametest', sfile.name)
+ self.assertEqual('nametest', sfile.name())
+
+
class StreamRetryTestMixin(object):
# Define reader_for(coll_name, **kwargs)
# and read_for_test(reader, size, **kwargs).
commit 9166af7ca932880e7577501b8a4a3a8c14b45640
Author: Brett Smith <brett at curoverse.com>
Date: Fri Oct 24 11:16:05 2014 -0400
3603: Clean up PySDK imports.
Sort; remove unused imports.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index bf09285..f7e2bf0 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1,22 +1,6 @@
-import gflags
-import httplib
-import httplib2
import logging
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
from collections import deque
from stat import *
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index f5975b9..8f787b7 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,22 +1,9 @@
-import gflags
-import httplib
-import httplib2
+import bz2
+import collections
+import hashlib
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
import zlib
-import fcntl
-import time
-import threading
-import collections
from arvados.retry import retry_method
from keep import *
@@ -195,7 +182,7 @@ class StreamFileReader(object):
data += newdata
sol = 0
while True:
- eol = string.find(data, "\n", sol)
+ eol = data.find("\n", sol)
if eol < 0:
break
yield data[sol:eol+1]
commit 52a2fc66619f3796853304e300d3d9df5f27fa32
Author: Brett Smith <brett at curoverse.com>
Date: Fri Oct 24 11:23:09 2014 -0400
3603: Fix context methods for PySDK Collection objects.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3a90d6d..bf09285 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -71,9 +71,9 @@ def normalize_stream(s, stream):
class CollectionBase(object):
def __enter__(self):
- pass
+ return self
- def __exit__(self):
+ def __exit__(self, exc_type, exc_value, traceback):
pass
def _my_keep(self):
@@ -290,8 +290,9 @@ class CollectionWriter(CollectionBase):
self._queued_dirents = deque()
self._queued_trees = deque()
- def __exit__(self):
- self.finish()
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is None:
+ self.finish()
def do_queued_work(self):
# The work queue consists of three pieces:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list