[ARVADOS] updated: 27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea
git at public.curoverse.com
git at public.curoverse.com
Fri Feb 14 17:10:51 EST 2014
Summary of changes:
sdk/python/arvados/collection.py | 4 +-
sdk/python/arvados/keep.py | 43 +++++++
sdk/python/arvados/stream.py | 260 +++++++++++++++-----------------------
3 files changed, 147 insertions(+), 160 deletions(-)
via 27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea (commit)
from 519c5922c34da64a8cce9e4d0030892e9b4bdd83 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 27d6124d152f99fc8f26cd2c9fc99d24ab46a7ea
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Feb 14 17:11:55 2014 -0500
Work in progress
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 2535669..96ae100 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -60,7 +60,7 @@ def normalize(collection):
current_span = None
fout = f.replace(' ', '\\040')
for chunk in stream[f]:
- chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.CHUNKOFFSET]
+ chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET]
if current_span == None:
current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
else:
@@ -127,7 +127,7 @@ class CollectionReader(object):
self._populate()
resp = []
for s in self._streams:
- resp += [StreamReader(s)]
+ resp.append(StreamReader(s))
return resp
def all_files(self):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e1902d1..7d2b72a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -138,6 +138,10 @@ class KeepClient(object):
def __init__(self):
self.lock = threading.Lock()
self.service_roots = None
+ self._cache_lock = threading.Lock()
+ self._cache = []
+ # default 256 megabyte cache
+ self._cache_max = 256 * 1024 * 1024
def shuffled_service_roots(self, hash):
if self.service_roots == None:
@@ -173,6 +177,11 @@ class KeepClient(object):
if 'KEEP_LOCAL_STORE' in os.environ:
return KeepClient.local_store_get(locator)
expect_hash = re.sub(r'\+.*', '', locator)
+
+ c = self.check_cache(expect_hash)
+ if c:
+ return c
+
for service_root in self.shuffled_service_roots(expect_hash):
url = service_root + expect_hash
api_token = config.get('ARVADOS_API_TOKEN')
@@ -180,12 +189,15 @@ class KeepClient(object):
'Accept': 'application/octet-stream'}
blob = self.get_url(url, headers, expect_hash)
if blob:
+ self.put_cache(expect_hash, blob)
return blob
+
for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
instance = location_hint.group(1)
url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
blob = self.get_url(url, {}, expect_hash)
if blob:
+ self.put_cache(expect_hash, blob)
return blob
raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
@@ -234,6 +246,37 @@ class KeepClient(object):
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
+ def put_cache(self, locator, data):
+ """Put a block into the cache."""
+ if self.check_cache(locator) != None:
+ return
+ self.cache_lock.acquire()
+ try:
+ # first check cache size and delete stuff from the end if necessary
+ sm = sum([len(a[1]) for a in self._cache]) + len(data)
+ while sum > self._cache_max:
+ del self._cache[-1]
+ sm = sum([len(a[1]) for a in self._cache]) + len(data)
+
+ # now add the new block at the front of the list
+ self._cache.insert(0, [locator, data])
+ finally:
+ self.cache_lock.release()
+
+ def check_cache(self, locator):
+ """Get a block from the cache. Also moves the block to the front of the list."""
+ self._cache_lock.acquire()
+ try:
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i][0] == locator:
+ n = self._cache[i]
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n[1]
+ finally:
+ self.cache_lock.release()
+ return None
+
@staticmethod
def sign_for_old_server(data_hash, data):
return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 84461b3..4a53324 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -22,13 +22,48 @@ from keep import *
import config
import errors
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def locators_and_ranges(self, data_locators, range_start, range_size):
+ '''returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range'''
+ resp = []
+ range_start = long(range_start)
+ range_size = long(range_size)
+ range_end = range_start + range_size
+ block_start = 0L
+ for locator, block_size, block_start in data_locators:
+ block_end = block_start + block_size
+ if range_end < block_start:
+ # range ends before this block starts, so don't look at any more locators
+ break
+ if range_start > block_end:
+ # range starts after this block ends, so go to next block
+ next
+ elif range_start >= block_start and range_end <= block_end:
+ # range starts and ends in this block
+ resp.append([locator, block_size, range_start - block_start, range_size])
+ elif range_start >= block_start:
+ # range starts in this block
+ resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+ elif range_start < block_start and range_end > block_end:
+ # range starts in a previous block and extends to further blocks
+ resp.append([locator, block_size, 0L, block_size])
+ elif range_start < block_start and range_end <= block_end:
+ # range starts in a previous block and ends in this block
+ resp.append([locator, block_size, 0L, range_end - block_start])
+ block_start = block_end
+ return resp
+
+
class StreamFileReader(object):
- def __init__(self, stream, pos, size, name):
+ def __init__(self, stream, segments, name):
self._stream = stream
- self._pos = pos
- self._size = size
+ self.segments = segments
self._name = name
- self._filepos = 0
+ self._filepos = 0L
def name(self):
return self._name
@@ -36,16 +71,29 @@ class StreamFileReader(object):
def decompressed_name(self):
return re.sub('\.(bz2|gz)$', '', self._name)
- def size(self):
- return self._size
-
def stream_name(self):
return self._stream.name()
+ def seek(self, pos):
+ self._filepos = min(max(pos, 0L), self.size())
+
+ def tell(self, pos):
+ return self._filepos
+
+ def size(self):
+ n = self.data_locators[-1]
+ return n[self.OFFSET] + n[self.BLOCKSIZE]
+
def read(self, size, **kwargs):
- self._stream.seek(self._pos + self._filepos)
- data = self._stream.read(min(size, self._size - self._filepos))
- self._filepos += len(data)
+ """Read up to 'size' bytes from the stream, starting at the current file position"""
+ if size == 0:
+ return ''
+
+ data = ''
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
+ self._stream.seek(locator+segmentoffset)
+ data += self._stream.read(segmentsize)
+ self._filepos += len(data)
return data
def readall(self, size=2**20, **kwargs):
@@ -55,25 +103,22 @@ class StreamFileReader(object):
break
yield data
- def seek(self, pos):
- self._filepos = pos
-
def bunzip2(self, size):
decompressor = bz2.BZ2Decompressor()
- for chunk in self.readall(size):
- data = decompressor.decompress(chunk)
+ for segment in self.readall(size):
+ data = decompressor.decompress(segment)
if data and data != '':
yield data
def gunzip(self, size):
decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
- for chunk in self.readall(size):
- data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
+ for segment in self.readall(size):
+ data = decompressor.decompress(decompressor.unconsumed_tail + segment)
if data and data != '':
yield data
def readall_decompressed(self, size=2**20):
- self._stream.seek(self._pos + self._filepos)
+ self.seek(0)
if re.search('\.bz2$', self._name):
return self.bunzip2(size)
elif re.search('\.gz$', self._name):
@@ -101,173 +146,72 @@ class StreamFileReader(object):
if data != '':
yield data
- def stream_offset(self):
- return self._pos
-
- def as_manifest(self):
- if self.size() == 0:
- return ("%s %s 0:0:%s\n"
- % (self._stream.name(), config.EMPTY_BLOCK_LOCATOR, self.name()))
- return string.join(self._stream.tokens_for_range(self._pos, self._size),
- " ") + "\n"
class StreamReader(object):
def __init__(self, tokens):
self._tokens = tokens
- self._current_datablock_data = None
- self._current_datablock_pos = 0
- self._current_datablock_index = -1
self._pos = 0L
self._stream_name = None
self.data_locators = []
- self.files = []
+ self.files = {}
+
+ streamoffset = 0L
for tok in self._tokens:
if self._stream_name == None:
self._stream_name = tok.replace('\\040', ' ')
- elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
- self.data_locators += [tok]
- elif re.search(r'^\d+:\d+:\S+', tok):
- pos, size, name = tok.split(':',2)
- self.files += [[long(pos), long(size), name.replace('\\040', ' ')]]
- else:
- raise errors.SyntaxError("Invalid manifest format")
-
+ continue
+
+ s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
+ if s:
+ blocksize = long(s.group(1))
+ self.data_locators.append([tok, blocksize, streamoffset])
+ streamoffset += blocksize
+ continue
+
+ s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+ if s:
+ pos = long(s.group(1))
+ size = long(s.group(2))
+ name = s.group(3).replace('\\040', ' ')
+ if name not in self.files:
+ self.files[name] = StreamFileReader(self, [[pos, size, 0]], name)
+ else:
+ n = self.files[name]
+ n.segments.append([pos, size, n.size()])
+ continue
+
+ raise errors.SyntaxError("Invalid manifest format")
+
def tokens(self):
return self._tokens
- def tokens_for_range(self, range_start, range_size):
- resp = [self._stream_name]
- return_all_tokens = False
- block_start = 0
- token_bytes_skipped = 0
- for locator in self.data_locators:
- sizehint = re.search(r'\+(\d+)', locator)
- if not sizehint:
- return_all_tokens = True
- if return_all_tokens:
- resp += [locator]
- next
- blocksize = int(sizehint.group(0))
- if range_start + range_size <= block_start:
- break
- if range_start < block_start + blocksize:
- resp += [locator]
- else:
- token_bytes_skipped += blocksize
- block_start += blocksize
- for f in self.files:
- if ((f[0] < range_start + range_size)
- and
- (f[0] + f[1] > range_start)
- and
- f[1] > 0):
- resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
- return resp
-
- LOCATOR = 0
- BLOCKSIZE = 1
- CHUNKOFFSET = 2
- CHUNKSIZE = 3
-
- def locators_and_ranges(self, range_start, range_size):
- '''returns list of [block locator, blocksize, chunk offset, chunk size] that satisfies the range'''
- resp = []
- return_all_tokens = False
- range_start = long(range_start)
- range_size = long(range_size)
- range_end = range_start + range_size
- block_start = 0L
- for locator in self.data_locators:
- sizehint = re.search(r'[0-9a-f]{32}\+(\d+)', locator)
- if not sizehint:
- raise Exception("Manifest must include block sizes to be normalized")
- block_size = long(sizehint.group(1))
- block_end = block_start + block_size
- if range_end < block_start:
- # range ends before this block starts, so don't look at any more locators
- break
- if range_start > block_end:
- # range starts after this block ends, so go to next block
- next
- elif range_start >= block_start and range_end <= block_end:
- # range starts and ends in this block
- resp.append([locator, block_size, range_start - block_start, range_size])
- elif range_start >= block_start:
- # range starts in this block
- resp.append([locator, block_size, range_start - block_start, block_end - range_start])
- elif range_start < block_start and range_end > block_end:
- # range starts in a previous block and extends to further blocks
- resp.append([locator, block_size, 0L, block_size])
- elif range_start < block_start and range_end <= block_end:
- # range starts in a previous block and ends in this block
- resp.append([locator, block_size, 0L, range_end - block_start])
- block_start = block_end
- return resp
-
def name(self):
return self._stream_name
def all_files(self):
- for f in self.files:
- pos, size, name = f
- yield StreamFileReader(self, pos, size, name)
-
- def nextdatablock(self):
- if self._current_datablock_index < 0:
- self._current_datablock_pos = 0
- self._current_datablock_index = 0
- else:
- self._current_datablock_pos += self.current_datablock_size()
- self._current_datablock_index += 1
- self._current_datablock_data = None
-
- def current_datablock_data(self):
- if self._current_datablock_data == None:
- self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
- return self._current_datablock_data
-
- def current_datablock_size(self):
- if self._current_datablock_index < 0:
- self.nextdatablock()
- sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
- if sizehint:
- return int(sizehint.group(0))
- return len(self.current_datablock_data())
+ return self.files.values()
def seek(self, pos):
"""Set the position of the next read operation."""
self._pos = pos
- def really_seek(self):
- """Find and load the appropriate data block, so the byte at
- _pos is in memory.
- """
- if self._pos == self._current_datablock_pos:
- return True
- if (self._current_datablock_pos != None and
- self._pos >= self._current_datablock_pos and
- self._pos <= self._current_datablock_pos + self.current_datablock_size()):
- return True
- if self._pos < self._current_datablock_pos:
- self._current_datablock_index = -1
- self.nextdatablock()
- while (self._pos > self._current_datablock_pos and
- self._pos > self._current_datablock_pos + self.current_datablock_size()):
- self.nextdatablock()
+ def tell(self):
+ return self._pos
+
+ def size(self):
+ n = self.data_locators[-1]
+ return n[self.OFFSET] + n[self.BLOCKSIZE]
+
+ def locators_and_ranges(self.data_locators, self._pos, size)
def read(self, size):
- """Read no more than size bytes -- but at least one byte,
- unless _pos is already at the end of the stream.
- """
+ """Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
return ''
- self.really_seek()
- while self._pos >= self._current_datablock_pos + self.current_datablock_size():
- self.nextdatablock()
- if self._current_datablock_index >= len(self.data_locators):
- return None
- data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
+ data = ''
+ for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.data_locators, self._pos, size):
+ data += Keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
self._pos += len(data)
return data
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list