[ARVADOS] updated: 89796f01a6ea3cb553a61be6ce92883a1decf003

git at public.curoverse.com
Sat Jan 3 23:23:59 EST 2015


Summary of changes:
 sdk/python/arvados/arvfile.py    | 23 ++++++----
 sdk/python/arvados/keep.py       | 52 +++++++++++++++------
 sdk/python/arvados/ranges.py     |  4 +-
 sdk/python/tests/test_arvfile.py | 99 ++++++++++++++++++++++++++++++++++++++--
 4 files changed, 149 insertions(+), 29 deletions(-)

       via  89796f01a6ea3cb553a61be6ce92883a1decf003 (commit)
      from  22286e8b81fa7644500e197b95e6d6417ed25f7e (commit)

The revisions listed above that are new to this repository have not
appeared in any previous notification email, so they are listed in
full below.


commit 89796f01a6ea3cb553a61be6ce92883a1decf003
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Sat Jan 3 23:25:22 2015 -0500

    3198: Apply StreamFileReader tests to ArvadosFileReader

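The key behavioral change below is in ArvadosFile.readfrom: the first block of a
read is fetched from Keep as usual, but each subsequent block is used only if it
is already in the block cache (cache_only=True), so a single read() blocks on at
most one Keep fetch while block_prefetch fills the cache in the background. A
minimal sketch of that pattern, using a hypothetical block_manager and segment
list as stand-ins for the real BlockManager and locators_and_ranges() output:

    # Illustrative sketch only -- `block_manager` and `segments` are
    # hypothetical stand-ins, not the real Arvados SDK objects.
    def read_segments(block_manager, segments):
        data = []
        for seg in segments:
            # Only the first block may wait on a network fetch; later
            # blocks are returned only if they are already cached.
            d = block_manager.get_block(seg.locator, cache_only=bool(data))
            if not d:
                break
            data.append(d[seg.segment_offset:seg.segment_offset + seg.segment_size])
        return ''.join(data)
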
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index c46019a..1c21d83 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -304,21 +304,23 @@ class BlockManager(object):
         if self._put_threads is None:
             self._put_queue = Queue.Queue(maxsize=2)
             self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+            self._put_threads = [threading.Thread(target=worker, args=(self,)),
+                                 threading.Thread(target=worker, args=(self,))]
             for t in self._put_threads:
+                t.daemon = True
                 t.start()
 
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)
 
-    def get_block(self, locator, num_retries):
+    def get_block(self, locator, num_retries, cache_only=False):
         if locator in self._bufferblocks:
             bb = self._bufferblocks[locator]
             if bb.state != BufferBlock.COMMITTED:
                 return bb.buffer_view[0:bb.write_pointer].tobytes()
             else:
                 locator = bb._locator
-        return self._keep.get(locator, num_retries=num_retries)
+        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
 
     def commit_all(self):
         for k,v in self._bufferblocks.items():
@@ -352,8 +354,9 @@ class BlockManager(object):
             self._prefetch_queue = Queue.Queue()
             self._prefetch_threads = [threading.Thread(target=worker, args=(self,)),
                                       threading.Thread(target=worker, args=(self,))]
-            self._prefetch_threads[0].start()
-            self._prefetch_threads[1].start()
+            for t in self._prefetch_threads:
+                t.daemon = True
+                t.start()
         self._prefetch_queue.put(locator)
 
 class ArvadosFile(object):
@@ -366,7 +369,7 @@ class ArvadosFile(object):
         self._modified = True
         self.segments = []
         for s in segments:
-            self.add_segment(stream, s.range_start, s.range_size)
+            self.add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
 
     def set_unmodified(self):
@@ -402,9 +405,11 @@ class ArvadosFile(object):
             self.parent._my_block_manager().block_prefetch(lr.locator)
 
         for lr in locators_and_ranges(self.segments, offset, size):
-            # TODO: if data is empty, wait on block get, otherwise only
-            # get more data if the block is already in the cache.
-            data.append(self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            if d:
+                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            else:
+                break
         return ''.join(data)
 
     def _repack_writes(self):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 36ec56c..a087838 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -186,24 +186,34 @@ class KeepBlockCache(object):
                         break
                 sm = sum([slot.size() for slot in self._cache])
 
+    def _get(self, locator):
+        # Test if the locator is already in the cache
+        for i in xrange(0, len(self._cache)):
+            if self._cache[i].locator == locator:
+                n = self._cache[i]
+                if i != 0:
+                    # move it to the front
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                return n
+        return None
+
+    def get(self, locator):
+        with self._cache_lock:
+            return self._get(locator)
+
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         with self._cache_lock:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepBlockCache.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
 
 class KeepClient(object):
 
@@ -576,7 +586,7 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
+    def get(self, loc_s, num_retries=None, cache_only=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -595,12 +605,21 @@ class KeepClient(object):
           to fetch data from every available Keep service, along with any
           that are named in location hints in the locator.  The default value
           is set when the KeepClient is initialized.
+        * cache_only: If true, return the block data only if it is already
+          present in the cache; otherwise return None.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
+        if cache_only:
+            slot = self.block_cache.get(expect_hash)
+            if slot is not None and slot.ready.is_set():
+                return slot.get()
+            else:
+                return None
+
         slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
@@ -741,3 +760,6 @@ class KeepClient(object):
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
+
+    def is_cached(self, locator):
+        return self.block_cache.get(KeepLocator(locator).md5sum) is not None
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index 8c377e2..eeb37e29 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -6,7 +6,7 @@ class Range(object):
         self.segment_offset = segment_offset
 
     def __repr__(self):
-        return "[\"%s\", %i, %i, %i]" % (self.locator, self.range_start, self.range_size, self.segment_offset)
+        return "Range(\"%s\", %i, %i, %i)" % (self.locator, self.range_start, self.range_size, self.segment_offset)
 
 def first_block(data_locators, range_start, range_size, debug=False):
     block_start = 0L
@@ -55,7 +55,7 @@ class LocatorAndRange(object):
                  self.segment_size == other.segment_size)
 
     def __repr__(self):
-        return "[\"%s\", %i, %i, %i]" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
+        return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
 
 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
     '''
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 5bc54f5..91cfc79 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -9,17 +9,17 @@ import unittest
 import hashlib
 
 import arvados
-from arvados import StreamReader, StreamFileReader, Range, import_manifest, export_manifest
+from arvados import ArvadosFile, ArvadosFileReader, Range, import_manifest, export_manifest
 
 import arvados_testutil as tutil
-
+from test_stream import StreamFileReaderTestCase
 
 class ArvadosFileWriterTestCase(unittest.TestCase):
     class MockKeep(object):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
-        def get(self, locator, num_retries=0):
+        def get(self, locator, num_retries=0, cache_only=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def put(self, data):
@@ -337,3 +337,96 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             r = c.open("count.txt", "r")
             self.assertEqual("0123", r.read(4))
         self.assertEqual(["2e9ec317e197819358fbc43afca7d837+8", "2e9ec317e197819358fbc43afca7d837+8", "e8dc4081b13434b45189a720b77b6818+8"], keep.requests)
+
+
+class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
+    class MockParent(object):
+        class MockBlockMgr(object):
+            def __init__(self, blocks, nocache):
+                self.blocks = blocks
+                self.nocache = nocache
+
+            def block_prefetch(self, loc):
+                pass
+
+            def get_block(self, loc, num_retries=0, cache_only=False):
+                if self.nocache and cache_only:
+                    return None
+                return self.blocks[loc]
+
+        def __init__(self, blocks, nocache):
+            self.blocks = blocks
+            self.nocache = nocache
+
+        def _my_block_manager(self):
+            return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache)
+
+    def make_count_reader(self, nocache=False):
+        stream = []
+        n = 0
+        blocks = {}
+        for d in ['01234', '34567', '67890']:
+            loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+            blocks[loc] = d
+            stream.append(Range(loc, n, len(d)))
+            n += len(d)
+        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+        return ArvadosFileReader(af, "count.txt")
+
+    def test_read_returns_first_block(self):
+        # read() calls will be aligned on block boundaries - see #3663.
+        sfile = self.make_count_reader(nocache=True)
+        self.assertEqual('123', sfile.read(10))
+
+    def test_successive_reads(self):
+        sfile = self.make_count_reader(nocache=True)
+        for expect in ['123', '456', '789', '']:
+            self.assertEqual(expect, sfile.read(10))
+
+    def test_tell_after_block_read(self):
+        sfile = self.make_count_reader(nocache=True)
+        sfile.read(5)
+        self.assertEqual(3, sfile.tell())
+
+# class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+#     def reader_for(self, coll_name, **kwargs):
+#         return StreamReader(self.manifest_for(coll_name).split(),
+#                             self.keep_client(), **kwargs)
+
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return reader.readfrom(0, byte_count, **kwargs)
+
+#     def test_manifest_text_without_keep_client(self):
+#         mtext = self.manifest_for('multilevel_collection_1')
+#         for line in mtext.rstrip('\n').split('\n'):
+#             reader = StreamReader(line.split())
+#             self.assertEqual(line + '\n', reader.manifest_text())
+
+
+# class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+#     def reader_for(self, coll_name, **kwargs):
+#         return StreamReader(self.manifest_for(coll_name).split(),
+#                             self.keep_client(), **kwargs).all_files()[0]
+
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return reader.read(byte_count, **kwargs)
+
+
+# class StreamFileReadFromTestCase(StreamFileReadTestCase):
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return reader.readfrom(0, byte_count, **kwargs)
+
+
+# class StreamFileReadAllTestCase(StreamFileReadTestCase):
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return ''.join(reader.readall(**kwargs))
+
+
+# class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return ''.join(reader.readall_decompressed(**kwargs))
+
+
+# class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+#     def read_for_test(self, reader, byte_count, **kwargs):
+#         return ''.join(reader.readlines(**kwargs))

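For context, the new cache_only flag on KeepClient.get() (documented in the hunk
above) turns the call into a non-blocking cache probe: it returns the block data
if it is already in the local block cache, and None otherwise. A hypothetical
usage sketch, assuming a configured Arvados client environment and using a
placeholder locator:

    import arvados

    kc = arvados.KeepClient()
    loc = 'acbd18db4cc2f85cedef654fccc4a4d8+3'   # placeholder locator, not real data

    data = kc.get(loc)                     # normal fetch: may contact Keep servers
    cached = kc.get(loc, cache_only=True)  # cache probe: returns data or None
    if cached is None:
        # Block is not (or no longer) cached; fall back to a normal fetch.
        cached = kc.get(loc)
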
-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list