[ARVADOS] updated: 78700dbcc4f5d34f0a4cfee5c040e716d684ed62
git at public.curoverse.com
git at public.curoverse.com
Tue Dec 16 13:50:27 EST 2014
Summary of changes:
sdk/python/arvados/commands/put.py | 4 +-
sdk/python/arvados/stream.py | 90 +++++++++++++++++++++++++++++++-------
sdk/python/tests/test_stream.py | 24 ++++++++--
3 files changed, 95 insertions(+), 23 deletions(-)
via 78700dbcc4f5d34f0a4cfee5c040e716d684ed62 (commit)
from 5bf8784175517327b334fb4f7c8d3b8ee505353d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 78700dbcc4f5d34f0a4cfee5c040e716d684ed62
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 16 13:51:06 2014 -0500
3198: Support for truncating files. A few tests. Next step: work on random access writing.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d070a8b..95ba172 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -295,12 +295,12 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
def flush_data(self):
start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+ start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
super(ArvPutCollectionWriter, self).flush_data()
if self._data_buffer_len < start_buffer_len: # We actually PUT data.
self.bytes_written += (start_buffer_len - self._data_buffer_len)
self.report_progress()
- if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+ if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
self.cache_state()
def _record_new_input(self, input_type, source_name, dest_name):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index f57e303..55718d9 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -168,7 +168,7 @@ class StreamFileReader(ArvadosFileBase):
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
- data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+ data = self._stream._readfrom(locator+segmentoffset, segmentsize,
num_retries=num_retries)
self._filepos += len(data)
@@ -183,7 +183,7 @@ class StreamFileReader(ArvadosFileBase):
data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+ data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
num_retries=num_retries))
return ''.join(data)
@@ -295,8 +295,8 @@ class StreamReader(object):
if name not in self._files:
self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
else:
- n = self._files[name]
- n.segments.append([pos, size, n.size()])
+ filereader = self._files[name]
+ filereader.segments.append([pos, size, filereader.size()])
continue
raise errors.SyntaxError("Invalid manifest format")
@@ -318,10 +318,13 @@ class StreamReader(object):
return locators_and_ranges(self._data_locators, range_start, range_size)
def _keepget(self, locator, num_retries=None):
- self._keep.get(locator, num_retries=num_retries)
+ return self._keep.get(locator, num_retries=num_retries)
@retry_method
def readfrom(self, start, size, num_retries=None):
+ self._readfrom(start, size, num_retries=num_retries)
+
+ def _readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
@@ -362,11 +365,17 @@ class StreamWriter(StreamReader):
def __init__(self, tokens, keep=None, debug=False, _empty=False,
num_retries=0):
super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
+
+ if len(self._files) != 1:
+ raise AssertionError("StreamWriter can only have one file at a time")
+ sr = self._files.popitem()[1]
+ self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)
+
self.mutex = threading.Lock()
self.current_bblock = None
self.bufferblocks = {}
- # Proxy the methods listed below to self.nodes.
+ # wrap superclass methods in mutex
def _proxy_method(name):
method = getattr(StreamReader, name)
@functools.wraps(method, ('__name__', '__doc__'))
@@ -385,28 +394,75 @@ class StreamWriter(StreamReader):
else:
return self._keep.get(locator, num_retries=num_retries)
+ def _append(self, data):
+ if self.current_bblock is None:
+ streamoffset = sum([x[1] for x in self._data_locators])
+ self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+ self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+ self._data_locators.append(self.current_bblock.locator_list_entry)
+ self.current_bblock.append(data)
+
def append(self, data):
with self.mutex:
- if self.current_bblock is None:
- streamoffset = sum([x[1] for x in self._data_locators])
- self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
- self.bufferblocks[self.current_bblock.locator] = self.current_bblock
- self._data_locators.append(self.current_bblock.locator_list_entry)
- self.current_bblock.append(data)
+ self._append(data)
class StreamFileWriter(StreamFileReader):
- def __init__(self, name, mode):
- super(StreamFileWriter, self).__init__(name, mode)
- self.mutex = threading.Lock()
+ def __init__(self, stream, segments, name):
+ super(StreamFileWriter, self).__init__(stream, segments, name)
+ self.mode = 'wb'
- # Proxy the methods listed below to self.nodes.
+ # wrap superclass methods in mutex
def _proxy_method(name):
method = getattr(StreamFileReader, name)
@functools.wraps(method, ('__name__', '__doc__'))
def wrapper(self, *args, **kwargs):
- with self.mutex:
+ with self._stream.mutex:
return method(self, *args, **kwargs)
return wrapper
for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
locals()[_method_name] = _proxy_method(_method_name)
+
+ def truncate(self, size=None):
+ with self._stream.mutex:
+ if size is None:
+ size = self._filepos
+
+ segs = locators_and_ranges(self.segments, 0, size)
+
+ newstream = []
+ self.segments = []
+ streamoffset = 0L
+ fileoffset = 0L
+
+ for seg in segs:
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
+ newstream.append([locator, blocksize, streamoffset])
+ self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+ streamoffset += blocksize
+ fileoffset += segmentsize
+ self._stream._data_locators = newstream
+ if self._filepos > fileoffset:
+ self._filepos = fileoffset
+
+ def _writeto(self, offset, data):
+ # TODO
+ pass
+
+ def writeto(self, offset, data):
+ with self._stream.mutex:
+ self._writeto(offset, data)
+
+ def write(self, data):
+ with self._stream.mutex:
+ self._writeto(self._filepos, data)
+ self._filepos += len(data)
+
+ def writelines(self, seq):
+ with self._stream.mutex:
+ for s in seq:
+ self._writeto(self._filepos, s)
+ self._filepos += len(s)
+
+ def flush(self):
+ pass
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 0e26f27..b5130cb 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -8,7 +8,7 @@ import os
import unittest
import arvados
-from arvados import StreamReader, StreamFileReader, StreamWriter
+from arvados import StreamReader, StreamFileReader, StreamWriter, StreamFileWriter
import arvados_testutil as tutil
import run_test_server
@@ -282,14 +282,30 @@ class StreamWriterTestCase(unittest.TestCase):
def test_init(self):
stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
- self.assertEqual(stream.readfrom(0, 5), "01234")
+ self.assertEqual("01234", stream.readfrom(0, 5))
def test_append(self):
stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
- self.assertEqual(stream.readfrom(5, 8), "56789")
+ self.assertEqual("56789", stream.readfrom(5, 8))
stream.append("foo")
- self.assertEqual(stream.readfrom(5, 8), "56789foo")
+ self.assertEqual("56789foo", stream.readfrom(5, 8))
+
+
+class StreamFileWriterTestCase(unittest.TestCase):
+ class MockKeep(object):
+ def __init__(self, blocks):
+ self.blocks = blocks
+ def get(self, locator, num_retries=0):
+ return self.blocks[locator]
+
+ def test_truncate(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = stream.files()["count.txt"]
+ self.assertEqual("56789", writer.readfrom(5, 8))
+ writer.truncate(8)
+ self.assertEqual("567", writer.readfrom(5, 8))
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list