[ARVADOS] updated: 89796f01a6ea3cb553a61be6ce92883a1decf003
git at public.curoverse.com
git at public.curoverse.com
Sat Jan 3 23:23:59 EST 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 23 ++++++----
sdk/python/arvados/keep.py | 52 +++++++++++++++------
sdk/python/arvados/ranges.py | 4 +-
sdk/python/tests/test_arvfile.py | 99 ++++++++++++++++++++++++++++++++++++++--
4 files changed, 149 insertions(+), 29 deletions(-)
via 89796f01a6ea3cb553a61be6ce92883a1decf003 (commit)
from 22286e8b81fa7644500e197b95e6d6417ed25f7e (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 89796f01a6ea3cb553a61be6ce92883a1decf003
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Sat Jan 3 23:25:22 2015 -0500
3198: Apply StreamFileReader tests to ArvadosFileReader
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index c46019a..1c21d83 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -304,21 +304,23 @@ class BlockManager(object):
if self._put_threads is None:
self._put_queue = Queue.Queue(maxsize=2)
self._put_errors = Queue.Queue()
- self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+ self._put_threads = [threading.Thread(target=worker, args=(self,)),
+ threading.Thread(target=worker, args=(self,))]
for t in self._put_threads:
+ t.daemon = True
t.start()
block.state = BufferBlock.PENDING
self._put_queue.put(block)
- def get_block(self, locator, num_retries):
+ def get_block(self, locator, num_retries, cache_only=False):
if locator in self._bufferblocks:
bb = self._bufferblocks[locator]
if bb.state != BufferBlock.COMMITTED:
return bb.buffer_view[0:bb.write_pointer].tobytes()
else:
locator = bb._locator
- return self._keep.get(locator, num_retries=num_retries)
+ return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
def commit_all(self):
for k,v in self._bufferblocks.items():
@@ -352,8 +354,9 @@ class BlockManager(object):
self._prefetch_queue = Queue.Queue()
self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
threading.Thread(target=worker, args=(self,))]
- self._prefetch_threads[0].start()
- self._prefetch_threads[1].start()
+ for t in self._prefetch_threads:
+ t.daemon = True
+ t.start()
self._prefetch_queue.put(locator)
class ArvadosFile(object):
@@ -366,7 +369,7 @@ class ArvadosFile(object):
self._modified = True
self.segments = []
for s in segments:
- self.add_segment(stream, s.range_start, s.range_size)
+ self.add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
def set_unmodified(self):
@@ -402,9 +405,11 @@ class ArvadosFile(object):
self.parent._my_block_manager().block_prefetch(lr.locator)
for lr in locators_and_ranges(self.segments, offset, size):
- # TODO: if data is empty, wait on block get, otherwise only
- # get more data if the block is already in the cache.
- data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ if d:
+ data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ else:
+ break
return ''.join(data)
def _repack_writes(self):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 36ec56c..a087838 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -186,24 +186,34 @@ class KeepBlockCache(object):
break
sm = sum([slot.size() for slot in self._cache])
+ def _get(self, locator):
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n
+ return None
+
+ def get(self, locator):
+ with self._cache_lock:
+ return self._get(locator)
+
def reserve_cache(self, locator):
'''Reserve a cache slot for the specified locator,
or return the existing slot.'''
with self._cache_lock:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepBlockCache.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
+ n = self._get(locator)
+ if n:
+ return n, False
+ else:
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
class KeepClient(object):
@@ -576,7 +586,7 @@ class KeepClient(object):
return None
@retry.retry_method
- def get(self, loc_s, num_retries=None):
+ def get(self, loc_s, num_retries=None, cache_only=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
@@ -595,12 +605,21 @@ class KeepClient(object):
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
+ * cache_only: If true, return the block data only if already present in
+ cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
+ if cache_only:
+ slot = self.block_cache.get(expect_hash)
+ if slot.ready.is_set():
+ return slot.get()
+ else:
+ return None
+
slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
@@ -741,3 +760,6 @@ class KeepClient(object):
return ''
with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
return f.read()
+
+ def is_cached(self, locator):
+ return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index 8c377e2..eeb37e29 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -6,7 +6,7 @@ class Range(object):
self.segment_offset = segment_offset
def __repr__(self):
- return "[\"%s\", %i, %i, %i]" % (self.locator, self.range_start, self.range_size, self.segment_offset)
+ return "Range(\"%s\", %i, %i, %i)" % (self.locator, self.range_start, self.range_size, self.segment_offset)
def first_block(data_locators, range_start, range_size, debug=False):
block_start = 0L
@@ -55,7 +55,7 @@ class LocatorAndRange(object):
self.segment_size == other.segment_size)
def __repr__(self):
- return "[\"%s\", %i, %i, %i]" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
+ return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
def locators_and_ranges(data_locators, range_start, range_size, debug=False):
'''
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 5bc54f5..91cfc79 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -9,17 +9,17 @@ import unittest
import hashlib
import arvados
-from arvados import StreamReader, StreamFileReader, Range, import_manifest, export_manifest
+from arvados import ArvadosFile, ArvadosFileReader, Range, import_manifest, export_manifest
import arvados_testutil as tutil
-
+from test_stream import StreamFileReaderTestCase
class ArvadosFileWriterTestCase(unittest.TestCase):
class MockKeep(object):
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0):
+ def get(self, locator, num_retries=0, cache_only=False):
self.requests.append(locator)
return self.blocks.get(locator)
def put(self, data):
@@ -337,3 +337,96 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
r = c.open("count.txt", "r")
self.assertEqual("0123", r.read(4))
self.assertEqual(["2e9ec317e197819358fbc43afca7d837+8", "2e9ec317e197819358fbc43afca7d837+8", "e8dc4081b13434b45189a720b77b6818+8"], keep.requests)
+
+
+class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
+ class MockParent(object):
+ class MockBlockMgr(object):
+ def __init__(self, blocks, nocache):
+ self.blocks = blocks
+ self.nocache = nocache
+
+ def block_prefetch(self, loc):
+ pass
+
+ def get_block(self, loc, num_retries=0, cache_only=False):
+ if self.nocache and cache_only:
+ return None
+ return self.blocks[loc]
+
+ def __init__(self, blocks, nocache):
+ self.blocks = blocks
+ self.nocache = nocache
+
+ def _my_block_manager(self):
+ return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache)
+
+ def make_count_reader(self, nocache=False):
+ stream = []
+ n = 0
+ blocks = {}
+ for d in ['01234', '34567', '67890']:
+ loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+ blocks[loc] = d
+ stream.append(Range(loc, n, len(d)))
+ n += len(d)
+ af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+ return ArvadosFileReader(af, "count.txt")
+
+ def test_read_returns_first_block(self):
+ # read() calls will be aligned on block boundaries - see #3663.
+ sfile = self.make_count_reader(nocache=True)
+ self.assertEqual('123', sfile.read(10))
+
+ def test_successive_reads(self):
+ sfile = self.make_count_reader(nocache=True)
+ for expect in ['123', '456', '789', '']:
+ self.assertEqual(expect, sfile.read(10))
+
+ def test_tell_after_block_read(self):
+ sfile = self.make_count_reader(nocache=True)
+ sfile.read(5)
+ self.assertEqual(3, sfile.tell())
+
+# class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+# def reader_for(self, coll_name, **kwargs):
+# return StreamReader(self.manifest_for(coll_name).split(),
+# self.keep_client(), **kwargs)
+
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return reader.readfrom(0, byte_count, **kwargs)
+
+# def test_manifest_text_without_keep_client(self):
+# mtext = self.manifest_for('multilevel_collection_1')
+# for line in mtext.rstrip('\n').split('\n'):
+# reader = StreamReader(line.split())
+# self.assertEqual(line + '\n', reader.manifest_text())
+
+
+# class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+# def reader_for(self, coll_name, **kwargs):
+# return StreamReader(self.manifest_for(coll_name).split(),
+# self.keep_client(), **kwargs).all_files()[0]
+
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return reader.read(byte_count, **kwargs)
+
+
+# class StreamFileReadFromTestCase(StreamFileReadTestCase):
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return reader.readfrom(0, byte_count, **kwargs)
+
+
+# class StreamFileReadAllTestCase(StreamFileReadTestCase):
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return ''.join(reader.readall(**kwargs))
+
+
+# class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return ''.join(reader.readall_decompressed(**kwargs))
+
+
+# class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+# def read_for_test(self, reader, byte_count, **kwargs):
+# return ''.join(reader.readlines(**kwargs))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list