[ARVADOS] created: 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3
git at public.curoverse.com
git at public.curoverse.com
Tue Sep 16 14:49:46 EDT 2014
at 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3 (commit)
commit 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Sep 16 14:49:38 2014 -0400
3878: Fix block cache sharing across threads so arv-mount uses a bounded amount
of memory to store blocks. Also fixes a bug in cache management code and cleans
up some exception handler error reporting.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f5c4066..496136e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -92,9 +92,9 @@ def normalize(collection):
class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None):
+ def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
self._api_client = api_client
- self._keep_client = None
+ self._keep_client = keep_client
if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
@@ -153,7 +153,7 @@ class CollectionReader(object):
# now regenerate the manifest text based on the normalized stream
#print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+ self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
#print "result", self._manifest_text
@@ -161,7 +161,7 @@ class CollectionReader(object):
self._populate()
resp = []
for s in self._streams:
- resp.append(StreamReader(s))
+ resp.append(StreamReader(s, keep=self._keep_client))
return resp
def all_files(self):
@@ -172,7 +172,7 @@ class CollectionReader(object):
def manifest_text(self, strip=False):
self._populate()
if strip:
- m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+ m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
return m
else:
return self._manifest_text
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index f0a8724..7621a96 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -143,6 +143,71 @@ class Keep(object):
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
+class KeepBlockCache(object):
+ # Default RAM cache is 256MiB
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
+ self.cache_max = cache_max
+ self._cache = []
+ self._cache_lock = threading.Lock()
+
+ class CacheSlot(object):
+ def __init__(self, locator):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+ if self.content == None:
+ return 0
+ else:
+ return len(self.content)
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ self._cache_lock.acquire()
+ try:
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ while len(self._cache) > 0 and sm > self.cache_max:
+ for i in xrange(len(self._cache)-1, -1, -1):
+ if self._cache[i].ready.is_set():
+ del self._cache[i]
+ break
+ sm = sum([slot.size() for slot in self._cache])
+ finally:
+ self._cache_lock.release()
+
+ def reserve_cache(self, locator):
+ '''Reserve a cache slot for the specified locator,
+ or return the existing slot.'''
+ self._cache_lock.acquire()
+ try:
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n, False
+
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
+ finally:
+ self._cache_lock.release()
class KeepClient(object):
class ThreadLimiter(object):
@@ -326,7 +391,7 @@ class KeepClient(object):
def __init__(self, api_client=None, proxy=None, timeout=300,
- api_token=None, local_store=None):
+ api_token=None, local_store=None, block_cache=None):
"""Initialize a new KeepClient.
Arguments:
@@ -362,15 +427,17 @@ class KeepClient(object):
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ if block_cache is None:
+ raise Exception()
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -461,59 +528,6 @@ class KeepClient(object):
_logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
-
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
-
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
@@ -569,7 +583,7 @@ class KeepClient(object):
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
- slot, first = self.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
@@ -609,7 +623,7 @@ class KeepClient(object):
# Always cache the result, then return it if we succeeded.
slot.set(blob)
- self.cap_cache()
+ self.block_cache.cap_cache()
if loop.success():
return blob
@@ -694,7 +708,6 @@ class KeepClient(object):
os.rename(os.path.join(self.local_store, md5 + '.tmp'),
os.path.join(self.local_store, md5))
return locator
-
def local_store_get(self, loc_s):
try:
locator = KeepLocator(loc_s)
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 4d2dfee..f49b947 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -34,12 +34,18 @@ class SafeApi(object):
self.token = config.get('ARVADOS_API_TOKEN')
self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
self.local = threading.local()
+ self.block_cache = arvados.KeepBlockCache()
def localapi(self):
if 'api' not in self.local.__dict__:
self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
return self.local.api
+ def localkeep(self):
+ if 'keep' not in self.local.__dict__:
+ self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
+ return self.local.keep
+
def collections(self):
return self.localapi().collections()
@@ -307,7 +313,7 @@ class CollectionDirectory(Directory):
self.collection_object_file.update(self.collection_object)
self.clear()
- collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
+ collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
for s in collection.all_streams():
cwd = self
for part in s.name().split('/'):
@@ -341,6 +347,10 @@ class CollectionDirectory(Directory):
else:
_logger.error("arv-mount %s: error", self.collection_locator)
_logger.exception(detail)
+ except arvados.errors.ArgumentError as detail:
+ _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
+ if self.collection_object is not None and "manifest_text" in self.collection_object:
+ _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
except Exception as detail:
_logger.error("arv-mount %s: error", self.collection_locator)
if self.collection_object is not None and "manifest_text" in self.collection_object:
@@ -796,7 +806,11 @@ class Operations(llfuse.Operations):
try:
with llfuse.lock_released:
return handle.entry.readfrom(off, size)
- except:
+ except arvados.errors.NotFoundError as e:
+ _logger.warning("Block not found: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
+ except Exception as e:
+ _logger.exception(e)
raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list