[ARVADOS] created: 5bf8784175517327b334fb4f7c8d3b8ee505353d
git at public.curoverse.com
git at public.curoverse.com
Mon Dec 15 17:03:20 EST 2014
at 5bf8784175517327b334fb4f7c8d3b8ee505353d (commit)
commit 5bf8784175517327b334fb4f7c8d3b8ee505353d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 15 17:03:57 2014 -0500
3198: Initial support for appending to streams.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index d530f58..5ab7e77 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -302,8 +302,6 @@ class _WriterFile(ArvadosFileBase):
class CollectionWriter(CollectionBase):
- KEEP_BLOCK_SIZE = 2**26
-
def __init__(self, api_client=None, num_retries=0):
"""Instantiate a CollectionWriter.
@@ -369,7 +367,7 @@ class CollectionWriter(CollectionBase):
def _work_file(self):
while True:
- buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+ buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
if not buf:
break
self.write(buf)
@@ -441,7 +439,7 @@ class CollectionWriter(CollectionBase):
self._data_buffer.append(newdata)
self._data_buffer_len += len(newdata)
self._current_stream_length += len(newdata)
- while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+ while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
self.flush_data()
def open(self, streampath, filename=None):
@@ -477,8 +475,8 @@ class CollectionWriter(CollectionBase):
data_buffer = ''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
- self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
- self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+ self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+ self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
diff --git a/sdk/python/arvados/config.py b/sdk/python/arvados/config.py
index a0c3cc6..d293a31 100644
--- a/sdk/python/arvados/config.py
+++ b/sdk/python/arvados/config.py
@@ -12,6 +12,7 @@ if os.environ.get('HOME') is not None:
else:
default_config_file = ''
+KEEP_BLOCK_SIZE = 2**26
EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
def initialize(config_file=default_config_file):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 57a7a4d..f57e303 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -4,6 +4,8 @@ import hashlib
import os
import re
import zlib
+import threading
+import functools
from .arvfile import ArvadosFileBase
from arvados.retry import retry_method
@@ -315,6 +317,9 @@ class StreamReader(object):
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
+ def _keepget(self, locator, num_retries=None):
+ self._keep.get(locator, num_retries=num_retries)
+
@retry_method
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -324,7 +329,7 @@ class StreamReader(object):
self._keep = KeepClient(num_retries=self.num_retries)
data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
- data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+ data.append(self._keepget(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
return ''.join(data)
def manifest_text(self, strip=False):
@@ -339,3 +344,69 @@ class StreamReader(object):
for seg in f.segments])
for f in self._files.values()])
return ' '.join(manifest_text) + '\n'
+
+class BufferBlock(object):
+ def __init__(self, locator, streamoffset):
+ self.locator = locator
+ self.buffer_block = bytearray(config.KEEP_BLOCK_SIZE)
+ self.buffer_view = memoryview(self.buffer_block)
+ self.write_pointer = 0
+ self.locator_list_entry = [locator, 0, streamoffset]
+
+ def append(self, data):
+ self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+ self.write_pointer += len(data)
+ self.locator_list_entry[1] = self.write_pointer
+
+class StreamWriter(StreamReader):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
+ super(StreamWriter, self).__init__(tokens, keep, debug, _empty, num_retries)
+ self.mutex = threading.Lock()
+ self.current_bblock = None
+ self.bufferblocks = {}
+
+ # Proxy the methods listed below to self.nodes.
+ def _proxy_method(name):
+ method = getattr(StreamReader, name)
+ @functools.wraps(method, ('__name__', '__doc__'))
+ def wrapper(self, *args, **kwargs):
+ with self.mutex:
+ return method(self, *args, **kwargs)
+ return wrapper
+
+ for _method_name in ['name', 'files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
+ locals()[_method_name] = _proxy_method(_method_name)
+
+ def _keepget(self, locator, num_retries=None):
+ if locator in self.bufferblocks:
+ bb = self.bufferblocks[locator]
+ return str(bb.buffer_block[0:bb.write_pointer])
+ else:
+ return self._keep.get(locator, num_retries=num_retries)
+
+ def append(self, data):
+ with self.mutex:
+ if self.current_bblock is None:
+ streamoffset = sum([x[1] for x in self._data_locators])
+ self.current_bblock = BufferBlock("bufferblock%i" % len(self.bufferblocks), streamoffset)
+ self.bufferblocks[self.current_bblock.locator] = self.current_bblock
+ self._data_locators.append(self.current_bblock.locator_list_entry)
+ self.current_bblock.append(data)
+
+class StreamFileWriter(StreamFileReader):
+ def __init__(self, name, mode):
+ super(StreamFileWriter, self).__init__(name, mode)
+ self.mutex = threading.Lock()
+
+ # Proxy the methods listed below to self.nodes.
+ def _proxy_method(name):
+ method = getattr(StreamFileReader, name)
+ @functools.wraps(method, ('__name__', '__doc__'))
+ def wrapper(self, *args, **kwargs):
+ with self.mutex:
+ return method(self, *args, **kwargs)
+ return wrapper
+
+ for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
+ locals()[_method_name] = _proxy_method(_method_name)
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 08a3d28..0e26f27 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -8,7 +8,7 @@ import os
import unittest
import arvados
-from arvados import StreamReader, StreamFileReader
+from arvados import StreamReader, StreamFileReader, StreamWriter
import arvados_testutil as tutil
import run_test_server
@@ -272,6 +272,24 @@ class StreamFileReadlinesTestCase(StreamFileReadTestCase):
def read_for_test(self, reader, byte_count, **kwargs):
return ''.join(reader.readlines(**kwargs))
+class StreamWriterTestCase(unittest.TestCase):
+ class MockKeep(object):
+ def __init__(self, blocks):
+ self.blocks = blocks
+ def get(self, locator, num_retries=0):
+ return self.blocks[locator]
+
+ def test_init(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ self.assertEqual(stream.readfrom(0, 5), "01234")
+
+ def test_append(self):
+ stream = StreamWriter(['.', '781e5e245d69b566979b86e28d23f2c7+10', '0:10:count.txt'],
+ keep=StreamWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ self.assertEqual(stream.readfrom(5, 8), "56789")
+ stream.append("foo")
+ self.assertEqual(stream.readfrom(5, 8), "56789foo")
if __name__ == '__main__':
unittest.main()
commit fef134f2bab9d20e79e265e78de9ec83131e9f90
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Dec 12 16:26:07 2014 -0500
3198: Start refactoring locators_and_ranges so it can be used for replacing ranges
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index c263dd8..57a7a4d 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -16,20 +16,7 @@ BLOCKSIZE = 1
OFFSET = 2
SEGMENTSIZE = 3
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
- '''
- Get blocks that are covered by the range
- data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
- range_start: start of range
- range_size: size of range
- returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
- '''
- if range_size == 0:
- return []
- resp = []
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
+def first_block(data_locators, range_start, range_size, debug=False):
block_start = 0L
# range_start/block_start is the inclusive lower bound
@@ -49,7 +36,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
while not (range_start >= block_start and range_start < block_end):
if lo == i:
# must be out of range, fail
- return []
+ return None
if range_start > block_start:
lo = i
else:
@@ -60,6 +47,27 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
block_start = data_locators[i][OFFSET]
block_end = block_start + block_size
+ return i
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+ '''
+ Get blocks that are covered by the range
+ data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
+ range_start: start of range
+ range_size: size of range
+ returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+ '''
+ if range_size == 0:
+ return []
+ resp = []
+ range_start = long(range_start)
+ range_size = long(range_size)
+ range_end = range_start + range_size
+
+ i = first_block(data_locators, range_start, range_size, debug)
+ if i is None:
+ return []
+
while i < len(data_locators):
locator, block_size, block_start = data_locators[i]
block_end = block_start + block_size
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 80ad6b3..fce0576 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -582,11 +582,6 @@ class ProjectDirectory(Directory):
contents = arvados.util.list_all(self.api.groups().contents,
self.num_retries, uuid=self.uuid)
- # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(
- self.api.links().list, self.num_retries,
- filters=[['tail_uuid', '=', self.uuid],
- ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
@@ -914,5 +909,5 @@ class Operations(llfuse.Operations):
# arv-mount.
# The workaround is to implement it with the proper number of parameters,
# and then everything works out.
- def create(self, p1, p2, p3, p4, p5):
+ def create(self, inode_parent, name, mode, flags, ctx):
raise llfuse.FUSEError(errno.EROFS)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list