[ARVADOS] updated: 71959d0d8434b7af5de9a5b8260f0ebe11ec7238
git at public.curoverse.com
git at public.curoverse.com
Tue Dec 16 17:10:48 EST 2014
Summary of changes:
sdk/python/arvados/stream.py | 87 ++++++++++++++++++++++++++++++++++++++---
sdk/python/tests/test_stream.py | 10 +++++
2 files changed, 91 insertions(+), 6 deletions(-)
via 71959d0d8434b7af5de9a5b8260f0ebe11ec7238 (commit)
from 78700dbcc4f5d34f0a4cfee5c040e716d684ed62 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 71959d0d8434b7af5de9a5b8260f0ebe11ec7238
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 16 17:11:26 2014 -0500
3198: Can use write to append only.
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 55718d9..3f64615 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -100,6 +100,70 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
i += 1
return resp
+def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+ '''
+ Replace a range with a new block.
+ data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+ range_start: start of range
+ range_size: size of range
+ new_locator: locator for new block to be inserted
+ !!! data_locators will be updated in place !!!
+ '''
+ if range_size == 0:
+ return
+
+ range_start = long(range_start)
+ range_size = long(range_size)
+ range_end = range_start + range_size
+
+ last = data_locators[-1]
+ if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
+ # append new block
+ data_locators.append([new_locator, range_size, range_start])
+ return
+
+ i = first_block(data_locators, range_start, range_size, debug)
+ if i is None:
+ return
+
+ while i < len(data_locators):
+ locator, block_size, block_start = data_locators[i]
+ block_end = block_start + block_size
+ if debug:
+ print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+ if range_end <= block_start:
+ # range ends before this block starts, so don't look at any more locators
+ break
+
+ #if range_start >= block_end:
+ # range starts after this block ends, so go to next block
+ # we should always start at the first block due to the binary search above, so this test is redundant
+ #next
+
+ if range_start >= block_start and range_end <= block_end:
+ # range starts and ends in this block
+ # split block into 3 pieces
+ #resp.append([locator, block_size, range_start - block_start, range_size])
+ pass
+ elif range_start >= block_start and range_end > block_end:
+ # range starts in this block
+ # split block into 2 pieces
+ #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+ pass
+ elif range_start < block_start and range_end > block_end:
+ # range starts in a previous block and extends to further blocks
+ # zero out this block
+ #resp.append([locator, block_size, 0L, block_size])
+ pass
+ elif range_start < block_start and range_end <= block_end:
+ # range starts in a previous block and ends in this block
+ # split into 2 pieces
+ #resp.append([locator, block_size, 0L, range_end - block_start])
+ pass
+ block_start = block_end
+ i += 1
+
+
def split(path):
"""split(path) -> streamname, filename
@@ -148,15 +212,18 @@ class StreamFileReader(ArvadosFileBase):
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self.size())
+ self._filepos = min(max(pos, 0L), self._size())
def tell(self):
return self._filepos
- def size(self):
+ def _size(self):
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
+ def size(self):
+ return self._size()
+
@ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
@@ -310,10 +377,13 @@ class StreamReader(object):
def all_files(self):
return self._files.values()
- def size(self):
+ def _size(self):
n = self._data_locators[-1]
return n[OFFSET] + n[BLOCKSIZE]
+ def size(self):
+ return self._size()
+
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
@@ -396,7 +466,8 @@ class StreamWriter(StreamReader):
def _append(self, data):
if self.current_bblock is None:
- streamoffset = sum([x[1] for x in self._data_locators])
+ last = self._data_locators[-1]
+ streamoffset = last[OFFSET] + last[BLOCKSIZE]
self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
self.bufferblocks[self.current_bblock.locator] = self.current_bblock
self._data_locators.append(self.current_bblock.locator_list_entry)
@@ -441,13 +512,17 @@ class StreamFileWriter(StreamFileReader):
self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
streamoffset += blocksize
fileoffset += segmentsize
+ if len(newstream) == 0:
+ newstream.append(config.EMPTY_BLOCK_LOCATOR)
+ self.segments.append([0, 0, 0])
self._stream._data_locators = newstream
if self._filepos > fileoffset:
self._filepos = fileoffset
def _writeto(self, offset, data):
- # TODO
- pass
+ self._stream._append(data)
+ replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
+ self._filepos += len(data)
def writeto(self, offset, data):
with self._stream.mutex:
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index b5130cb..f07ca6c 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -307,5 +307,15 @@ class StreamFileWriterTestCase(unittest.TestCase):
writer.truncate(8)
self.assertEqual("567", writer.readfrom(5, 8))
+ def test_append(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = stream.files()["count.txt"]
+ self.assertEqual("56789", writer.readfrom(5, 8))
+ writer.seek(10)
+ writer.write("foo")
+ self.assertEqual("56789foo", writer.readfrom(5, 8))
+ #print stream.manifest_text()
+
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list