[ARVADOS] updated: 71959d0d8434b7af5de9a5b8260f0ebe11ec7238

git at public.curoverse.com git at public.curoverse.com
Tue Dec 16 17:10:48 EST 2014


Summary of changes:
 sdk/python/arvados/stream.py    | 87 ++++++++++++++++++++++++++++++++++++++---
 sdk/python/tests/test_stream.py | 10 +++++
 2 files changed, 91 insertions(+), 6 deletions(-)

       via  71959d0d8434b7af5de9a5b8260f0ebe11ec7238 (commit)
      from  78700dbcc4f5d34f0a4cfee5c040e716d684ed62 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 71959d0d8434b7af5de9a5b8260f0ebe11ec7238
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 16 17:11:26 2014 -0500

    3198: Can use write to append only.

diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 55718d9..3f64615 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -100,6 +100,70 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp
 
+def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    '''
+    Replace a range with a new block.
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+    range_start: start of range
+    range_size: size of range
+    new_locator: locator for new block to be inserted
+    !!! data_locators will be updated in place !!!
+    '''
+    if range_size == 0:
+        return
+
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    last = data_locators[-1]
+    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
+        # append new block
+        data_locators.append([new_locator, range_size, range_start])
+        return
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        #if range_start >= block_end:
+            # range starts after this block ends, so go to next block
+            # we should always start at the first block due to the binary search above, so this test is redundant
+            #next
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            # split block into 3 pieces
+            #resp.append([locator, block_size, range_start - block_start, range_size])
+            pass
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            # split block into 2 pieces
+            #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+            pass
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            # zero out this block
+            #resp.append([locator, block_size, 0L, block_size])
+            pass
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            # split into 2 pieces
+            #resp.append([locator, block_size, 0L, range_end - block_start])
+            pass
+        block_start = block_end
+        i += 1
+
+
 def split(path):
     """split(path) -> streamname, filename
 
@@ -148,15 +212,18 @@ class StreamFileReader(ArvadosFileBase):
             pos += self._filepos
         elif whence == os.SEEK_END:
             pos += self.size()
-        self._filepos = min(max(pos, 0L), self.size())
+        self._filepos = min(max(pos, 0L), self._size())
 
     def tell(self):
         return self._filepos
 
-    def size(self):
+    def _size(self):
         n = self.segments[-1]
         return n[OFFSET] + n[BLOCKSIZE]
 
+    def size(self):
+        return self._size()
+
     @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
@@ -310,10 +377,13 @@ class StreamReader(object):
     def all_files(self):
         return self._files.values()
 
-    def size(self):
+    def _size(self):
         n = self._data_locators[-1]
         return n[OFFSET] + n[BLOCKSIZE]
 
+    def size(self):
+        return self._size()
+
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
@@ -396,7 +466,8 @@ class StreamWriter(StreamReader):
 
     def _append(self, data):
         if self.current_bblock is None:
-            streamoffset = sum([x[1] for x in self._data_locators])
+            last = self._data_locators[-1]
+            streamoffset = last[OFFSET] + last[BLOCKSIZE]
             self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
             self.bufferblocks[self.current_bblock.locator] = self.current_bblock
             self._data_locators.append(self.current_bblock.locator_list_entry)
@@ -441,13 +512,17 @@ class StreamFileWriter(StreamFileReader):
                     self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
                     streamoffset += blocksize
                     fileoffset += segmentsize
+            if len(newstream) == 0:
+                newstream.append(config.EMPTY_BLOCK_LOCATOR)
+                self.segments.append([0, 0, 0])
             self._stream._data_locators = newstream
             if self._filepos > fileoffset:
                 self._filepos = fileoffset
 
     def _writeto(self, offset, data):
-        # TODO
-        pass
+        self._stream._append(data)
+        replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
+        self._filepos += len(data)
 
     def writeto(self, offset, data):
         with self._stream.mutex:
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index b5130cb..f07ca6c 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -307,5 +307,15 @@ class StreamFileWriterTestCase(unittest.TestCase):
         writer.truncate(8)
         self.assertEqual("567", writer.readfrom(5, 8))
 
+    def test_append(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = stream.files()["count.txt"]
+        self.assertEqual("56789", writer.readfrom(5, 8))
+        writer.seek(10)
+        writer.write("foo")
+        self.assertEqual("56789foo", writer.readfrom(5, 8))
+        #print stream.manifest_text()
+
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list