[ARVADOS] created: 5bf8784175517327b334fb4f7c8d3b8ee505353d

git at public.curoverse.com git at public.curoverse.com
Mon Dec 15 17:03:20 EST 2014


        at  5bf8784175517327b334fb4f7c8d3b8ee505353d (commit)


commit 5bf8784175517327b334fb4f7c8d3b8ee505353d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 15 17:03:57 2014 -0500

    3198: Initial support for appending to streams.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index d530f58..5ab7e77 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -302,8 +302,6 @@ class _WriterFile(ArvadosFileBase):
 
 
 class CollectionWriter(CollectionBase):
-    KEEP_BLOCK_SIZE = 2**26
-
     def __init__(self, api_client=None, num_retries=0):
         """Instantiate a CollectionWriter.
 
@@ -369,7 +367,7 @@ class CollectionWriter(CollectionBase):
 
     def _work_file(self):
         while True:
-            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
             if not buf:
                 break
             self.write(buf)
@@ -441,7 +439,7 @@ class CollectionWriter(CollectionBase):
         self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
-        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
             self.flush_data()
 
     def open(self, streampath, filename=None):
@@ -477,8 +475,8 @@ class CollectionWriter(CollectionBase):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
-                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
-            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
     def start_new_file(self, newfilename=None):
diff --git a/sdk/python/arvados/config.py b/sdk/python/arvados/config.py
index a0c3cc6..d293a31 100644
--- a/sdk/python/arvados/config.py
+++ b/sdk/python/arvados/config.py
@@ -12,6 +12,7 @@ if os.environ.get('HOME') is not None:
 else:
     default_config_file = ''
 
+KEEP_BLOCK_SIZE = 2**26
 EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
 
 def initialize(config_file=default_config_file):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 57a7a4d..f57e303 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -4,6 +4,8 @@ import hashlib
 import os
 import re
 import zlib
+import threading
+import functools
 
 from .arvfile import ArvadosFileBase
 from arvados.retry import retry_method
@@ -315,6 +317,9 @@ class StreamReader(object):
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
+    def _keepget(self, locator, num_retries=None):
+        self._keep.get(locator, num_retries=num_retries)
+
     @retry_method
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -324,7 +329,7 @@ class StreamReader(object):
             self._keep = KeepClient(num_retries=self.num_retries)
         data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
-            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+            data.append(self._keepget(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
         return ''.join(data)
 
     def manifest_text(self, strip=False):
@@ -339,3 +344,69 @@ class StreamReader(object):
                                         for seg in f.segments])
                               for f in self._files.values()])
         return ' '.join(manifest_text) + '\n'
+
+class BufferBlock(object):
+    def __init__(self, locator, streamoffset):
+        self.locator = locator
+        self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE)
+        self.buffer_view = memoryview(self.buffer_block)
+        self.write_pointer = 0
+        self.locator_list_entry = [locator, 0, streamoffset]
+
+    def append(self, data):
+        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+        self.write_pointer += len(data)
+        self.locator_list_entry[1] = self.write_pointer
+
+class StreamWriter(StreamReader):
+    def __init__(self, tokens, keep=None, debug=False, _empty=False,
+                 num_retries=0):
+        super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
+        self.mutex = threading.Lock()
+        self.current_bblock = None
+        self.bufferblocks = {}
+
+    # Proxy the methods listed below to self.nodes.
+    def _proxy_method(name):
+        method = getattr(StreamReader, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            with self.mutex:
+                return method(self, *args, **kwargs)
+        return wrapper
+
+    for _method_name in ['name', 'files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
+        locals()[_method_name] = _proxy_method(_method_name)
+
+    def _keepget(self, locator, num_retries=None):
+        if locator in self.bufferblocks:
+            bb = self.bufferblocks[locator]
+            return str(bb.buffer_block[0:bb.write_pointer])
+        else:
+            return self._keep.get(locator, num_retries=num_retries)
+
+    def append(self, data):
+        with self.mutex:
+            if self.current_bblock is None:
+                streamoffset = sum([x[1] for x in self._data_locators])
+                self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+                self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+                self._data_locators.append(self.current_bblock.locator_list_entry)
+            self.current_bblock.append(data)
+
+class StreamFileWriter(StreamFileReader):
+    def __init__(self, name, mode):
+        super(StreamFileWriter, self).__init__(name, mode)
+        self.mutex = threading.Lock()
+
+    # Proxy the methods listed below to self.nodes.
+    def _proxy_method(name):
+        method = getattr(StreamFileReader, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            with self.mutex:
+                return method(self, *args, **kwargs)
+        return wrapper
+
+    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
+        locals()[_method_name] = _proxy_method(_method_name)
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 08a3d28..0e26f27 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -8,7 +8,7 @@ import os
 import unittest
 
 import arvados
-from arvados import StreamReader, StreamFileReader
+from arvados import StreamReader, StreamFileReader, StreamWriter
 
 import arvados_testutil as tutil
 import run_test_server
@@ -272,6 +272,24 @@ class StreamFileReadlinesTestCase(StreamFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
 
+class StreamWriterTestCase(unittest.TestCase):
+    class MockKeep(object):
+        def __init__(self, blocks):
+            self.blocks = blocks
+        def get(self, locator, num_retries=0):
+            return self.blocks[locator]
+
+    def test_init(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        self.assertEqual(stream.readfrom(0, 5), "01234")
+
+    def test_append(self):
+        stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+                              keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        self.assertEqual(stream.readfrom(5, 8), "56789")
+        stream.append("foo")
+        self.assertEqual(stream.readfrom(5, 8), "56789foo")
 
 if __name__ == '__main__':
     unittest.main()

commit fef134f2bab9d20e79e265e78de9ec83131e9f90
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Dec 12 16:26:07 2014 -0500

    3198: Start refactoring locators_and_ranges for use in replacing ranges.

diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index c263dd8..57a7a4d 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -16,20 +16,7 @@ BLOCKSIZE = 1
 OFFSET = 2
 SEGMENTSIZE = 3
 
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
-    '''
-    Get blocks that are covered by the range
-    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
-    '''
-    if range_size == 0:
-        return []
-    resp = []
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
+def first_block(data_locators, range_start, range_size, debug=False):
     block_start = 0L
 
     # range_start/block_start is the inclusive lower bound
@@ -49,7 +36,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
     while not (range_start >= block_start and range_start < block_end):
         if lo == i:
             # must be out of range, fail
-            return []
+            return None
         if range_start > block_start:
             lo = i
         else:
@@ -60,6 +47,27 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         block_start = data_locators[i][OFFSET]
         block_end = block_start + block_size
 
+    return i
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+    '''
+    Get blocks that are covered by the range
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
+    range_start: start of range
+    range_size: size of range
+    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+    '''
+    if range_size == 0:
+        return []
+    resp = []
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return []
+
     while i < len(data_locators):
         locator, block_size, block_start = data_locators[i]
         block_end = block_start + block_size
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 80ad6b3..fce0576 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -582,11 +582,6 @@ class ProjectDirectory(Directory):
 
             contents = arvados.util.list_all(self.api.groups().contents,
                                              self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -914,5 +909,5 @@ class Operations(llfuse.Operations):
     # arv-mount.
     # The workaround is to implement it with the proper number of parameters,
     # and then everything works out.
-    def create(self, p1, p2, p3, p4, p5):
+    def create(self, inode_parent, name, mode, flags, ctx):
         raise llfuse.FUSEError(errno.EROFS)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list