[ARVADOS] updated: 27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea

git at public.curoverse.com git at public.curoverse.com
Fri Feb 14 17:10:51 EST 2014


Summary of changes:
 sdk/python/arvados/collection.py |    4 +-
 sdk/python/arvados/keep.py       |   43 +++++++
 sdk/python/arvados/stream.py     |  260 +++++++++++++++-----------------------
 3 files changed, 147 insertions(+), 160 deletions(-)

       via  27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea (commit)
      from  519c5922c34da64a8cce9e4d0030892e9b4bdd83 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Feb 14 17:11:55 2014 -0500

    Work in progress

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 2535669..96ae100 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -60,7 +60,7 @@ def normalize(collection):
             current_span = None
             fout = f.replace(' ', '\\040')
             for chunk in stream[f]:
-                chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.CHUNKOFFSET]
+                chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET]
                 if current_span == None:
                     current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
                 else:
@@ -127,7 +127,7 @@ class CollectionReader(object):
         self._populate()
         resp = []
         for s in self._streams:
-            resp += [StreamReader(s)]
+            resp.append(StreamReader(s))
         return resp
 
     def all_files(self):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e1902d1..7d2b72a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -138,6 +138,10 @@ class KeepClient(object):
     def __init__(self):
         self.lock = threading.Lock()
         self.service_roots = None
+        self._cache_lock = threading.Lock()
+        self._cache = []
+        # default 256 megabyte cache
+        self._cache_max = 256 * 1024 * 1024
 
     def shuffled_service_roots(self, hash):
         if self.service_roots == None:
@@ -173,6 +177,11 @@ class KeepClient(object):
         if 'KEEP_LOCAL_STORE' in os.environ:
             return KeepClient.local_store_get(locator)
         expect_hash = re.sub(r'\+.*', '', locator)
+
+        c = self.check_cache(expect_hash)
+        if c:
+            return c
+
         for service_root in self.shuffled_service_roots(expect_hash):
             url = service_root + expect_hash
             api_token = config.get('ARVADOS_API_TOKEN')
@@ -180,12 +189,15 @@ class KeepClient(object):
                        'Accept': 'application/octet-stream'}
             blob = self.get_url(url, headers, expect_hash)
             if blob:
+                self.put_cache(expect_hash, blob)
                 return blob
+
         for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
             instance = location_hint.group(1)
             url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
             blob = self.get_url(url, {}, expect_hash)
             if blob:
+                self.put_cache(expect_hash, blob)
                 return blob
         raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
 
@@ -234,6 +246,37 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, want_copies, have_copies))
 
+    def put_cache(self, locator, data):
+        """Put a block into the cache."""
+        if self.check_cache(locator) is not None:
+            return
+        self._cache_lock.acquire()
+        try:
+            # first check the cache size and evict blocks from the end if necessary
+            sm = sum([len(a[1]) for a in self._cache]) + len(data)
+            while sm > self._cache_max and self._cache:
+                del self._cache[-1]
+                sm = sum([len(a[1]) for a in self._cache]) + len(data)
+
+            # now add the new block at the front of the list
+            self._cache.insert(0, [locator, data])
+        finally:
+            self._cache_lock.release()
+
+    def check_cache(self, locator):
+        """Get a block from the cache.  Also moves the block to the front of the list."""
+        self._cache_lock.acquire()
+        try:
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i][0] == locator:
+                    n = self._cache[i]
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                    return n[1]
+        finally:
+            self._cache_lock.release()
+        return None
+
     @staticmethod
     def sign_for_old_server(data_hash, data):
         return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
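
For readers skimming the keep.py hunk: put_cache/check_cache implement a
simple bounded, most-recently-used block cache. A hit moves the block to the
front of a list, and blocks are evicted from the tail whenever the total
cached bytes would exceed _cache_max. A minimal standalone sketch of the same
idea, illustrative only and not part of the commit (the BlockCache name is
invented):

    import threading

    class BlockCache(object):
        """Illustrative MRU-ordered byte cache with a total-size cap."""
        def __init__(self, cap=256 * 1024 * 1024):
            self._lock = threading.Lock()
            self._cache = []          # list of [key, data], most recent first
            self._cap = cap

        def get(self, key):
            with self._lock:
                for i, (k, data) in enumerate(self._cache):
                    if k == key:
                        # move the hit to the front so it is evicted last
                        self._cache.insert(0, self._cache.pop(i))
                        return data
            return None

        def put(self, key, data):
            if self.get(key) is not None:
                return
            with self._lock:
                # evict from the tail until the new block fits
                total = sum(len(d) for _, d in self._cache) + len(data)
                while total > self._cap and self._cache:
                    total -= len(self._cache.pop()[1])
                self._cache.insert(0, [key, data])
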
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 84461b3..4a53324 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -22,13 +22,48 @@ from keep import *
 import config
 import errors
 
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def locators_and_ranges(data_locators, range_start, range_size):
+    '''returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range'''
+    resp = []
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+    block_start = 0L
+    for locator, block_size, block_start in data_locators:
+        block_end = block_start + block_size
+        if range_end <= block_start:
+            # range ends at or before this block starts, so don't look at any more locators
+            break
+        if range_start >= block_end:
+            # range starts at or after this block ends, so go to next block
+            continue
+        elif range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            resp.append([locator, block_size, range_start - block_start, range_size])
+        elif range_start >= block_start:
+            # range starts in this block
+            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            resp.append([locator, block_size, 0L, block_size])
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            resp.append([locator, block_size, 0L, range_end - block_start])
+        block_start = block_end
+    return resp
+
+
 class StreamFileReader(object):
-    def __init__(self, stream, pos, size, name):
+    def __init__(self, stream, segments, name):
         self._stream = stream
-        self._pos = pos
-        self._size = size
+        self.segments = segments
         self._name = name
-        self._filepos = 0
+        self._filepos = 0L
 
     def name(self):
         return self._name
@@ -36,16 +71,29 @@ class StreamFileReader(object):
     def decompressed_name(self):
         return re.sub('\.(bz2|gz)$', '', self._name)
 
-    def size(self):
-        return self._size
-
     def stream_name(self):
         return self._stream.name()
 
+    def seek(self, pos):
+        self._filepos = min(max(pos, 0L), self.size())
+
+    def tell(self):
+        return self._filepos
+
+    def size(self):
+        # segments are [position in stream, segment size, offset within the file]
+        n = self.segments[-1]
+        return n[OFFSET] + n[BLOCKSIZE]
+
     def read(self, size, **kwargs):
-        self._stream.seek(self._pos + self._filepos)
-        data = self._stream.read(min(size, self._size - self._filepos))
-        self._filepos += len(data)
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        if size == 0:
+            return ''
+
+        data = ''
+        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
+            self._stream.seek(locator+segmentoffset)
+            data += self._stream.read(segmentsize)
+        self._filepos += len(data)
         return data
 
     def readall(self, size=2**20, **kwargs):
@@ -55,25 +103,22 @@ class StreamFileReader(object):
                 break
             yield data
 
-    def seek(self, pos):
-        self._filepos = pos
-
     def bunzip2(self, size):
         decompressor = bz2.BZ2Decompressor()
-        for chunk in self.readall(size):
-            data = decompressor.decompress(chunk)
+        for segment in self.readall(size):
+            data = decompressor.decompress(segment)
             if data and data != '':
                 yield data
 
     def gunzip(self, size):
         decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
-        for chunk in self.readall(size):
-            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
+        for segment in self.readall(size):
+            data = decompressor.decompress(decompressor.unconsumed_tail + segment)
             if data and data != '':
                 yield data
 
     def readall_decompressed(self, size=2**20):
-        self._stream.seek(self._pos + self._filepos)
+        self.seek(0)
         if re.search('\.bz2$', self._name):
             return self.bunzip2(size)
         elif re.search('\.gz$', self._name):
@@ -101,173 +146,72 @@ class StreamFileReader(object):
         if data != '':
             yield data
 
-    def stream_offset(self):
-        return self._pos
-
-    def as_manifest(self):
-        if self.size() == 0:
-            return ("%s %s 0:0:%s\n"
-                    % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
-        return string.join(self._stream.tokens_for_range(self._pos, self._size),
-                           " ") + "\n"
 
 class StreamReader(object):
     def __init__(self, tokens):
         self._tokens = tokens
-        self._current_datablock_data = None
-        self._current_datablock_pos = 0
-        self._current_datablock_index = -1
         self._pos = 0L
 
         self._stream_name = None
         self.data_locators = []
-        self.files = []
+        self.files = {}
+
+        streamoffset = 0L
 
         for tok in self._tokens:
             if self._stream_name == None:
                 self._stream_name = tok.replace('\\040', ' ')
-            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
-                self.data_locators += [tok]
-            elif re.search(r'^\d+:\d+:\S+', tok):
-                pos, size, name = tok.split(':',2)
-                self.files += [[long(pos), long(size), name.replace('\\040', ' ')]]
-            else:
-                raise errors.SyntaxError("Invalid manifest format")
-
+                continue
+
+            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
+            if s:
+                blocksize = long(s.group(1))
+                self.data_locators.append([tok, blocksize, streamoffset])
+                streamoffset += blocksize
+                continue
+
+            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+            if s:
+                pos = long(s.group(1))
+                size = long(s.group(2))
+                name = s.group(3).replace('\\040', ' ')
+                if name not in self.files:
+                    self.files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+                else:
+                    n = self.files[name]
+                    n.segments.append([pos, size, n.size()])
+                continue
+
+            raise errors.SyntaxError("Invalid manifest format")
+
     def tokens(self):
         return self._tokens
 
-    def tokens_for_range(self, range_start, range_size):
-        resp = [self._stream_name]
-        return_all_tokens = False
-        block_start = 0
-        token_bytes_skipped = 0
-        for locator in self.data_locators:
-            sizehint = re.search(r'\+(\d+)', locator)
-            if not sizehint:
-                return_all_tokens = True
-            if return_all_tokens:
-                resp += [locator]
-                next
-            blocksize = int(sizehint.group(0))
-            if range_start + range_size <= block_start:
-                break
-            if range_start < block_start + blocksize:
-                resp += [locator]
-            else:
-                token_bytes_skipped += blocksize
-            block_start += blocksize
-        for f in self.files:
-            if ((f[0] < range_start + range_size)
-                and
-                (f[0] + f[1] > range_start)
-                and
-                f[1] > 0):
-                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
-        return resp
-
-    LOCATOR = 0
-    BLOCKSIZE = 1
-    CHUNKOFFSET = 2
-    CHUNKSIZE = 3
-
-    def locators_and_ranges(self, range_start, range_size):
-        '''returns list of [block locator, blocksize, chunk offset, chunk size] that satisfies the range'''
-        resp = []
-        return_all_tokens = False
-        range_start = long(range_start)
-        range_size = long(range_size)
-        range_end = range_start + range_size
-        block_start = 0L
-        for locator in self.data_locators:
-            sizehint = re.search(r'[0-9a-f]{32}\+(\d+)', locator)
-            if not sizehint:
-                raise Exception("Manifest must include block sizes to be normalized")
-            block_size = long(sizehint.group(1))
-            block_end = block_start + block_size
-            if range_end < block_start:
-                # range ends before this block starts, so don't look at any more locators
-                break
-            if range_start > block_end:
-                # range starts after this block ends, so go to next block
-                next
-            elif range_start >= block_start and range_end <= block_end:
-                # range starts and ends in this block
-                resp.append([locator, block_size, range_start - block_start, range_size])
-            elif range_start >= block_start:
-                # range starts in this block
-                resp.append([locator, block_size, range_start - block_start, block_end - range_start])
-            elif range_start < block_start and range_end > block_end:
-                # range starts in a previous block and extends to further blocks
-                resp.append([locator, block_size, 0L, block_size])
-            elif range_start < block_start and range_end <= block_end:
-                # range starts in a previous block and ends in this block
-                resp.append([locator, block_size, 0L, range_end - block_start])
-            block_start = block_end
-        return resp
-
     def name(self):
         return self._stream_name
 
     def all_files(self):
-        for f in self.files:
-            pos, size, name = f
-            yield StreamFileReader(self, pos, size, name)
-
-    def nextdatablock(self):
-        if self._current_datablock_index < 0:
-            self._current_datablock_pos = 0
-            self._current_datablock_index = 0
-        else:
-            self._current_datablock_pos += self.current_datablock_size()
-            self._current_datablock_index += 1
-        self._current_datablock_data = None
-
-    def current_datablock_data(self):
-        if self._current_datablock_data == None:
-            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
-        return self._current_datablock_data
-
-    def current_datablock_size(self):
-        if self._current_datablock_index < 0:
-            self.nextdatablock()
-        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
-        if sizehint:
-            return int(sizehint.group(0))
-        return len(self.current_datablock_data())
+        return self.files.values()
 
     def seek(self, pos):
         """Set the position of the next read operation."""
         self._pos = pos
 
-    def really_seek(self):
-        """Find and load the appropriate data block, so the byte at
-        _pos is in memory.
-        """
-        if self._pos == self._current_datablock_pos:
-            return True
-        if (self._current_datablock_pos != None and
-            self._pos >= self._current_datablock_pos and
-            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
-            return True
-        if self._pos < self._current_datablock_pos:
-            self._current_datablock_index = -1
-            self.nextdatablock()
-        while (self._pos > self._current_datablock_pos and
-               self._pos > self._current_datablock_pos + self.current_datablock_size()):
-            self.nextdatablock()
+    def tell(self):
+        return self._pos
+
+    def size(self):
+        n = self.data_locators[-1]
+        return n[OFFSET] + n[BLOCKSIZE]
+
 
     def read(self, size):
-        """Read no more than size bytes -- but at least one byte,
-        unless _pos is already at the end of the stream.
-        """
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
         if size == 0:
             return ''
-        self.really_seek()
-        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
-            self.nextdatablock()
-            if self._current_datablock_index >= len(self.data_locators):
-                return None
-        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
+        data = ''
+        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, self._pos, size):
+            data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
         self._pos += len(data)
         return data
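
To make the new module-level locators_and_ranges() concrete: given a list of
[locator, block size, stream offset] entries and a byte range, it returns one
[locator, block size, offset within block, bytes to take] entry for each block
the range touches. A small worked example, not part of the commit, with
hypothetical locator strings (values may display as Python 2 longs):

    # Three blocks laid end to end in the stream: offsets 0, 10 and 30.
    data_locators = [
        ['blockA+10', 10, 0],    # stream bytes 0-9
        ['blockB+20', 20, 10],   # stream bytes 10-29
        ['blockC+5',  5,  30],   # stream bytes 30-34
    ]

    # Read 15 bytes starting at stream offset 5; expected result:
    #   [['blockA+10', 10, 5, 5],    # last 5 bytes of blockA
    #    ['blockB+20', 20, 0, 10]]   # first 10 bytes of blockB
    print(locators_and_ranges(data_locators, 5, 15))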

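With the new StreamReader constructor, repeated manifest file tokens that
share a name fold into a single StreamFileReader holding a list of segments,
each [position in stream, segment length, offset within the file]. A quick
illustrative check, again not part of the commit, using hypothetical locators:

    tokens = ['.',
              'a' * 32 + '+10',      # block of 10 bytes at stream offset 0
              'b' * 32 + '+20',      # block of 20 bytes at stream offset 10
              '0:15:foo.txt',        # first 15 bytes of the stream
              '15:15:foo.txt']       # next 15 bytes of the stream
    s = StreamReader(tokens)
    f = s.all_files()[0]
    assert s.size() == 30
    assert f.size() == 30
    assert f.segments == [[0, 15, 0], [15, 15, 15]]
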
-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list