[ARVADOS] updated: cc946c07eade09974423955f8d4b080941f53c7b

git at public.curoverse.com
Thu Dec 18 11:24:40 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py   |  2 ++
 sdk/python/arvados/stream.py    | 63 +++++++++++++++++++++++++++++++++++------
 sdk/python/tests/test_stream.py | 29 +++++++++++++++++--
 3 files changed, 84 insertions(+), 10 deletions(-)

       via  cc946c07eade09974423955f8d4b080941f53c7b (commit)
      from  96d2964dcbc02c3873084ee09183f0f5fb0c44ba (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit cc946c07eade09974423955f8d4b080941f53c7b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Dec 18 11:26:04 2014 -0500

    3198: Support repacking buffer blocks when writes are superseded.
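
A standalone sketch of the repacking idea (illustrative only; the real
implementation is StreamWriter._repack_writes in the diff below, which
works on the SDK's segment/locator structures). Here segments are
(offset, length) pairs into buf; after an overwrite, bytes no longer
referenced by any segment are dead, and repacking copies only the live
ranges into a fresh buffer:

    def repack(buf, segments):
        live = sum(length for _, length in segments)
        if live >= len(buf):
            return buf, segments  # nothing superseded; keep the buffer as-is
        new_buf = b""
        new_segments = []
        for offset, length in segments:
            # Rebase each surviving segment onto the new, compacted buffer.
            new_segments.append((len(new_buf), length))
            new_buf += buf[offset:offset + length]
        return new_buf, new_segments

    # "0123456789" was written, then fully overwritten by "abcdefghij":
    # only the second write is still referenced, so the stale first ten
    # bytes are dropped and the surviving segment is rebased to offset 0.
    print(repack(b"0123456789abcdefghij", [(10, 10)]))
    # -> (b'abcdefghij', [(0, 10)])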

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 768a5eb..ad52195 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -242,6 +242,8 @@ class StreamFileWriter(StreamFileReader):
                 self._filepos = fileoffset
 
     def _writeto(self, offset, data):
+        if offset > self._size():
+            raise ArgumentError("Offset is past the end of the file")
         self._stream._append(data)
         replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
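
A quick illustration of the guard added above: writes may land anywhere
within the file or exactly at EOF (an append), but a write starting past
EOF would leave a gap with undefined contents. A minimal sketch, assuming
a plain function in place of the StreamFileWriter method, with ValueError
standing in for the SDK's ArgumentError:

    def check_write_offset(offset, size):
        # In-place overwrites and appends at EOF are fine; starting past
        # EOF would create a hole with undefined contents.
        if offset > size:
            raise ValueError("Offset is past the end of the file")

    check_write_offset(10, 10)  # append at EOF: allowed
    check_write_offset(5, 10)   # overwrite inside the file: allowed
    # check_write_offset(11, 10) would raise ValueError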
 
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 0dec9e4..bbbbb95 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -4,6 +4,7 @@ import os
 import re
 import threading
 import functools
+import copy
 
 from .ranges import *
 from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
@@ -210,15 +211,58 @@ class StreamWriter(StreamReader):
         self.bufferblocks[self.current_bblock.locator] = self.current_bblock
         self._data_locators.append(self.current_bblock.locator_list_entry)
 
+    def _repack_writes(self):
+        '''Check whether the buffer block holds more data than is referenced
+        by actual segments (this happens when a buffered write overwrites a
+        file range written by a previous buffered write).  Re-pack the buffer
+        block for efficiency and to avoid leaking superseded data.
+        '''
+        segs = self._files.values()[0].segments
+
+        bufferblock_segs = []
+        i = 0
+        tmp_segs = copy.copy(segs)
+        while i < len(tmp_segs):
+            # Walk the segments and identify those that reference the current buffer block
+            s = tmp_segs[i]
+            if s[LOCATOR] < self.current_bblock.locator_list_entry[OFFSET] and (s[LOCATOR] + s[BLOCKSIZE]) > self.current_bblock.locator_list_entry[OFFSET]:
+                # The segment straddles the previous block and the current buffer block.  Split the segment.
+                b1 = self.current_bblock.locator_list_entry[OFFSET] - s[LOCATOR]
+                b2 = (s[LOCATOR] + s[BLOCKSIZE]) - self.current_bblock.locator_list_entry[OFFSET]
+                bb_seg = [self.current_bblock.locator_list_entry[OFFSET], b2, s[OFFSET]+b1]
+                tmp_segs[i] = [s[LOCATOR], b1, s[OFFSET]]
+                tmp_segs.insert(i+1, bb_seg)
+                bufferblock_segs.append(bb_seg)
+                i += 1
+            elif s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]:
+                # The segment's data is in the buffer block.
+                bufferblock_segs.append(s)
+            i += 1
+
+        # Sum the segment lengths to get the total number of bytes in the
+        # file that actually reference data in the buffer block.
+        write_total = sum([s[BLOCKSIZE] for s in bufferblock_segs])
+
+        if write_total < self.current_bblock.locator_list_entry[BLOCKSIZE]:
+            # The buffer block holds more data than the segments account for,
+            # so re-pack by copying only the referenced ranges into a new buffer block.
+            new_bb = BufferBlock(self.current_bblock.locator,
+                                 self.current_bblock.locator_list_entry[OFFSET],
+                                 starting_size=write_total)
+            for t in bufferblock_segs:
+                t_start = t[LOCATOR] - self.current_bblock.locator_list_entry[OFFSET]
+                t_end = t_start + t[BLOCKSIZE]
+                t[0] = self.current_bblock.locator_list_entry[OFFSET] + new_bb.write_pointer
+                new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
+
+            self.current_bblock = new_bb
+            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+            self._data_locators[-1] = self.current_bblock.locator_list_entry
+            self._files.values()[0].segments = tmp_segs
+
     def _commit(self):
         # commit buffer block
 
-        segs = self._files.values()[0].segments
-        print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry)
-        final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]]
-        print "final_writes %s" % final_writes
-        # if size of final_writes < size of buffer block ...
-
         # TODO: do 'put' in the background?
         pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
         self._data_locators[-1][0] = pdh
@@ -226,6 +270,7 @@ class StreamWriter(StreamReader):
 
     def commit(self):
         with self.mutex:
+            self._repack_writes()
             self._commit()
 
     def _append(self, data):
@@ -236,8 +281,10 @@ class StreamWriter(StreamReader):
             self._init_bufferblock()
 
         if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._commit()
-            self._init_bufferblock()
+            self._repack_writes()
+            if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+                self._commit()
+                self._init_bufferblock()
 
         self.current_bblock.append(data)
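
The _append change above defers flushing: when incoming data would push
the buffer block past the Keep block size, it repacks first, and only
commits and starts a fresh block if the repacked block is still too
large. A condensed sketch of that control flow (illustrative; the real
methods take no arguments and mutate StreamWriter state, and the limit
comes from config.KEEP_BLOCK_SIZE):

    KEEP_BLOCK_SIZE = 2 ** 26  # 64 MiB stand-in for config.KEEP_BLOCK_SIZE

    def append(block, data, repack, commit, new_block):
        # Repacking may free enough space to avoid committing a block early.
        if len(block) + len(data) > KEEP_BLOCK_SIZE:
            block = repack(block)
            if len(block) + len(data) > KEEP_BLOCK_SIZE:
                commit(block)
                block = new_block()
        return block + data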
 
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 0af274d..5f0264c 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -281,7 +281,7 @@ class StreamWriterTestCase(unittest.TestCase):
             return self.blocks[locator]
         def put(self, data):
             pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
-            self.blocks[pdh] = data
+            self.blocks[pdh] = str(data)
             return pdh
 
     def test_init(self):
@@ -382,7 +382,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         stream.commit()
         self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
 
-    def test_write_rewrite(self):
+    def test_write_rewrite0(self):
         stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
                               keep=StreamWriterTestCase.MockKeep({}))
         writer = stream.files()["count.txt"]
@@ -391,7 +391,32 @@ class StreamFileWriterTestCase(unittest.TestCase):
             writer.write("0123456789")
         stream.commit()
         self.assertEqual(writer.size(), 10)
+        self.assertEqual("0123456789", writer.readfrom(0, 20))
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
 
+    def test_write_rewrite1(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = stream.files()["count.txt"]
+        for b in xrange(0, 10):
+            writer.seek(10, os.SEEK_SET)
+            writer.write("abcdefghij")
+        stream.commit()
+        self.assertEqual(writer.size(), 20)
+        self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:10:count.txt 10:10:count.txt\n", stream.manifest_text())
+
+    def test_write_rewrite2(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = stream.files()["count.txt"]
+        for b in xrange(0, 10):
+            writer.seek(5, os.SEEK_SET)
+            writer.write("abcdefghij")
+        stream.commit()
+        self.assertEqual(writer.size(), 15)
+        self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", stream.manifest_text())
+
 if __name__ == '__main__':
     unittest.main()
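
The locators asserted in these tests can be checked by hand: as
MockKeep.put computes above, a Keep locator is the MD5 hex digest of the
block contents plus "+" and the block length.

    import hashlib

    print("%s+%i" % (hashlib.md5(b"0123456789").hexdigest(), 10))
    # 781e5e245d69b566979b86e28d23f2c7+10
    print("%s+%i" % (hashlib.md5(b"abcdefghij").hexdigest(), 10))
    # a925576942e94b2ef57a066101b48876+10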

-----------------------------------------------------------------------


hooks/post-receive
--