[ARVADOS] created: 1.1.3-77-g8404fc2
Git user
git at public.curoverse.com
Tue Feb 20 16:29:51 EST 2018
at 8404fc27cc5904f90ef429a2a6e2fd43d5d49996 (commit)
commit 8404fc27cc5904f90ef429a2a6e2fd43d5d49996
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Feb 20 16:29:22 2018 -0500
13106: Ensure cache slot is reset on error
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 351f7f5..e8e95af 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -541,7 +541,7 @@ class KeepClient(object):
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
@@ -552,19 +552,19 @@ class KeepClient(object):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
@@ -613,25 +613,25 @@ class KeepClient(object):
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
@@ -996,84 +996,90 @@ class KeepClient(object):
self.get_counter.add(1)
- locator = KeepLocator(loc_s)
- if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
- self.hits_counter.add(1)
- v = slot.get()
- return v
-
- self.misses_counter.add(1)
-
- headers = {
- 'X-Request-Id': (request_id or
- (hasattr(self, 'api_client') and self.api_client.request_id) or
- arvados.util.new_request_id()),
- }
-
- # If the locator has hints specifying a prefix (indicating a
- # remote keepproxy) or the UUID of a local gateway service,
- # read data from the indicated service(s) instead of the usual
- # list of local disk services.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
- hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
- for hint in locator.hints if (
- hint.startswith('K@') and
- len(hint) == 29 and
- self._gateway_services.get(hint[2:])
- )])
- # Map root URLs to their KeepService objects.
- roots_map = {
- root: self.KeepService(root, self._user_agent_pool,
- upload_counter=self.upload_counter,
- download_counter=self.download_counter,
- headers=headers)
- for root in hint_roots
- }
-
- # See #3147 for a discussion of the loop implementation. Highlights:
- # * Refresh the list of Keep services after each failure, in case
- # it's being updated.
- # * Retry until we succeed, we're out of retries, or every available
- # service has returned permanent failure.
- sorted_roots = []
- roots_map = {}
+ slot = None
blob = None
- loop = retry.RetryLoop(num_retries, self._check_loop_result,
- backoff_start=2)
- for tries_left in loop:
- try:
- sorted_roots = self.map_new_services(
- roots_map, locator,
- force_rebuild=(tries_left < num_retries),
- need_writable=False,
- headers=headers)
- except Exception as error:
- loop.save_result(error)
- continue
+ try:
+ locator = KeepLocator(loc_s)
+ if method == "GET":
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if not first:
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
+
+ self.misses_counter.add(1)
+
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ }
+
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {
+ root: self.KeepService(root, self._user_agent_pool,
+ upload_counter=self.upload_counter,
+ download_counter=self.download_counter,
+ headers=headers)
+ for root in hint_roots
+ }
+
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ sorted_roots = []
+ roots_map = {}
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ sorted_roots = self.map_new_services(
+ roots_map, locator,
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False,
+ headers=headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
- # Query KeepService objects that haven't returned
- # permanent failure, in our specified shuffle order.
- services_to_try = [roots_map[root]
- for root in sorted_roots
- if roots_map[root].usable()]
- for keep_service in services_to_try:
- blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
- if blob is not None:
- break
- loop.save_result((blob, len(services_to_try)))
-
- # Always cache the result, then return it if we succeeded.
- if method == "GET":
- slot.set(blob)
- self.block_cache.cap_cache()
- if loop.success():
- if method == "HEAD":
- return True
- else:
- return blob
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in sorted_roots
+ if roots_map[root].usable()]
+ for keep_service in services_to_try:
+ blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ if loop.success():
+ if method == "HEAD":
+ return True
+ else:
+ return blob
+ finally:
+ if slot is not None:
+ slot.set(blob)
+ self.block_cache.cap_cache()
# Q: Including 403 is necessary for the Keep tests to continue
# passing, but maybe they should expect KeepReadError instead?
@@ -1144,7 +1150,7 @@ class KeepClient(object):
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list