[ARVADOS] updated: 96d2964dcbc02c3873084ee09183f0f5fb0c44ba
git at public.curoverse.com
git at public.curoverse.com
Wed Dec 17 16:31:41 EST 2014
Summary of changes:
sdk/python/arvados/arvfile.py | 1 -
sdk/python/arvados/stream.py | 50 +++++++++++++++++++++++++++++++++++------
sdk/python/tests/test_stream.py | 40 +++++++++++++++++++++++++++------
3 files changed, 76 insertions(+), 15 deletions(-)
via 96d2964dcbc02c3873084ee09183f0f5fb0c44ba (commit)
from b5efdd0afbe7795b036dc19b8f7d6b8a32da52df (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 96d2964dcbc02c3873084ee09183f0f5fb0c44ba
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Dec 17 16:33:06 2014 -0500
3198: Working on optimizing rewrites
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ef7a6c8..768a5eb 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -244,7 +244,6 @@ class StreamFileWriter(StreamFileReader):
def _writeto(self, offset, data):
self._stream._append(data)
replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
- self._filepos += len(data)
def writeto(self, offset, data):
with self._stream.mutex:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8623ab9..0dec9e4 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -149,14 +149,19 @@ class StreamReader(object):
class BufferBlock(object):
- def __init__(self, locator, streamoffset):
+ def __init__(self, locator, streamoffset, starting_size=2**16):
self.locator = locator
- self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE)
+ self.buffer_block = bytearray(starting_size)
self.buffer_view = memoryview(self.buffer_block)
self.write_pointer = 0
self.locator_list_entry = [locator, 0, streamoffset]
def append(self, data):
+ while (self.write_pointer+len(data)) > len(self.buffer_block):
+ new_buffer_block = bytearray(len(self.buffer_block) * 2)
+ new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+ self.buffer_block = new_buffer_block
+ self.buffer_view = memoryview(self.buffer_block)
self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
self.write_pointer += len(data)
self.locator_list_entry[1] = self.write_pointer
@@ -196,13 +201,44 @@ class StreamWriter(StreamReader):
else:
return self._keep.get(locator, num_retries=num_retries)
+ def _init_bufferblock(self):
+ last = self._data_locators[-1]
+ streamoffset = last[OFFSET] + last[BLOCKSIZE]
+ if last[BLOCKSIZE] == 0:
+ del self._data_locators[-1]
+ self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+ self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+ self._data_locators.append(self.current_bblock.locator_list_entry)
+
+ def _commit(self):
+ # commit buffer block
+
+ segs = self._files.values()[0].segments
+ print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry)
+ final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]]
+ print "final_writes %s" % final_writes
+ # if size of final_writes < size of buffer block ...
+
+ # TODO: do 'put' in the background?
+ pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
+ self._data_locators[-1][0] = pdh
+ self.current_bblock = None
+
+ def commit(self):
+ with self.mutex:
+ self._commit()
+
def _append(self, data):
+ if len(data) > config.KEEP_BLOCK_SIZE:
+ raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")
+
if self.current_bblock is None:
- last = self._data_locators[-1]
- streamoffset = last[OFFSET] + last[BLOCKSIZE]
- self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
- self.bufferblocks[self.current_bblock.locator] = self.current_bblock
- self._data_locators.append(self.current_bblock.locator_list_entry)
+ self._init_bufferblock()
+
+ if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+ self._commit()
+ self._init_bufferblock()
+
self.current_bblock.append(data)
def append(self, data):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 30f08ba..0af274d 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -6,6 +6,7 @@ import io
import mock
import os
import unittest
+import hashlib
import arvados
from arvados import StreamReader, StreamFileReader, StreamWriter, StreamFileWriter
@@ -278,6 +279,10 @@ class StreamWriterTestCase(unittest.TestCase):
self.blocks = blocks
def get(self, locator, num_retries=0):
return self.blocks[locator]
+ def put(self, data):
+ pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+ self.blocks[pdh] = data
+ return pdh
def test_init(self):
stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
@@ -293,12 +298,6 @@ class StreamWriterTestCase(unittest.TestCase):
class StreamFileWriterTestCase(unittest.TestCase):
- class MockKeep(object):
- def __init__(self, blocks):
- self.blocks = blocks
- def get(self, locator, num_retries=0):
- return self.blocks[locator]
-
def test_truncate(self):
stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
@@ -314,6 +313,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("56789", writer.readfrom(5, 8))
writer.seek(10)
writer.write("foo")
+ self.assertEqual(writer.size(), 13)
self.assertEqual("56789foo", writer.readfrom(5, 8))
def test_write0(self):
@@ -323,6 +323,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("0123456789", writer.readfrom(0, 13))
writer.seek(0)
writer.write("foo")
+ self.assertEqual(writer.size(), 10)
self.assertEqual("foo3456789", writer.readfrom(0, 13))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 10:3:count.txt 3:7:count.txt\n", stream.manifest_text())
@@ -333,6 +334,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("0123456789", writer.readfrom(0, 13))
writer.seek(3)
writer.write("foo")
+ self.assertEqual(writer.size(), 10)
self.assertEqual("012foo6789", writer.readfrom(0, 13))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", stream.manifest_text())
@@ -343,6 +345,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("0123456789", writer.readfrom(0, 13))
writer.seek(7)
writer.write("foo")
+ self.assertEqual(writer.size(), 10)
self.assertEqual("0123456foo", writer.readfrom(0, 13))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:3:count.txt\n", stream.manifest_text())
@@ -353,7 +356,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("012345678901234", writer.readfrom(0, 15))
writer.seek(7)
writer.write("foobar")
- print stream.manifest_text()
+ self.assertEqual(writer.size(), 20)
self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", stream.manifest_text())
@@ -364,8 +367,31 @@ class StreamFileWriterTestCase(unittest.TestCase):
self.assertEqual("012301230123", writer.readfrom(0, 15))
writer.seek(2)
writer.write("abcdefg")
+ self.assertEqual(writer.size(), 12)
self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", stream.manifest_text())
+ def test_write_large(self):
+ stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({}))
+ writer = stream.files()["count.txt"]
+ text = ''.join(["0123456789" for a in xrange(0, 100)])
+ for b in xrange(0, 100000):
+ writer.write(text)
+ self.assertEqual(writer.size(), 100000000)
+ stream.commit()
+ self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
+
+ def test_write_rewrite(self):
+ stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({}))
+ writer = stream.files()["count.txt"]
+ for b in xrange(0, 10):
+ writer.seek(0, os.SEEK_SET)
+ writer.write("0123456789")
+ stream.commit()
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
+
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list