[ARVADOS] updated: cc946c07eade09974423955f8d4b080941f53c7b
git at public.curoverse.com
git at public.curoverse.com
Thu Dec 18 11:24:40 EST 2014
Summary of changes:
sdk/python/arvados/arvfile.py | 2 ++
sdk/python/arvados/stream.py | 63 +++++++++++++++++++++++++++++++++++------
sdk/python/tests/test_stream.py | 29 +++++++++++++++++--
3 files changed, 84 insertions(+), 10 deletions(-)
via cc946c07eade09974423955f8d4b080941f53c7b (commit)
from 96d2964dcbc02c3873084ee09183f0f5fb0c44ba (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit cc946c07eade09974423955f8d4b080941f53c7b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Dec 18 11:26:04 2014 -0500
3198: Support repacking buffer blocks when writes are superseded.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 768a5eb..ad52195 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -242,6 +242,8 @@ class StreamFileWriter(StreamFileReader):
self._filepos = fileoffset
def _writeto(self, offset, data):
+ if offset > self._size():
+ raise ArgumentError("Offset is past the end of the file")
self._stream._append(data)
replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 0dec9e4..bbbbb95 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -4,6 +4,7 @@ import os
import re
import threading
import functools
+import copy
from .ranges import *
from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
@@ -210,15 +211,58 @@ class StreamWriter(StreamReader):
self.bufferblocks[self.current_bblock.locator] = self.current_bblock
self._data_locators.append(self.current_bblock.locator_list_entry)
+ def _repack_writes(self):
+ '''Test if the buffer block has more data than is referenced by actual segments
+ (this happens when a buffered write over-writes a file range written in
+ a previous buffered write). Re-pack the buffer block for efficiency
+ and to avoid leaking information.
+ '''
+ segs = self._files.values()[0].segments
+
+ bufferblock_segs = []
+ i = 0
+ tmp_segs = copy.copy(segs)
+ while i < len(tmp_segs):
+ # Go through each segment and identify segments that include the buffer block
+ s = tmp_segs[i]
+ if s[LOCATOR] < self.current_bblock.locator_list_entry[OFFSET] and (s[LOCATOR] + s[BLOCKSIZE]) > self.current_bblock.locator_list_entry[OFFSET]:
+ # The segment straddles the previous block and the current buffer block. Split the segment.
+ b1 = self.current_bblock.locator_list_entry[OFFSET] - s[LOCATOR]
+ b2 = (s[LOCATOR] + s[BLOCKSIZE]) - self.current_bblock.locator_list_entry[OFFSET]
+ bb_seg = [self.current_bblock.locator_list_entry[OFFSET], b2, s[OFFSET]+b1]
+ tmp_segs[i] = [s[LOCATOR], b1, s[OFFSET]]
+ tmp_segs.insert(i+1, bb_seg)
+ bufferblock_segs.append(bb_seg)
+ i += 1
+ elif s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]:
+ # The segment's data is in the buffer block.
+ bufferblock_segs.append(s)
+ i += 1
+
+ # Now sum up the segments to get the total bytes
+ # of the file referencing into the buffer block.
+ write_total = sum([s[BLOCKSIZE] for s in bufferblock_segs])
+
+ if write_total < self.current_bblock.locator_list_entry[BLOCKSIZE]:
+ # There is more data in the buffer block than is actually accounted for by segments, so
+ # re-pack into a new buffer by copying over to a new buffer block.
+ new_bb = BufferBlock(self.current_bblock.locator,
+ self.current_bblock.locator_list_entry[OFFSET],
+ starting_size=write_total)
+ for t in bufferblock_segs:
+ t_start = t[LOCATOR] - self.current_bblock.locator_list_entry[OFFSET]
+ t_end = t_start + t[BLOCKSIZE]
+ t[0] = self.current_bblock.locator_list_entry[OFFSET] + new_bb.write_pointer
+ new_bb.append(self.current_bblock.buffer_block[t_start:t_end])
+
+ self.current_bblock = new_bb
+ self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+ self._data_locators[-1] = self.current_bblock.locator_list_entry
+ self._files.values()[0].segments = tmp_segs
+
def _commit(self):
# commit buffer block
- segs = self._files.values()[0].segments
- print "segs %s bb %s" % (segs, self.current_bblock.locator_list_entry)
- final_writes = [s for s in segs if s[LOCATOR] >= self.current_bblock.locator_list_entry[OFFSET]]
- print "final_writes %s" % final_writes
- # if size of final_writes < size of buffer block ...
-
# TODO: do 'put' in the background?
pdh = self._keep.put(self.current_bblock.buffer_block[0:self.current_bblock.write_pointer])
self._data_locators[-1][0] = pdh
@@ -226,6 +270,7 @@ class StreamWriter(StreamReader):
def commit(self):
with self.mutex:
+ self._repack_writes()
self._commit()
def _append(self, data):
@@ -236,8 +281,10 @@ class StreamWriter(StreamReader):
self._init_bufferblock()
if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
- self._commit()
- self._init_bufferblock()
+ self._repack_writes()
+ if (self.current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+ self._commit()
+ self._init_bufferblock()
self.current_bblock.append(data)
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 0af274d..5f0264c 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -281,7 +281,7 @@ class StreamWriterTestCase(unittest.TestCase):
return self.blocks[locator]
def put(self, data):
pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
- self.blocks[pdh] = data
+ self.blocks[pdh] = str(data)
return pdh
def test_init(self):
@@ -382,7 +382,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
stream.commit()
self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", stream.manifest_text())
- def test_write_rewrite(self):
+ def test_write_rewrite0(self):
stream = StreamWriter(['.', arvados.config.EMPTY_BLOCK_LOCATOR, '0:0:count.txt'],
keep=StreamWriterTestCase.MockKeep({}))
writer = stream.files()["count.txt"]
@@ -391,7 +391,32 @@ class StreamFileWriterTestCase(unittest.TestCase):
writer.write("0123456789")
stream.commit()
self.assertEqual(writer.size(), 10)
+ self.assertEqual("0123456789", writer.readfrom(0, 20))
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", stream.manifest_text())
+ def test_write_rewrite1(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = stream.files()["count.txt"]
+ for b in xrange(0, 10):
+ writer.seek(10, os.SEEK_SET)
+ writer.write("abcdefghij")
+ stream.commit()
+ self.assertEqual(writer.size(), 20)
+ self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:10:count.txt 10:10:count.txt\n", stream.manifest_text())
+
+ def test_write_rewrite2(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = stream.files()["count.txt"]
+ for b in xrange(0, 10):
+ writer.seek(5, os.SEEK_SET)
+ writer.write("abcdefghij")
+ stream.commit()
+ self.assertEqual(writer.size(), 15)
+ self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", stream.manifest_text())
+
if __name__ == '__main__':
unittest.main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list