[ARVADOS] updated: 78700dbcc4f5d34f0a4cfee5c040e716d684ed62

git at public.curoverse.com git at public.curoverse.com
Tue Dec 16 13:50:27 EST 2014


Summary of changes:
 sdk/python/arvados/commands/put.py |  4 +-
 sdk/python/arvados/stream.py       | 90 +++++++++++++++++++++++++++++++-------
 sdk/python/tests/test_stream.py    | 24 ++++++++--
 3 files changed, 95 insertions(+), 23 deletions(-)

       via  78700dbcc4f5d34f0a4cfee5c040e716d684ed62 (commit)
      from  5bf8784175517327b334fb4f7c8d3b8ee505353d (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 78700dbcc4f5d34f0a4cfee5c040e716d684ed62
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 16 13:51:06 2014 -0500

    3198: Support for truncating files.  A few tests.  Next step work on random access writing.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d070a8b..95ba172 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -295,12 +295,12 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
 
     def flush_data(self):
         start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
         super(ArvPutCollectionWriter, self).flush_data()
         if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
             self.bytes_written += (start_buffer_len - self._data_buffer_len)
             self.report_progress()
-            if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
                 self.cache_state()
 
     def _record_new_input(self, input_type, source_name, dest_name):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index f57e303..55718d9 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -168,7 +168,7 @@ class StreamFileReader(ArvadosFileBase):
         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
         if available_chunks:
             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
-            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
                                          num_retries=num_retries)
 
         self._filepos += len(data)
@@ -183,7 +183,7 @@ class StreamFileReader(ArvadosFileBase):
 
         data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
-            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
                                               num_retries=num_retries))
         return ''.join(data)
 
@@ -295,8 +295,8 @@ class StreamReader(object):
                 if name not in self._files:
                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                 else:
-                    n = self._files[name]
-                    n.segments.append([pos, size, n.size()])
+                    filereader = self._files[name]
+                    filereader.segments.append([pos, size, filereader.size()])
                 continue
 
             raise errors.SyntaxError("Invalid manifest format")
@@ -318,10 +318,13 @@ class StreamReader(object):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
     def _keepget(self, locator, num_retries=None):
-        self._keep.get(locator, num_retries=num_retries)
+        return self._keep.get(locator, num_retries=num_retries)
 
     @retry_method
     def readfrom(self, start, size, num_retries=None):
+        return self._readfrom(start, size, num_retries=num_retries)
+
+    def _readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''
@@ -362,11 +365,17 @@ class StreamWriter(StreamReader):
     def __init__(self, tokens, keep=None, debug=False, _empty=False,
                  num_retries=0):
         super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
+
+        if len(self._files) != 1:
+            raise AssertionError("StreamWriter can only have one file at a time")
+        sr = self._files.popitem()[1]
+        self._files[sr.name] = StreamFileWriter(self, sr.segments, sr.name)
+
         self.mutex = threading.Lock()
         self.current_bblock = None
         self.bufferblocks = {}
 
-    # Proxy the methods listed below to self.nodes.
+    # wrap superclass methods in mutex
     def _proxy_method(name):
         method = getattr(StreamReader, name)
         @functools.wraps(method, ('__name__', '__doc__'))
@@ -385,28 +394,75 @@ class StreamWriter(StreamReader):
         else:
             return self._keep.get(locator, num_retries=num_retries)
 
+    def _append(self, data):
+        if self.current_bblock is None:
+            streamoffset = sum([x[1] for x in self._data_locators])
+            self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+            self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+            self._data_locators.append(self.current_bblock.locator_list_entry)
+        self.current_bblock.append(data)
+
     def append(self, data):
         with self.mutex:
-            if self.current_bblock is None:
-                streamoffset = sum([x[1] for x in self._data_locators])
-                self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
-                self.bufferblocks[self.current_bblock.locator] = self.current_bblock
-                self._data_locators.append(self.current_bblock.locator_list_entry)
-            self.current_bblock.append(data)
+            self._append(data)
 
 class StreamFileWriter(StreamFileReader):
-    def __init__(self, name, mode):
-        super(StreamFileWriter, self).__init__(name, mode)
-        self.mutex = threading.Lock()
+    def __init__(self, stream, segments, name):
+        super(StreamFileWriter, self).__init__(stream, segments, name)
+        self.mode = 'wb'
 
-    # Proxy the methods listed below to self.nodes.
+    # wrap superclass methods in mutex
     def _proxy_method(name):
         method = getattr(StreamFileReader, name)
         @functools.wraps(method, ('__name__', '__doc__'))
         def wrapper(self, *args, **kwargs):
-            with self.mutex:
+            with self._stream.mutex:
                 return method(self, *args, **kwargs)
         return wrapper
 
     for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
         locals()[_method_name] = _proxy_method(_method_name)
+
+    def truncate(self, size=None):
+        with self._stream.mutex:
+            if size is None:
+                size = self._filepos
+
+            segs = locators_and_ranges(self.segments, 0, size)
+
+            newstream = []
+            self.segments = []
+            streamoffset = 0L
+            fileoffset = 0L
+
+            for seg in segs:
+                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
+                    newstream.append([locator, blocksize, streamoffset])
+                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+                    streamoffset += blocksize
+                    fileoffset += segmentsize
+            self._stream._data_locators = newstream
+            if self._filepos > fileoffset:
+                self._filepos = fileoffset
+
+    def _writeto(self, offset, data):
+        # TODO
+        pass
+
+    def writeto(self, offset, data):
+        with self._stream.mutex:
+            self._writeto(offset, data)
+
+    def write(self, data):
+        with self._stream.mutex:
+            self._writeto(self._filepos, data)
+            self._filepos += len(data)
+
+    def writelines(self, seq):
+        with self._stream.mutex:
+            for s in seq:
+                self._writeto(self._filepos, s)
+                self._filepos += len(s)
+
+    def flush(self):
+        pass
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 0e26f27..b5130cb 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -8,7 +8,7 @@ import os
 import unittest
 
 import arvados
-from arvados import StreamReader, StreamFileReader, StreamWriter
+from arvados import StreamReader, StreamFileReader, StreamWriter, StreamFileWriter
 
 import arvados_testutil as tutil
 import run_test_server
@@ -282,14 +282,30 @@ class StreamWriterTestCase(unittest.TestCase):
     def test_init(self):
         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-        self.assertEqual(stream.readfrom(0, 5), "01234")
+        self.assertEqual("01234", stream.readfrom(0, 5))
 
     def test_append(self):
         stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
                               keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
-        self.assertEqual(stream.readfrom(5, 8), "56789")
+        self.assertEqual("56789", stream.readfrom(5, 8))
         stream.append("foo")
-        self.assertEqual(stream.readfrom(5, 8), "56789foo")
+        self.assertEqual("56789foo", stream.readfrom(5, 8))
+
+
+class StreamFileWriterTestCase(unittest.TestCase):
+    class MockKeep(object):
+        def __init__(self, blocks):
+            self.blocks = blocks
+        def get(self, locator, num_retries=0):
+            return self.blocks[locator]
+
+    def test_truncate(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = stream.files()["count.txt"]
+        self.assertEqual("56789", writer.readfrom(5, 8))
+        writer.truncate(8)
+        self.assertEqual("567", writer.readfrom(5, 8))
 
 if __name__ == '__main__':
     unittest.main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list