[ARVADOS] created: 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3

git at public.curoverse.com
Tue Sep 16 14:49:46 EDT 2014


        at  46f5403fdd2b3b0bd26d81d81ba509c624f54bb3 (commit)


commit 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Sep 16 14:49:38 2014 -0400

    3878: Fix block cache sharing across threads so arv-mount uses a bounded amount
    of memory to store blocks.  Also fixes a bug in the cache management code and
    cleans up some exception handler error reporting.
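
For context, a minimal sketch of how the new KeepBlockCache protocol introduced
below is meant to be used (an illustration only, not part of the patch;
fetch_block is a hypothetical stand-in for the fetch logic inside
KeepClient.get()):

    import arvados

    cache = arvados.KeepBlockCache(cache_max=64 * 1024 * 1024)  # shared by every thread

    def get_block(locator):
        # Only the first thread to reserve a slot fetches the block; the
        # others block on the slot's threading.Event and reuse the result.
        slot, first = cache.reserve_cache(locator)
        if not first:
            return slot.get()            # waits until the fetching thread calls set()
        try:
            content = fetch_block(locator)   # hypothetical fetch helper
        except Exception:
            content = None                   # error slots are dropped by cap_cache()
        slot.set(content)
        cache.cap_cache()                    # trim the cache back under cache_max
        return content

cap_cache() also discards any slot whose event is set but whose content is
None, so failed fetches do not pin cache entries.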

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f5c4066..496136e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -92,9 +92,9 @@ def normalize(collection):
 
 
 class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text, api_client=None):
+    def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
         self._api_client = api_client
-        self._keep_client = None
+        self._keep_client = keep_client
         if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
@@ -153,7 +153,7 @@ class CollectionReader(object):
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
 
@@ -161,7 +161,7 @@ class CollectionReader(object):
         self._populate()
         resp = []
         for s in self._streams:
-            resp.append(StreamReader(s))
+            resp.append(StreamReader(s, keep=self._keep_client))
         return resp
 
     def all_files(self):
@@ -172,7 +172,7 @@ class CollectionReader(object):
     def manifest_text(self, strip=False):
         self._populate()
         if strip:
-            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
             return m
         else:
             return self._manifest_text
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index f0a8724..7621a96 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -143,6 +143,71 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content is None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        self._cache_lock.acquire()
+        try:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+        finally:
+            self._cache_lock.release()
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        self._cache_lock.acquire()
+        try:
+            # Test if the locator is already in the cache
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i].locator == locator:
+                    n = self._cache[i]
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, n)
+                    return n, False
+
+            # Add a new cache slot for the locator
+            n = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, n)
+            return n, True
+        finally:
+            self._cache_lock.release()
 
 class KeepClient(object):
     class ThreadLimiter(object):
@@ -326,7 +391,7 @@ class KeepClient(object):
 
 
     def __init__(self, api_client=None, proxy=None, timeout=300,
-                 api_token=None, local_store=None):
+                 api_token=None, local_store=None, block_cache=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -362,15 +427,17 @@ class KeepClient(object):
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        if block_cache is None:
+            block_cache = KeepBlockCache()
+        self.block_cache = block_cache
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
             self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -461,59 +528,6 @@ class KeepClient(object):
         _logger.debug(str(pseq))
         return pseq
 
-    class CacheSlot(object):
-        def __init__(self, locator):
-            self.locator = locator
-            self.ready = threading.Event()
-            self.content = None
-
-        def get(self):
-            self.ready.wait()
-            return self.content
-
-        def set(self, value):
-            self.content = value
-            self.ready.set()
-
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
-
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
-
-    def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -569,7 +583,7 @@ class KeepClient(object):
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
-        slot, first = self.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
             return v
@@ -609,7 +623,7 @@ class KeepClient(object):
 
         # Always cache the result, then return it if we succeeded.
         slot.set(blob)
-        self.cap_cache()
+        self.block_cache.cap_cache()
         if loop.success():
             return blob
 
@@ -694,7 +708,6 @@ class KeepClient(object):
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
         return locator
-
     def local_store_get(self, loc_s):
         try:
             locator = KeepLocator(loc_s)
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 4d2dfee..f49b947 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -34,12 +34,18 @@ class SafeApi(object):
         self.token = config.get('ARVADOS_API_TOKEN')
         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
         self.local = threading.local()
+        self.block_cache = arvados.KeepBlockCache()
 
     def localapi(self):
         if 'api' not in self.local.__dict__:
             self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
         return self.local.api
 
+    def localkeep(self):
+        if 'keep' not in self.local.__dict__:
+            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
+        return self.local.keep
+
     def collections(self):
         return self.localapi().collections()
 
@@ -307,7 +313,7 @@ class CollectionDirectory(Directory):
             self.collection_object_file.update(self.collection_object)
 
         self.clear()
-        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
+        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
         for s in collection.all_streams():
             cwd = self
             for part in s.name().split('/'):
@@ -341,6 +347,10 @@ class CollectionDirectory(Directory):
             else:
                 _logger.error("arv-mount %s: error", self.collection_locator)
                 _logger.exception(detail)
+        except arvados.errors.ArgumentError as detail:
+            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
+            if self.collection_object is not None and "manifest_text" in self.collection_object:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
         except Exception as detail:
             _logger.error("arv-mount %s: error", self.collection_locator)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
@@ -796,7 +806,11 @@ class Operations(llfuse.Operations):
         try:
             with llfuse.lock_released:
                 return handle.entry.readfrom(off, size)
-        except:
+        except arvados.errors.NotFoundError as e:
+            _logger.warning("Block not found: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
+        except Exception as e:
+            _logger.exception(e)
             raise llfuse.FUSEError(errno.EIO)
 
     def release(self, fh):

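For reference, the sharing pattern the arv-mount change relies on, in
miniature (an illustration only, not part of the patch; the api_client
argument is omitted here for brevity, while arv-mount passes a per-thread
API client):

    import threading
    import arvados

    block_cache = arvados.KeepBlockCache()   # one cache for the whole process
    _local = threading.local()

    def localkeep():
        # Each thread gets its own KeepClient, but all of them share the same
        # KeepBlockCache, so total block memory stays bounded by cache_max.
        if not hasattr(_local, 'keep'):
            _local.keep = arvados.KeepClient(block_cache=block_cache)
        return _local.keep
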
-----------------------------------------------------------------------


hooks/post-receive
-- 



