[ARVADOS] created: 66eb1f645adc591318f1d857242730d48e5a1b3f
git at public.curoverse.com
git at public.curoverse.com
Fri Aug 22 16:29:15 EDT 2014
at 66eb1f645adc591318f1d857242730d48e5a1b3f (commit)
commit 66eb1f645adc591318f1d857242730d48e5a1b3f
Author: Brett Smith <brett at curoverse.com>
Date: Fri Aug 22 13:38:40 2014 -0400
3147: Add retry support to Python SDK's KeepClient.
diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 1d9c778..89910aa 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -17,12 +17,14 @@ class SyntaxError(Exception):
pass
class AssertionError(Exception):
pass
-class NotFoundError(Exception):
- pass
class CommandFailedError(Exception):
pass
+class KeepReadError(Exception):
+ pass
class KeepWriteError(Exception):
pass
+class NotFoundError(KeepReadError):
+ pass
class NotImplementedError(Exception):
pass
class NoKeepServersError(Exception):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1f..4a4998a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ global_client_object = None
import arvados
import arvados.config as config
import arvados.errors
+import arvados.retry as retry
import arvados.util
class KeepLocator(object):
@@ -200,15 +201,83 @@ class KeepClient(object):
return self._done
+ class KeepService(object):
+ # Make requests to a single Keep service, and track results.
+ HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+ ssl.SSLError)
+
+ def __init__(self, root, **headers):
+ self.root = root
+ self.last_result = None
+ self.success_flag = None
+ self.get_headers = {'Accept': 'application/octet-stream'}
+ self.get_headers.update(headers)
+ self.put_headers = headers
+
+ def usable(self):
+ return self.success_flag is not False
+
+ def finished(self):
+ return self.success_flag is not None
+
+ def last_status(self):
+ try:
+ return int(self.last_result[0].status)
+ except (AttributeError, IndexError, ValueError):
+ return None
+
+ def get(self, http, locator):
+ # http is an httplib2.Http object.
+ # locator is a KeepLocator object.
+ url = self.root + str(locator)
+ _logger.debug("Request: GET %s", url)
+ try:
+ with timer.Timer() as t:
+ result = http.request(url.encode('utf-8'), 'GET',
+ headers=self.get_headers)
+ except self.HTTP_ERRORS as e:
+ _logger.debug("Request fail: GET %s => %s: %s",
+ url, type(e), str(e))
+ self.last_result = e
+ else:
+ self.last_result = result
+ self.success_flag = retry.check_http_response_success(result)
+ content = result[1]
+ _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+ self.last_status(), len(content), t.msecs,
+ (len(content)/(1024*1024))/t.secs)
+ if self.success_flag:
+ resp_md5 = hashlib.md5(content).hexdigest()
+ if resp_md5 == locator.md5sum:
+ return content
+ _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+ return None
+
+ def put(self, http, hash_s, body):
+ url = self.root + hash_s
+ _logger.debug("Request: PUT %s", url)
+ try:
+ result = http.request(url.encode('utf-8'), 'PUT',
+ headers=self.put_headers, body=body)
+ except self.HTTP_ERRORS as e:
+ _logger.debug("Request fail: PUT %s => %s: %s",
+ url, type(e), str(e))
+ self.last_result = e
+ else:
+ self.last_result = result
+ self.success_flag = retry.check_http_response_success(result)
+ return self.success_flag
+
+
class KeepWriterThread(threading.Thread):
"""
Write a blob of data to the given Keep server. On success, call
save_response() of the given ThreadLimiter to save the returned
locator.
"""
- def __init__(self, api_token, **kwargs):
+ def __init__(self, keep_service, **kwargs):
super(KeepClient.KeepWriterThread, self).__init__()
- self._api_token = api_token
+ self.service = keep_service
self.args = kwargs
self._success = False
@@ -224,51 +293,35 @@ class KeepClient(object):
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
+ if self.service.finished():
+ return
_logger.debug("KeepWriterThread %s proceeding %s %s",
str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root'])
h = httplib2.Http(timeout=self.args.get('timeout', None))
- url = self.args['service_root'] + self.args['data_hash']
- headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- _logger.debug("Uploading to {}".format(url))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if re.match(r'^2\d\d$', resp['status']):
- self._success = True
- _logger.debug("KeepWriterThread %s succeeded %s %s",
- str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root'])
+ self._success = bool(self.service.put(
+ h, self.args['data_hash'], self.args['data']))
+ status = self.service.last_status()
+ if self._success:
+ resp, body = self.service.last_result
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- limiter.save_response(content.strip(), replicas_stored)
- else:
- _logger.debug("Request fail: PUT %s => %s %s",
- url, resp['status'], content)
- except (httplib2.HttpLib2Error,
- httplib.HTTPException,
- ssl.SSLError) as e:
- # When using https, timeouts look like ssl.SSLError from here.
- # "SSLError: The write operation timed out"
- _logger.debug("Request fail: PUT %s => %s: %s",
- url, type(e), str(e))
+ limiter.save_response(body.strip(), replicas_stored)
+ elif status is not None:
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.args['data_hash'], status,
+ self.service.last_result[1])
def __init__(self, api_client=None, proxy=None, timeout=60,
@@ -323,6 +376,7 @@ class KeepClient(object):
self.api_token = api_token
self.service_roots = [proxy]
self.using_proxy = True
+ self.static_service_roots = True
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -332,29 +386,35 @@ class KeepClient(object):
self.api_token = api_client.api_token
self.service_roots = None
self.using_proxy = None
+ self.static_service_roots = False
- def shuffled_service_roots(self, hash):
- if self.service_roots is None:
- with self.lock:
- try:
- keep_services = self.api_client.keep_services().accessible()
- except Exception: # API server predates Keep services.
- keep_services = self.api_client.keep_disks().list()
+ def build_service_roots(self, force_rebuild=False):
+ if (self.static_service_roots or
+ (self.service_roots and not force_rebuild)):
+ return
+ with self.lock:
+ try:
+ keep_services = self.api_client.keep_services().accessible()
+ except Exception: # API server predates Keep services.
+ keep_services = self.api_client.keep_disks().list()
+
+ keep_services = keep_services.execute().get('items')
+ if not keep_services:
+ raise arvados.errors.NoKeepServersError()
- keep_services = keep_services.execute().get('items')
- if not keep_services:
- raise arvados.errors.NoKeepServersError()
+ self.using_proxy = (keep_services[0].get('service_type') ==
+ 'proxy')
- self.using_proxy = (keep_services[0].get('service_type') ==
- 'proxy')
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
+ _logger.debug(str(self.service_roots))
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
- self.service_roots = sorted(set(roots))
- _logger.debug(str(self.service_roots))
+ def shuffled_service_roots(self, hash, force_rebuild=False):
+ self.build_service_roots(force_rebuild)
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
@@ -454,113 +514,170 @@ class KeepClient(object):
finally:
self._cache_lock.release()
- def get(self, loc_s):
+ def add_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ # roots_map is a dictionary, mapping Keep service root strings
+ # to KeepService objects. Poll for Keep services, and add any
+ # new ones to roots_map. Return the current list of local
+ # root strings.
+ headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+ local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+ for root in local_roots:
+ if root not in roots_map:
+ roots_map[root] = self.KeepService(root, **headers)
+ return local_roots
+
+ def get(self, loc_s, num_retries=0):
+ """Get data from Keep.
+
+ This method fetches one or more blocks of data from Keep. It
+ sends a request each Keep service registered with the API
+ server (or the proxy provided when this client was
+ instantiated), then each service named in location hints, in
+ sequence. As soon as one service provides the data, it's
+ returned.
+
+ Arguments:
+ * loc_s: A string of one or more comma-separated locators to fetch.
+ This method returns the concatenation of these blocks.
+ * num_retries: The number of times to retry GET requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. Note that, in each loop, the method may try
+ to fetch data from every available Keep service, along with any
+ that are named in location hints in the locator. Default 0.
+ """
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
slot, first = self.reserve_cache(expect_hash)
-
if not first:
v = slot.get()
return v
- try:
- for service_root in self.shuffled_service_roots(expect_hash):
- url = service_root + loc_s
- headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
- 'Accept': 'application/octet-stream'}
- blob = self.get_url(url, headers, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
-
- for hint in locator.hints:
- if not hint.startswith('K@'):
- continue
- url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
- blob = self.get_url(url, {}, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
- except:
- slot.set(None)
- self.cap_cache()
- raise
-
- slot.set(None)
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@')]
+ # Map root URLs their KeepService objects.
+ roots_map = {root: self.KeepService(root) for root in hint_roots}
+ blob = None
+ loop = retry.HTTPRetryLoop(num_retries,
+ lambda r: None if blob is None else True)
+ for tries_left in loop:
+ try:
+ local_roots = self.add_new_services(
+ roots_map, expect_hash,
+ force_rebuild=(tries_left < num_retries))
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ # Build an ordered list of KeepService objects that haven't
+ # returned permanent failure.
+ services_to_try = [roots_map[root]
+ for root in (local_roots + hint_roots)
+ if roots_map[root].usable()]
+ if not services_to_try:
+ _logger.info(
+ "All Keep services for %s have permafailed; giving up",
+ loc_s)
+ break
+
+ http = httplib2.Http(timeout=self.timeout)
+ for keep_service in services_to_try:
+ blob = keep_service.get(http, locator)
+ if blob is not None:
+ loop.save_result(blob)
+ break
+
+ # Always cache the result, then return it if we succeeded.
+ slot.set(blob)
self.cap_cache()
- raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+ if loop.success():
+ return blob
+
+ # No servers fulfilled the request. Count how many responded
+ # "not found;" if the ratio is high enough (currently 75%), report
+ # Not Found; otherwise a generic error.
+ not_founds = sum(1 for ks in roots_map.values()
+ if ks.last_status() in set([403, 404, 410]))
+ if (float(not_founds) / len(roots_map)) >= .75:
+ raise arvados.errors.NotFoundError(loc_s)
+ else:
+ raise arvados.errors.KeepReadError(loc_s)
- def get_url(self, url, headers, expect_hash):
- h = httplib2.Http()
- try:
- _logger.debug("Request: GET %s", url)
- with timer.Timer() as t:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
- len(content), t.msecs,
- (len(content)/(1024*1024))/t.secs)
- if re.match(r'^2\d\d$', resp['status']):
- md5 = hashlib.md5(content).hexdigest()
- if md5 == expect_hash:
- return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
- except Exception as e:
- _logger.debug("Request fail: GET %s => %s: %s",
- url, type(e), str(e))
- return None
-
- def put(self, data, copies=2):
+ def put(self, data, copies=2, num_retries=0):
+ """Save data in Keep.
+
+ This method will get a list of Keep services from the API server, and
+ send the data to each one simultaneously in a new thread. Once the
+ uploads are finished, if enough copies are saved, this method returns
+ the most recent HTTP response body. If requests fail to upload
+ enough copies, this method raises KeepWriteError.
+
+ Arguments:
+ * data: The string of data to upload.
+ * copies: The number of copies that the user requires be saved.
+ Default 2.
+ * num_retries: The number of times to retry PUT requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. Default 0.
+ """
data_hash = hashlib.md5(data).hexdigest()
- have_copies = 0
- want_copies = copies
- if not (want_copies > 0):
+ if copies < 1:
return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(
- self.api_token,
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.timeout,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- if thread_limiter.done() < want_copies:
- # Retry the threads (i.e., services) that failed the first
- # time around.
- threads_retry = []
+
+ headers = {}
+ if self.using_proxy:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
+ roots_map = {}
+ thread_limiter = KeepClient.ThreadLimiter(copies)
+ loop = retry.HTTPRetryLoop(
+ num_retries,
+ lambda r: True if (thread_limiter.done() >= copies) else None)
+ for tries_left in loop:
+ try:
+ local_roots = self.add_new_services(
+ roots_map, data_hash,
+ force_rebuild=(tries_left < num_retries), **headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ threads = []
+ for service_root, ks in roots_map.iteritems():
+ if ks.finished():
+ continue
+ t = KeepClient.KeepWriterThread(
+ ks,
+ data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter,
+ timeout=self.timeout)
+ t.start()
+ threads.append(t)
+
+ if not threads:
+ _logger.info(
+ "All Keep services for %s have finished; giving up",
+ data_hash)
+ break
for t in threads:
- if not t.success():
- _logger.debug("Retrying: PUT %s %s",
- t.args['service_root'],
- t.args['data_hash'])
- retry_with_args = t.args.copy()
- t_retry = KeepClient.KeepWriterThread(self.api_token,
- **retry_with_args)
- t_retry.start()
- threads_retry += [t_retry]
- for t in threads_retry:
t.join()
- have_copies = thread_limiter.done()
- # If we're done, return the response from Keep
- if have_copies >= want_copies:
+ loop.save_result(None)
+
+ if loop.success():
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
-
+ (data_hash, copies, thread_limiter.done()))
def local_store_put(self, data):
md5 = hashlib.md5(data).hexdigest()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 6198919..3476069 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,12 +1,11 @@
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-
+import mock
import os
import unittest
import arvados
+import arvados.retry
import run_test_server
+from arvados_testutil import fake_httplib2_response
class KeepTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
@@ -219,3 +218,108 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
'baz2',
'wrong content from Keep.get(md5("baz2"))')
self.assertTrue(keep_client.using_proxy)
+
+
+class KeepClientRetryTestMixin(object):
+ # Testing with a local Keep store won't exercise the retry behavior.
+ # Instead, our strategy is:
+ # * Create a client with one proxy specified (pointed at a black
+ # hole), so there's no need to instantiate an API client, and
+ # all HTTP requests come from one place.
+ # * Mock httplib's request method to provide simulated responses.
+ # This lets us test the retry logic extensively without relying on any
+ # supporting servers, and prevents side effects in case something hiccups.
+ # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+ # run_method().
+ PROXY_ADDR = 'http://[100::]/'
+ TEST_DATA = 'testdata'
+ TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+
+ @staticmethod
+ def mock_responses(body, *codes):
+ return mock.patch('httplib2.Http.request', side_effect=(
+ (fake_httplib2_response(code), body) for code in codes))
+
+ def new_client(self):
+ return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+
+ def run_method(self, *args, **kwargs):
+ raise NotImplementedError("test subclasses must define run_method")
+
+ def check_success(self, expected=None, *args, **kwargs):
+ if expected is None:
+ expected = self.DEFAULT_EXPECT
+ self.assertEqual(expected, self.run_method(*args, **kwargs))
+
+ def check_exception(self, error_class=None, *args, **kwargs):
+ if error_class is None:
+ error_class = self.DEFAULT_EXCEPTION
+ self.assertRaises(error_class, self.run_method, *args, **kwargs)
+
+ def test_immediate_success(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ self.check_success()
+
+ def test_retry_then_success(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ self.check_success(num_retries=3)
+
+ def test_no_default_retry(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ self.check_exception()
+
+ def test_no_retry_after_permanent_error(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+ self.check_exception(num_retries=3)
+
+ def test_error_after_retries_exhausted(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+ self.check_exception(num_retries=1)
+
+
+# Don't delay from HTTPRetryLoop's exponential backoff.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+ DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+ DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+
+ def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+ *args, **kwargs):
+ return self.new_client().get(locator, *args, **kwargs)
+
+ def test_specific_exception_when_not_found(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+ self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+ def test_general_exception_with_mixed_errors(self):
+ # get should raise a NotFoundError if no server returns the block,
+ # and a high threshold of servers report that it's not found.
+ # This test rigs up 50/50 disagreement between two servers, and
+ # checks that it does not become a NotFoundError.
+ client = self.new_client()
+ client.service_roots = [self.PROXY_ADDR, self.PROXY_ADDR]
+ with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+ with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+ client.get(self.TEST_LOCATOR)
+ self.assertNotIsInstance(
+ exc_check.exception, arvados.errors.NotFoundError,
+ "mixed errors raised NotFoundError")
+
+ def test_hint_server_can_succeed_without_retries(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+ self.check_success(locator=self.TEST_LOCATOR + '+K@xyzzy')
+
+
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+ DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+ DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+
+ def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+ copies=1, *args, **kwargs):
+ return self.new_client().put(data, copies, *args, **kwargs)
+
+ def test_do_not_send_multiple_copies_to_same_server(self):
+ with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ self.check_exception(copies=2, num_retries=3)
commit 5632bb3b451b0b7b624a55160c0ab2f2f287a373
Author: Brett Smith <brett at curoverse.com>
Date: Fri Aug 22 15:57:36 2014 -0400
3147: Remove old Keep signing support from Python SDK.
Per Tom.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 561d34c..909ee1f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -242,15 +242,6 @@ class KeepClient(object):
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
body=self.args['data'])
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
if re.match(r'^2\d\d$', resp['status']):
self._success = True
_logger.debug("KeepWriterThread %s succeeded %s %s",
@@ -570,9 +561,6 @@ class KeepClient(object):
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
- @staticmethod
- def sign_for_old_server(data_hash, data):
- return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
def local_store_put(self, data):
md5 = hashlib.md5(data).hexdigest()
commit 51334e82aee44732dc6857ffd7c9a0735f718cfd
Author: Brett Smith <brett at curoverse.com>
Date: Thu Aug 21 15:52:08 2014 -0400
3147: Add HTTPRetryLoop to the Python SDK.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 6670e66..f5edc75 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
+import time
+
from collections import deque
import arvados.errors
@@ -124,3 +126,34 @@ def check_http_response_success(result):
return False
else:
return None # Get well soon, server.
+
+class HTTPRetryLoop(RetryLoop):
+ """Coordinate limited retries of HTTP requests.
+
+ This RetryLoop uses check_http_response_success as the default
+ success check, and provides exponential backoff between
+ iterations.
+ """
+ def __init__(self, num_retries, success_check=check_http_response_success,
+ backoff_start=1, backoff_growth=2, save_results=1):
+ """Construct an HTTPRetryLoop.
+
+ New arguments (see RetryLoop for others):
+ * backoff_start: The number of seconds that must pass before the
+ loop's second iteration. Default 1.
+ * backoff_growth: The wait time multiplier after each iteration.
+ Default 2 (i.e., double the wait time each time).
+ """
+ self.backoff_wait = backoff_start
+ self.backoff_growth = backoff_growth
+ self.next_start_time = 0
+ super(HTTPRetryLoop, self).__init__(num_retries, success_check,
+ save_results)
+
+ def next(self):
+ if self.running() and (self.tries_left > 0):
+ wait_time = max(0, self.next_start_time - time.time())
+ time.sleep(wait_time)
+ self.backoff_wait *= self.backoff_growth
+ self.next_start_time = time.time() + self.backoff_wait
+ return super(HTTPRetryLoop, self).next()
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 2c51e8f..30cc779 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
'ws4py'
],
test_suite='tests',
- tests_require=['PyYAML'],
+ tests_require=['mock', 'PyYAML'],
zip_safe=False)
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index 9984f09..d6e3d62 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -5,6 +5,7 @@ import unittest
import arvados.errors as arv_error
import arvados.retry as arv_retry
+import mock
from arvados_testutil import fake_httplib2_response
@@ -93,6 +94,81 @@ class RetryLoopTestCase(unittest.TestCase):
self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+@mock.patch('time.sleep')
+class HTTPRetryLoopTestCase(unittest.TestCase):
+ def run_loop(self, num_retries, *codes, **kwargs):
+ responses = itertools.chain(
+ ((fake_httplib2_response(c), str(c)) for c in codes),
+ itertools.repeat((None, None)))
+ retrier = arv_retry.HTTPRetryLoop(num_retries, **kwargs)
+ for tries_left, response in itertools.izip(retrier, responses):
+ retrier.save_result(response)
+ return retrier
+
+ def check_result(self, retrier, expect_success, last_status,
+ sleep_mock, sleep_count):
+ self.assertIs(retrier.success(), expect_success,
+ "loop success flag is incorrect")
+ self.assertEqual(str(last_status), retrier.last_result()[1],
+ "wrong loop result")
+ self.assertEqual(sleep_count, sleep_mock.call_count,
+ "loop did not back off correctly")
+
+ def sleep_times(self, sleep_mock):
+ return (arglist[0][0] for arglist in sleep_mock.call_args_list)
+
+ def check_backoff_growth(self, sleep_mock, multiplier=1):
+ check = (self.assertGreater if (multiplier == 1)
+ else self.assertGreaterEqual)
+ sleep_times = self.sleep_times(sleep_mock)
+ last_wait = next(sleep_times)
+ for this_wait in sleep_times:
+ check(this_wait, last_wait * multiplier,
+ "loop did not grow backoff times correctly")
+ last_wait = this_wait
+
+ def test_no_retries_and_success(self, sleep_mock):
+ retrier = self.run_loop(0, 200)
+ self.check_result(retrier, True, 200, sleep_mock, 0)
+
+ def test_no_retries_and_tempfail(self, sleep_mock):
+ retrier = self.run_loop(0, 500, 200)
+ self.check_result(retrier, None, 500, sleep_mock, 0)
+
+ def test_no_retries_and_permfail(self, sleep_mock):
+ retrier = self.run_loop(0, 400, 200)
+ self.check_result(retrier, False, 400, sleep_mock, 0)
+
+ def test_retries_with_immediate_success(self, sleep_mock):
+ retrier = self.run_loop(3, 200, 500, 500)
+ self.check_result(retrier, True, 200, sleep_mock, 0)
+
+ def test_retries_with_delayed_success(self, sleep_mock):
+ retrier = self.run_loop(3, 500, 500, 200, 502)
+ self.check_result(retrier, True, 200, sleep_mock, 2)
+ self.check_backoff_growth(sleep_mock)
+
+ def test_retries_then_permfail(self, sleep_mock):
+ retrier = self.run_loop(3, 500, 404, 200, 200)
+ self.check_result(retrier, False, 404, sleep_mock, 1)
+
+ def test_retries_all_tempfail(self, sleep_mock):
+ retrier = self.run_loop(3, 502, 502, 502, 500, 200)
+ self.check_result(retrier, None, 500, sleep_mock, 3)
+ self.check_backoff_growth(sleep_mock)
+
+ def test_backoff_parameters(self, sleep_mock):
+ with mock.patch('time.time', side_effects=itertools.count):
+ self.run_loop(3, 500, 500, 500, 500,
+ backoff_start=5, backoff_growth=10)
+ self.check_backoff_growth(sleep_mock, 10)
+
+ def test_custom_success_check(self, mock):
+ retrier = self.run_loop(3, 200, 777, 201, 202, 203,
+ success_check=lambda r: r[1] == '777' or None)
+ self.check_result(retrier, True, 777, mock, 1)
+
+
class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
def results_map(self, *codes):
for code in codes:
commit aead13393f781ef2966e6e4133a57eec5a626597
Author: Brett Smith <brett at curoverse.com>
Date: Thu Aug 21 14:37:02 2014 -0400
3147: Add check_http_response_success to Python SDK.
Other parts of the SDK need to end loops based on the result of an
HTTP request. This function puts that logic in one place.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 1254371..6670e66 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -4,6 +4,9 @@ from collections import deque
import arvados.errors
+_HTTP_SUCCESSES = set(xrange(200, 300))
+_HTTP_CAN_RETRY = set([408, 409, 422, 423, 500, 502, 503, 504])
+
class RetryLoop(object):
"""Coordinate limited retries of code.
@@ -88,3 +91,36 @@ class RetryLoop(object):
except IndexError:
raise arvados.errors.AssertionError(
"queried loop results before any were recorded")
+
+
+def check_http_response_success(result):
+ """Convert an httplib2 request result to a loop control flag.
+
+ Pass this method the 2-tuple returned by httplib2.Http.request. It
+ returns True if the response indicates success, None if it indicates
+ temporary failure, and False otherwise. You can use this as the
+ success_check for a RetryLoop.
+
+ Implementation details:
+ * Any 2xx result returns True.
+ * A select few status codes, or any malformed responses, return None.
+ 422 Unprocessable Entity is in this category. This may not meet the
+ letter of the HTTP specification, but the Arvados API server will
+ use it for various server-side problems like database connection
+ errors.
+ * Everything else returns False. Note that this includes 1xx and
+ 3xx status codes. They don't indicate success, and you can't
+ retry those requests verbatim.
+ """
+ try:
+ status = int(result[0].status)
+ except Exception:
+ return None
+ if status in _HTTP_SUCCESSES:
+ return True
+ elif status in _HTTP_CAN_RETRY:
+ return None
+ elif 100 <= status < 600:
+ return False
+ else:
+ return None # Get well soon, server.
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index fd68994..9984f09 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -6,6 +6,8 @@ import unittest
import arvados.errors as arv_error
import arvados.retry as arv_retry
+from arvados_testutil import fake_httplib2_response
+
class RetryLoopTestCase(unittest.TestCase):
@staticmethod
def loop_success(result):
@@ -91,5 +93,52 @@ class RetryLoopTestCase(unittest.TestCase):
self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
+ def results_map(self, *codes):
+ for code in codes:
+ response = (fake_httplib2_response(code), None)
+ yield code, arv_retry.check_http_response_success(response)
+
+ def check(assert_name):
+ def check_method(self, expected, *codes):
+ assert_func = getattr(self, assert_name)
+ for code, actual in self.results_map(*codes):
+ assert_func(expected, actual,
+ "{} status flagged {}".format(code, actual))
+ if assert_name != 'assertIs':
+ self.assertTrue(
+ actual is True or actual is False or actual is None,
+ "{} status returned {}".format(code, actual))
+ return check_method
+
+ check_is = check('assertIs')
+ check_is_not = check('assertIsNot')
+
+ def test_obvious_successes(self):
+ self.check_is(True, *range(200, 207))
+
+ def test_obvious_stops(self):
+ self.check_is(False, 424, 426, 428, 431,
+ *range(400, 408) + range(410, 420))
+
+ def test_obvious_retries(self):
+ self.check_is(None, 500, 502, 503, 504)
+
+ def test_4xx_retries(self):
+ self.check_is(None, 408, 409, 422, 423)
+
+ def test_5xx_failures(self):
+ self.check_is(False, 501, *range(505, 512))
+
+ def test_1xx_not_retried(self):
+ self.check_is_not(None, 100, 101)
+
+ def test_redirects_not_retried(self):
+ self.check_is_not(None, *range(300, 309))
+
+ def test_wacky_code_retries(self):
+ self.check_is(None, 0, 99, 600, -200)
+
+
if __name__ == '__main__':
unittest.main()
commit a4359d15330c352ae0727fb326507a53c9ad0989
Author: Brett Smith <brett at curoverse.com>
Date: Thu Aug 21 13:24:39 2014 -0400
3147: Add RetryLoop to the Python SDK.
This provides a general-purpose mechanism for us to retry all kinds of
operations.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
new file mode 100644
index 0000000..1254371
--- /dev/null
+++ b/sdk/python/arvados/retry.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+
+from collections import deque
+
+import arvados.errors
+
+class RetryLoop(object):
+ """Coordinate limited retries of code.
+
+ RetryLoop coordinates a loop that runs until it records a
+ successful result or tries too many times, whichever comes first.
+ Typical use looks like:
+
+ loop = RetryLoop(num_retries=2)
+ for tries_left in loop:
+ try:
+ result = do_something()
+ except TemporaryError as error:
+ log("error: {} ({} tries left)".format(error, tries_left))
+ else:
+ loop.save_result(result)
+ if loop.success():
+ return loop.last_result()
+ """
+ def __init__(self, num_retries, success_check=lambda r: True,
+ save_results=1):
+ """Construct a new RetryLoop.
+
+ Arguments:
+ * num_retries: The maximum number of times to retry the loop if it
+ doesn't succeed. The loop runs at most num_retries + 1 times.
+ * success_check: This is a function that will be called each
+ time the loop saves a result. The function should return
+ True if the result indicates loop success, False if it
+ represents a permanent failure state, and None if the loop
+ should continue. If no function is provided, the loop will
+ end as soon as it records any result.
+ * save_results: The number of most recent results to keep (the
+ last N that the loop recorded). These records are available through
+ the results attribute, oldest first. Default 1.
+ """
+ self.tries_left = num_retries + 1
+ self.check_result = success_check
+ self.results = deque(maxlen=save_results)
+ self._running = None
+ self._success = None
+
+ def __iter__(self):
+ return self
+
+ def running(self):
+ return self._running and (self._success is None)
+
+ def next(self):
+ if self._running is None:
+ self._running = True
+ if (self.tries_left < 1) or not self.running():
+ self._running = False
+ raise StopIteration
+ self.tries_left -= 1
+ return self.tries_left
+
+ def save_result(self, result):
+ """Record a loop result.
+
+ Save the given result, and end the loop if it indicates
+ success or permanent failure. See __init__'s documentation
+ about success_check to learn how to make that indication.
+ """
+ if not self.running():
+ raise arvados.errors.AssertionError(
+ "recorded a loop result after the loop finished")
+ self.results.append(result)
+ self._success = self.check_result(result)
+
+ def success(self):
+ """Return the loop's end state.
+
+ Returns True if the loop obtained a successful result, False if it
+ encountered permanent failure, or None if neither has happened.
+ """
+ return self._success
+
+ def last_result(self):
+ """Return the most recent result the loop recorded."""
+ try:
+ return self.results[-1]
+ except IndexError:
+ raise arvados.errors.AssertionError(
+ "queried loop results before any were recorded")
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
new file mode 100644
index 0000000..fd68994
--- /dev/null
+++ b/sdk/python/tests/test_retry.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+import itertools
+import unittest
+
+import arvados.errors as arv_error
+import arvados.retry as arv_retry
+
+class RetryLoopTestCase(unittest.TestCase):
+ @staticmethod
+ def loop_success(result):
+ # During the tests, we use integers that look like HTTP status
+ # codes as loop results. Then we define simplified HTTP
+ # heuristics here to decide whether the result is success (True),
+ # permanent failure (False), or temporary failure (None).
+ if result < 400:
+ return True
+ elif result < 500:
+ return False
+ else:
+ return None
+
+ def run_loop(self, num_retries, *results):
+ responses = itertools.chain(results, itertools.repeat(None))
+ retrier = arv_retry.RetryLoop(num_retries, self.loop_success)
+ for tries_left, response in itertools.izip(retrier, responses):
+ retrier.save_result(response)
+ return retrier
+
+ def check_result(self, retrier, expect_success, last_code):
+ self.assertIs(retrier.success(), expect_success,
+ "loop success flag is incorrect")
+ self.assertEqual(last_code, retrier.last_result())
+
+ def test_zero_retries_and_success(self):
+ retrier = self.run_loop(0, 200)
+ self.check_result(retrier, True, 200)
+
+ def test_zero_retries_and_tempfail(self):
+ retrier = self.run_loop(0, 500, 501)
+ self.check_result(retrier, None, 500)
+
+ def test_zero_retries_and_permfail(self):
+ retrier = self.run_loop(0, 400, 201)
+ self.check_result(retrier, False, 400)
+
+ def test_one_retry_with_immediate_success(self):
+ retrier = self.run_loop(1, 200, 201)
+ self.check_result(retrier, True, 200)
+
+ def test_one_retry_with_delayed_success(self):
+ retrier = self.run_loop(1, 500, 201)
+ self.check_result(retrier, True, 201)
+
+ def test_one_retry_with_no_success(self):
+ retrier = self.run_loop(1, 500, 501, 502)
+ self.check_result(retrier, None, 501)
+
+ def test_one_retry_but_permfail(self):
+ retrier = self.run_loop(1, 400, 201)
+ self.check_result(retrier, False, 400)
+
+ def test_two_retries_with_immediate_success(self):
+ retrier = self.run_loop(2, 200, 201, 202)
+ self.check_result(retrier, True, 200)
+
+ def test_two_retries_with_success_after_one(self):
+ retrier = self.run_loop(2, 500, 201, 502)
+ self.check_result(retrier, True, 201)
+
+ def test_two_retries_with_success_after_two(self):
+ retrier = self.run_loop(2, 500, 501, 202, 503)
+ self.check_result(retrier, True, 202)
+
+ def test_two_retries_with_no_success(self):
+ retrier = self.run_loop(2, 500, 501, 502, 503)
+ self.check_result(retrier, None, 502)
+
+ def test_two_retries_with_permfail(self):
+ retrier = self.run_loop(2, 500, 401, 202)
+ self.check_result(retrier, False, 401)
+
+ def test_save_result_before_start_is_error(self):
+ retrier = arv_retry.RetryLoop(0)
+ self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+
+ def test_save_result_after_end_is_error(self):
+ retrier = arv_retry.RetryLoop(0)
+ for count in retrier:
+ pass
+ self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
commit 3e24c54403f877a969bb4c3b439e393a16e287d9
Author: Brett Smith <brett at curoverse.com>
Date: Thu Aug 21 11:13:23 2014 -0400
3147: Move fake Python HTTP response generation to arvados_testutil.
Other tests can use this functionality.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 369d561..77146db 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,11 +1,18 @@
#!/usr/bin/env python
import errno
+import httplib
+import httplib2
import os
import shutil
import tempfile
import unittest
+def fake_httplib2_response(code, **headers):
+ headers.update(status=str(code),
+ reason=httplib.responses.get(code, "Unknown Response"))
+ return httplib2.Response(headers)
+
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 2fd709e..e9cb838 100644
--- a/sdk/python/tests/test_api.py
+++ b/sdk/python/tests/test_api.py
@@ -10,22 +10,17 @@ import run_test_server
import unittest
from apiclient.http import RequestMockBuilder
-from httplib import responses as HTTP_RESPONSES
+from arvados_testutil import fake_httplib2_response
if not mimetypes.inited:
mimetypes.init()
class ArvadosApiClientTest(unittest.TestCase):
- @classmethod
- def response_from_code(cls, code):
- return httplib2.Response(
- {'status': code,
- 'reason': HTTP_RESPONSES.get(code, "Unknown Response"),
- 'Content-Type': mimetypes.types_map['.json']})
+ ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
@classmethod
def api_error_response(cls, code, *errors):
- return (cls.response_from_code(code),
+ return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
json.dumps({'errors': errors,
'error_token': '1234567890+12345678'}))
@@ -38,7 +33,9 @@ class ArvadosApiClientTest(unittest.TestCase):
# FIXME: Figure out a better way to stub this out.
run_test_server.run()
mock_responses = {
- 'arvados.humans.delete': (cls.response_from_code(500), ""),
+ 'arvados.humans.delete': (
+ fake_httplib2_response(500, **cls.ERROR_HEADERS),
+ ""),
'arvados.humans.get': cls.api_error_response(
422, "Bad UUID format", "Bad output format"),
'arvados.humans.list': (None, json.dumps(
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list