[ARVADOS] updated: 96d2964dcbc02c3873084ee09183f0f5fb0c44ba

git at public.curoverse.com git at public.curoverse.com
Wed Dec 17 16:31:41 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py   |  1 -
 sdk/python/arvados/stream.py    | 50 +++++++++++++++++++++++++++++++++++------
 sdk/python/tests/test_stream.py | 40 +++++++++++++++++++++++++++------
 3 files changed, 76 insertions(+), 15 deletions(-)

       via  96d2964dcbc02c3873084ee09183f0f5fb0c44ba (commit)
      from  b5efdd0afbe7795b036dc19b8f7d6b8a32da52df (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 96d2964dcbc02c3873084ee09183f0f5fb0c44ba
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Dec 17 16:33:06 2014 -0500

    3198: Working on optimizing rewrites

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ef7a6c8..768a5eb 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -244,7 +244,6 @@ class StreamFileWriter(StreamFileReader):
     def _writeto(self, offset, data):
         self._stream._append(data)
         replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
-        self._filepos += len(data)
 
     def writeto(self, offset, data):
         with self._stream.mutex:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 8623ab9..0dec9e4 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -149,14 +149,19 @@ class StreamReader(object):
 
 
 class BufferBlock(object):
-    def __init__(self, locator, streamoffset):
+    def __init__(self, locator, streamoffset, starting_size=2**16):
         self.locator = locator
-        self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE)
+        self.buffer_block = bytearray(starting_size)
         self.buffer_view = memoryview(self.buffer_block)
         self.write_pointer = 0
         self.locator_list_entry = [locator, 0, streamoffset]
 
     def append(self, data):
+        while (self.write_pointer+len(data)) > len(self.buffer_block):
+            new_buffer_block = bytearray(len(self.buffer_block) * 2)
+            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+            self.buffer_block = new_buffer_block
+            self.buffer_view = memoryview(self.buffer_block)
         self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
         self.write_pointer += len(data)
         self.locator_list_entry[1] = self.write_pointer
@@ -196,13 +201,44 @@ class StreamWriter(StreamReader):
         else:
             return self._keep.get(locator, num_retries=num_retries)
 
+    def _init_bufferblock(self):
+        last = self._data_locators[-1]
+        streamoffset = last[OFFSET] + last[BLOCKSIZE]
+        if last[BLOCKSIZE] == 0:
+            del self._data_locators[-1]
+        self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+        self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+        self._data_locators.append(self.current_bblock.locator_list_entry)
+
+    def _commit(self):
+        # commit buffer block
+
+        segs = self._files.values()[0].segments
+        print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry)
+        final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]]
+        print "final_writes %s" % final_writes
+        # if size of final_writes < size of buffer block ...
+
+        # TODO: do 'put' in the background?
+        pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
+        self._data_locators[-1][0] = pdh
+        self.current_bblock = None
+
+    def commit(self):
+        with self.mutex:
+            self._commit()
+
     def _append(self, data):
+        if len(data) > config.KEEP_BLOCK_SIZE:
+            raise ArgumentError("Please append data chunks smaller than config.KEEP_BLOCK_SIZE")
+
         if self.current_bblock is None:
-            last = self._data_locators[-1]
-            streamoffset = last[OFFSET] + last[BLOCKSIZE]
-            self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
-            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
-            self._data_locators.append(self.current_bblock.locator_list_entry)
+            self._init_bufferblock()
+
+        if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+            self._commit()
+            self._init_bufferblock()
+
         self.current_bblock.append(data)
 
     def append(self, data):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 30f08ba..0af274d 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -6,6 +6,7 @@ import io
 import mock
 import os
 import unittest
+import hashlib
 
 import arvados
 from arvados import StreamReader, StreamFileReader, StreamWriter, StreamFileWriter
@@ -278,6 +279,10 @@ class StreamWriterTestCase(unittest.TestCase):
             self.blocks = blocks
         def get(self, locator, num_retries=0):
             return self.blocks[locator]
+        def put(self, data):
+            pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+            self.blocks[pdh] = data
+            return pdh
 
     def test_init(self):
         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
@@ -293,12 +298,6 @@ class StreamWriterTestCase(unittest.TestCase):
 
 
 class StreamFileWriterTestCase(unittest.TestCase):
-    class MockKeep(object):
-        def __init__(self, blocks):
-            self.blocks = blocks
-        def get(self, locator, num_retries=0):
-            return self.blocks[locator]
-
     def test_truncate(self):
         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
@@ -314,6 +313,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("56789", writer.readfrom(5, 8))
         writer.seek(10)
         writer.write("foo")
+        self.assertEqual(writer.size(), 13)
         self.assertEqual("56789foo", writer.readfrom(5, 8))
 
     def test_write0(self):
@@ -323,6 +323,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("0123456789", writer.readfrom(0, 13))
         writer.seek(0)
         writer.write("foo")
+        self.assertEqual(writer.size(), 10)
         self.assertEqual("foo3456789", writer.readfrom(0, 13))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 10:3:count.txt 3:7:count.txt\n", stream.manifest_text())
 
@@ -333,6 +334,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("0123456789", writer.readfrom(0, 13))
         writer.seek(3)
         writer.write("foo")
+        self.assertEqual(writer.size(), 10)
         self.assertEqual("012foo6789", writer.readfrom(0, 13))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", stream.manifest_text())
 
@@ -343,6 +345,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("0123456789", writer.readfrom(0, 13))
         writer.seek(7)
         writer.write("foo")
+        self.assertEqual(writer.size(), 10)
         self.assertEqual("0123456foo", writer.readfrom(0, 13))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:3:count.txt\n", stream.manifest_text())
 
@@ -353,7 +356,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("012345678901234", writer.readfrom(0, 15))
         writer.seek(7)
         writer.write("foobar")
-        print stream.manifest_text()
+        self.assertEqual(writer.size(), 20)
         self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", stream.manifest_text())
 
@@ -364,8 +367,31 @@ class StreamFileWriterTestCase(unittest.TestCase):
         self.assertEqual("012301230123", writer.readfrom(0, 15))
         writer.seek(2)
         writer.write("abcdefg")
+        self.assertEqual(writer.size(), 12)
         self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 bufferblock0 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", stream.manifest_text())
 
+    def test_write_large(self):
+        stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({}))
+        writer = stream.files()["count.txt"]
+        text = ''.join(["0123456789" for a in xrange(0, 100)])
+        for b in xrange(0, 100000):
+            writer.write(text)
+        self.assertEqual(writer.size(), 100000000)
+        stream.commit()
+        self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
+
+    def test_write_rewrite(self):
+        stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({}))
+        writer = stream.files()["count.txt"]
+        for b in xrange(0, 10):
+            writer.seek(0, os.SEEK_SET)
+            writer.write("0123456789")
+        stream.commit()
+        self.assertEqual(writer.size(), 10)
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
+
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list