[ARVADOS] created: 66eb1f645adc591318f1d857242730d48e5a1b3f

git at public.curoverse.com git at public.curoverse.com
Fri Aug 22 16:29:15 EDT 2014

        at  66eb1f645adc591318f1d857242730d48e5a1b3f (commit)

commit 66eb1f645adc591318f1d857242730d48e5a1b3f
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Aug 22 13:38:40 2014 -0400

    3147: Add retry support to Python SDK's KeepClient.

diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 1d9c778..89910aa 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -17,12 +17,14 @@ class SyntaxError(Exception):
 class AssertionError(Exception):
-class NotFoundError(Exception):
-    pass
 class CommandFailedError(Exception):
+class KeepReadError(Exception):
+    pass
 class KeepWriteError(Exception):
+class NotFoundError(KeepReadError):
+    pass
 class NotImplementedError(Exception):
 class NoKeepServersError(Exception):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1f..4a4998a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ global_client_object = None
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 class KeepLocator(object):
@@ -200,15 +201,83 @@ class KeepClient(object):
                 return self._done
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       ssl.SSLError)
+        def __init__(self, root, **headers):
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+        def usable(self):
+            return self.success_flag is not False
+        def finished(self):
+            return self.success_flag is not None
+        def last_status(self):
+            try:
+                return int(self.last_result[0].status)
+            except (AttributeError, IndexError, ValueError):
+                return None
+        def get(self, http, locator):
+            # http is an httplib2.Http object.
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+            return None
+        def put(self, http, hash_s, body):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
     class KeepWriterThread(threading.Thread):
         Write a blob of data to the given Keep server. On success, call
         save_response() of the given ThreadLimiter to save the returned
-        def __init__(self, api_token, **kwargs):
+        def __init__(self, keep_service, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
-            self._api_token = api_token
+            self.service = keep_service
             self.args = kwargs
             self._success = False
@@ -224,51 +293,35 @@ class KeepClient(object):
         def run_with_limiter(self, limiter):
+            if self.service.finished():
+                return
             _logger.debug("KeepWriterThread %s proceeding %s %s",
             h = httplib2.Http(timeout=self.args.get('timeout', None))
-            url = self.args['service_root'] + self.args['data_hash']
-            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-            if self.args['using_proxy']:
-                # We're using a proxy, so tell the proxy how many copies we
-                # want it to store
-                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-            try:
-                _logger.debug("Uploading to {}".format(url))
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=self.args['data'])
-                if re.match(r'^2\d\d$', resp['status']):
-                    self._success = True
-                    _logger.debug("KeepWriterThread %s succeeded %s %s",
-                                  str(threading.current_thread()),
-                                  self.args['data_hash'],
-                                  self.args['service_root'])
+            self._success = bool(self.service.put(
+                    h, self.args['data_hash'], self.args['data']))
+            status = self.service.last_status()
+            if self._success:
+                resp, body = self.service.last_result
+                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                              str(threading.current_thread()),
+                              self.args['data_hash'],
+                              self.args['service_root'])
+                # Tick the 'done' counter for the number of replica
+                # reported stored by the server, for the case that
+                # we're talking to a proxy or other backend that
+                # stores to multiple copies for us.
+                try:
+                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
                     replicas_stored = 1
-                    if 'x-keep-replicas-stored' in resp:
-                        # Tick the 'done' counter for the number of replica
-                        # reported stored by the server, for the case that
-                        # we're talking to a proxy or other backend that
-                        # stores to multiple copies for us.
-                        try:
-                            replicas_stored = int(resp['x-keep-replicas-stored'])
-                        except ValueError:
-                            pass
-                    limiter.save_response(content.strip(), replicas_stored)
-                else:
-                    _logger.debug("Request fail: PUT %s => %s %s",
-                                    url, resp['status'], content)
-            except (httplib2.HttpLib2Error,
-                    httplib.HTTPException,
-                    ssl.SSLError) as e:
-                # When using https, timeouts look like ssl.SSLError from here.
-                # "SSLError: The write operation timed out"
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                                url, type(e), str(e))
+                limiter.save_response(body.strip(), replicas_stored)
+            elif status is not None:
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.args['data_hash'], status,
+                              self.service.last_result[1])
     def __init__(self, api_client=None, proxy=None, timeout=60,
@@ -323,6 +376,7 @@ class KeepClient(object):
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.static_service_roots = True
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -332,29 +386,35 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.static_service_roots = False
-    def shuffled_service_roots(self, hash):
-        if self.service_roots is None:
-            with self.lock:
-                try:
-                    keep_services = self.api_client.keep_services().accessible()
-                except Exception:  # API server predates Keep services.
-                    keep_services = self.api_client.keep_disks().list()
+    def build_service_roots(self, force_rebuild=False):
+        if (self.static_service_roots or
+              (self.service_roots and not force_rebuild)):
+            return
+        with self.lock:
+            try:
+                keep_services = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                keep_services = self.api_client.keep_disks().list()
+            keep_services = keep_services.execute().get('items')
+            if not keep_services:
+                raise arvados.errors.NoKeepServersError()
-                keep_services = keep_services.execute().get('items')
-                if not keep_services:
-                    raise arvados.errors.NoKeepServersError()
+            self.using_proxy = (keep_services[0].get('service_type') ==
+                                'proxy')
-                self.using_proxy = (keep_services[0].get('service_type') ==
-                                    'proxy')
+            roots = (("http%s://%s:%d/" %
+                      ('s' if f['service_ssl_flag'] else '',
+                       f['service_host'],
+                       f['service_port']))
+                     for f in keep_services)
+            self.service_roots = sorted(set(roots))
+            _logger.debug(str(self.service_roots))
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_services)
-                self.service_roots = sorted(set(roots))
-                _logger.debug(str(self.service_roots))
+    def shuffled_service_roots(self, hash, force_rebuild=False):
+        self.build_service_roots(force_rebuild)
         # Build an ordering with which to query the Keep servers based on the
         # contents of the hash.
@@ -454,113 +514,170 @@ class KeepClient(object):
-    def get(self, loc_s):
+    def add_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
+    def get(self, loc_s, num_retries=0):
+        """Get data from Keep.
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  Default 0.
+        """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
         slot, first = self.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
             return v
-        try:
-            for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + loc_s
-                headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
-                           'Accept': 'application/octet-stream'}
-                blob = self.get_url(url, headers, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-            for hint in locator.hints:
-                if not hint.startswith('K@'):
-                    continue
-                url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
-                blob = self.get_url(url, {}, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-        except:
-            slot.set(None)
-            self.cap_cache()
-            raise
-        slot.set(None)
+        # See #3147 for a discussion of the loop implementation.  Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs their KeepService objects.
+        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        blob = None
+        loop = retry.HTTPRetryLoop(num_retries,
+                                   lambda r: None if blob is None else True)
+        for tries_left in loop:
+            try:
+                local_roots = self.add_new_services(
+                    roots_map, expect_hash,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+            # Build an ordered list of KeepService objects that haven't
+            # returned permanent failure.
+            services_to_try = [roots_map[root]
+                               for root in (local_roots + hint_roots)
+                               if roots_map[root].usable()]
+            if not services_to_try:
+                _logger.info(
+                    "All Keep services for %s have permafailed; giving up",
+                    loc_s)
+                break
+            http = httplib2.Http(timeout=self.timeout)
+            for keep_service in services_to_try:
+                blob = keep_service.get(http, locator)
+                if blob is not None:
+                    loop.save_result(blob)
+                    break
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
-        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+        if loop.success():
+            return blob
+        # No servers fulfilled the request.  Count how many responded
+        # "not found;" if the ratio is high enough (currently 75%), report
+        # Not Found; otherwise a generic error.
+        not_founds = sum(1 for ks in roots_map.values()
+                         if ks.last_status() in set([403, 404, 410]))
+        if (float(not_founds) / len(roots_map)) >= .75:
+            raise arvados.errors.NotFoundError(loc_s)
+        else:
+            raise arvados.errors.KeepReadError(loc_s)
-    def get_url(self, url, headers, expect_hash):
-        h = httplib2.Http()
-        try:
-            _logger.debug("Request: GET %s", url)
-            with timer.Timer() as t:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
-                         len(content), t.msecs,
-                         (len(content)/(1024*1024))/t.secs)
-            if re.match(r'^2\d\d$', resp['status']):
-                md5 = hashlib.md5(content).hexdigest()
-                if md5 == expect_hash:
-                    return content
-                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
-        except Exception as e:
-            _logger.debug("Request fail: GET %s => %s: %s",
-                         url, type(e), str(e))
-        return None
-    def put(self, data, copies=2):
+    def put(self, data, copies=2, num_retries=0):
+        """Save data in Keep.
+        This method will get a list of Keep services from the API server, and
+        send the data to each one simultaneously in a new thread.  Once the
+        uploads are finished, if enough copies are saved, this method returns
+        the most recent HTTP response body.  If requests fail to upload
+        enough copies, this method raises KeepWriteError.
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Default 0.
+        """
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+        headers = {}
+        if self.using_proxy:
+            # We're using a proxy, so tell the proxy how many copies we
+            # want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.HTTPRetryLoop(
+            num_retries,
+            lambda r: True if (thread_limiter.done() >= copies) else None)
+        for tries_left in loop:
+            try:
+                local_roots = self.add_new_services(
+                    roots_map, data_hash,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.timeout)
+                t.start()
+                threads.append(t)
+            if not threads:
+                _logger.info(
+                    "All Keep services for %s have finished; giving up",
+                    data_hash)
+                break
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                    t.args['service_root'],
-                                    t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
-        have_copies = thread_limiter.done()
-        # If we're done, return the response from Keep
-        if have_copies >= want_copies:
+            loop.save_result(None)
+        if loop.success():
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
+            (data_hash, copies, thread_limiter.done()))
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 6198919..3476069 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,12 +1,11 @@
-# usage example:
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
+import mock
 import os
 import unittest
 import arvados
+import arvados.retry
 import run_test_server
+from arvados_testutil import fake_httplib2_response
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -219,3 +218,108 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
                          'wrong content from Keep.get(md5("baz2"))')
+class KeepClientRetryTestMixin(object):
+    # Testing with a local Keep store won't exercise the retry behavior.
+    # Instead, our strategy is:
+    # * Create a client with one proxy specified (pointed at a black
+    #   hole), so there's no need to instantiate an API client, and
+    #   all HTTP requests come from one place.
+    # * Mock httplib's request method to provide simulated responses.
+    # This lets us test the retry logic extensively without relying on any
+    # supporting servers, and prevents side effects in case something hiccups.
+    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+    # run_method().
+    PROXY_ADDR = 'http://[100::]/'
+    TEST_DATA = 'testdata'
+    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+    @staticmethod
+    def mock_responses(body, *codes):
+        return mock.patch('httplib2.Http.request', side_effect=(
+                (fake_httplib2_response(code), body) for code in codes))
+    def new_client(self):
+        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+    def run_method(self, *args, **kwargs):
+        raise NotImplementedError("test subclasses must define run_method")
+    def check_success(self, expected=None, *args, **kwargs):
+        if expected is None:
+            expected = self.DEFAULT_EXPECT
+        self.assertEqual(expected, self.run_method(*args, **kwargs))
+    def check_exception(self, error_class=None, *args, **kwargs):
+        if error_class is None:
+            error_class = self.DEFAULT_EXCEPTION
+        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+    def test_immediate_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_success()
+    def test_retry_then_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_success(num_retries=3)
+    def test_no_default_retry(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_exception()
+    def test_no_retry_after_permanent_error(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+            self.check_exception(num_retries=3)
+    def test_error_after_retries_exhausted(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+            self.check_exception(num_retries=1)
+# Don't delay from HTTPRetryLoop's exponential backoff.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        return self.new_client().get(locator, *args, **kwargs)
+    def test_specific_exception_when_not_found(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+    def test_general_exception_with_mixed_errors(self):
+        # get should raise a NotFoundError if no server returns the block,
+        # and a high threshold of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        client.service_roots = [self.PROXY_ADDR, self.PROXY_ADDR]
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.get(self.TEST_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+    def test_hint_server_can_succeed_without_retries(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.TEST_LOCATOR + '+K@xyzzy')
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+                   copies=1, *args, **kwargs):
+        return self.new_client().put(data, copies, *args, **kwargs)
+    def test_do_not_send_multiple_copies_to_same_server(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_exception(copies=2, num_retries=3)

commit 5632bb3b451b0b7b624a55160c0ab2f2f287a373
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Aug 22 15:57:36 2014 -0400

    3147: Remove old Keep signing support from Python SDK.
    Per Tom.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 561d34c..909ee1f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -242,15 +242,6 @@ class KeepClient(object):
                 resp, content = h.request(url.encode('utf-8'), 'PUT',
-                if (resp['status'] == '401' and
-                    re.match(r'Timestamp verification failed', content)):
-                    body = KeepClient.sign_for_old_server(
-                        self.args['data_hash'],
-                        self.args['data'])
-                    h = httplib2.Http(timeout=self.args.get('timeout', None))
-                    resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                              headers=headers,
-                                              body=body)
                 if re.match(r'^2\d\d$', resp['status']):
                     self._success = True
                     _logger.debug("KeepWriterThread %s succeeded %s %s",
@@ -570,9 +561,6 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, want_copies, have_copies))
-    @staticmethod
-    def sign_for_old_server(data_hash, data):
-        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()

commit 51334e82aee44732dc6857ffd7c9a0735f718cfd
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Aug 21 15:52:08 2014 -0400

    3147: Add HTTPRetryLoop to the Python SDK.

diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 6670e66..f5edc75 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
+import time
 from collections import deque
 import arvados.errors
@@ -124,3 +126,34 @@ def check_http_response_success(result):
         return False
         return None  # Get well soon, server.
+class HTTPRetryLoop(RetryLoop):
+    """Coordinate limited retries of HTTP requests.
+    This RetryLoop uses check_http_response_success as the default
+    success check, and provides exponential backoff between
+    iterations.
+    """
+    def __init__(self, num_retries, success_check=check_http_response_success,
+                 backoff_start=1, backoff_growth=2, save_results=1):
+        """Construct an HTTPRetryLoop.
+        New arguments (see RetryLoop for others):
+        * backoff_start: The number of seconds that must pass before the
+          loop's second iteration.  Default 1.
+        * backoff_growth: The wait time multiplier after each iteration.
+          Default 2 (i.e., double the wait time each time).
+        """
+        self.backoff_wait = backoff_start
+        self.backoff_growth = backoff_growth
+        self.next_start_time = 0
+        super(HTTPRetryLoop, self).__init__(num_retries, success_check,
+                                            save_results)
+    def next(self):
+        if self.running() and (self.tries_left > 0):
+            wait_time = max(0, self.next_start_time - time.time())
+            time.sleep(wait_time)
+            self.backoff_wait *= self.backoff_growth
+        self.next_start_time = time.time() + self.backoff_wait
+        return super(HTTPRetryLoop, self).next()
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 2c51e8f..30cc779 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
-      tests_require=['PyYAML'],
+      tests_require=['mock', 'PyYAML'],
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index 9984f09..d6e3d62 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -5,6 +5,7 @@ import unittest
 import arvados.errors as arv_error
 import arvados.retry as arv_retry
+import mock
 from arvados_testutil import fake_httplib2_response
@@ -93,6 +94,81 @@ class RetryLoopTestCase(unittest.TestCase):
         self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+ at mock.patch('time.sleep')
+class HTTPRetryLoopTestCase(unittest.TestCase):
+    def run_loop(self, num_retries, *codes, **kwargs):
+        responses = itertools.chain(
+            ((fake_httplib2_response(c), str(c)) for c in codes),
+            itertools.repeat((None, None)))
+        retrier = arv_retry.HTTPRetryLoop(num_retries, **kwargs)
+        for tries_left, response in itertools.izip(retrier, responses):
+            retrier.save_result(response)
+        return retrier
+    def check_result(self, retrier, expect_success, last_status,
+                     sleep_mock, sleep_count):
+        self.assertIs(retrier.success(), expect_success,
+                      "loop success flag is incorrect")
+        self.assertEqual(str(last_status), retrier.last_result()[1],
+                         "wrong loop result")
+        self.assertEqual(sleep_count, sleep_mock.call_count,
+                         "loop did not back off correctly")
+    def sleep_times(self, sleep_mock):
+        return (arglist[0][0] for arglist in sleep_mock.call_args_list)
+    def check_backoff_growth(self, sleep_mock, multiplier=1):
+        check = (self.assertGreater if (multiplier == 1)
+                 else self.assertGreaterEqual)
+        sleep_times = self.sleep_times(sleep_mock)
+        last_wait = next(sleep_times)
+        for this_wait in sleep_times:
+            check(this_wait, last_wait * multiplier,
+                  "loop did not grow backoff times correctly")
+            last_wait = this_wait
+    def test_no_retries_and_success(self, sleep_mock):
+        retrier = self.run_loop(0, 200)
+        self.check_result(retrier, True, 200, sleep_mock, 0)
+    def test_no_retries_and_tempfail(self, sleep_mock):
+        retrier = self.run_loop(0, 500, 200)
+        self.check_result(retrier, None, 500, sleep_mock, 0)
+    def test_no_retries_and_permfail(self, sleep_mock):
+        retrier = self.run_loop(0, 400, 200)
+        self.check_result(retrier, False, 400, sleep_mock, 0)
+    def test_retries_with_immediate_success(self, sleep_mock):
+        retrier = self.run_loop(3, 200, 500, 500)
+        self.check_result(retrier, True, 200, sleep_mock, 0)
+    def test_retries_with_delayed_success(self, sleep_mock):
+        retrier = self.run_loop(3, 500, 500, 200, 502)
+        self.check_result(retrier, True, 200, sleep_mock, 2)
+        self.check_backoff_growth(sleep_mock)
+    def test_retries_then_permfail(self, sleep_mock):
+        retrier = self.run_loop(3, 500, 404, 200, 200)
+        self.check_result(retrier, False, 404, sleep_mock, 1)
+    def test_retries_all_tempfail(self, sleep_mock):
+        retrier = self.run_loop(3, 502, 502, 502, 500, 200)
+        self.check_result(retrier, None, 500, sleep_mock, 3)
+        self.check_backoff_growth(sleep_mock)
+    def test_backoff_parameters(self, sleep_mock):
+        with mock.patch('time.time', side_effects=itertools.count):
+            self.run_loop(3, 500, 500, 500, 500,
+                          backoff_start=5, backoff_growth=10)
+        self.check_backoff_growth(sleep_mock, 10)
+    def test_custom_success_check(self, mock):
+        retrier = self.run_loop(3, 200, 777, 201, 202, 203,
+                                success_check=lambda r: r[1] == '777' or None)
+        self.check_result(retrier, True, 777, mock, 1)
 class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
     def results_map(self, *codes):
         for code in codes:

commit aead13393f781ef2966e6e4133a57eec5a626597
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Aug 21 14:37:02 2014 -0400

    3147: Add check_http_response_success to Python SDK.
    Other parts of the SDK need to end loops based on the result of an
    HTTP request.  This function puts that logic in one place.

diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 1254371..6670e66 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -4,6 +4,9 @@ from collections import deque
 import arvados.errors
+# Any 2xx status code: check_http_response_success reports success.
+_HTTP_SUCCESSES = set(xrange(200, 300))
+# Status codes check_http_response_success treats as temporary failures
+# (worth retrying) rather than permanent ones.
+_HTTP_CAN_RETRY = set([408, 409, 422, 423, 500, 502, 503, 504])
 class RetryLoop(object):
     """Coordinate limited retries of code.
@@ -88,3 +91,36 @@ class RetryLoop(object):
         except IndexError:
             raise arvados.errors.AssertionError(
                 "queried loop results before any were recorded")
+def check_http_response_success(result):
+    """Convert an httplib2 request result to a loop control flag.
+    Pass this function the 2-tuple returned by httplib2.Http.request.  It
+    returns True if the response indicates success, None if it indicates
+    temporary failure, and False otherwise.  You can use this as the
+    success_check for a RetryLoop.
+    Implementation details:
+    * Any 2xx result returns True.
+    * A select few status codes, or any malformed responses, return None.
+      422 Unprocessable Entity is in this category.  This may not meet the
+      letter of the HTTP specification, but the Arvados API server will
+      use it for various server-side problems like database connection
+      errors.
+    * Status codes outside the 100-599 range also return None.
+    * Everything else returns False.  Note that this includes 1xx and
+      3xx status codes.  They don't indicate success, and you can't
+      retry those requests verbatim.
+    """
+    try:
+        status = int(result[0].status)
+    except Exception:
+        # Deliberately broad: a missing response object (e.g. None) or a
+        # non-integer status both count as malformed, i.e. temporary.
+        return None
+    if status in _HTTP_SUCCESSES:
+        return True
+    elif status in _HTTP_CAN_RETRY:
+        return None
+    elif 100 <= status < 600:
+        # A valid status that is neither success nor retryable.
+        return False
+    else:
+        return None  # Get well soon, server.
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index fd68994..9984f09 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -6,6 +6,8 @@ import unittest
 import arvados.errors as arv_error
 import arvados.retry as arv_retry
+from arvados_testutil import fake_httplib2_response
 class RetryLoopTestCase(unittest.TestCase):
     def loop_success(result):
@@ -91,5 +93,52 @@ class RetryLoopTestCase(unittest.TestCase):
         self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
+    def results_map(self, *codes):
+        # Yield (code, check_http_response_success result) for each code.
+        for code in codes:
+            response = (fake_httplib2_response(code), None)
+            yield code, arv_retry.check_http_response_success(response)
+    def check(assert_name):
+        # Runs at class-definition time (no self): builds a helper method
+        # that checks every given code's result against `expected` using
+        # the named TestCase assertion.  For assertions other than
+        # assertIs, it also verifies the result is True, False, or None.
+        def check_method(self, expected, *codes):
+            assert_func = getattr(self, assert_name)
+            for code, actual in self.results_map(*codes):
+                assert_func(expected, actual,
+                            "{} status flagged {}".format(code, actual))
+                if assert_name != 'assertIs':
+                    self.assertTrue(
+                        actual is True or actual is False or actual is None,
+                        "{} status returned {}".format(code, actual))
+        return check_method
+    check_is = check('assertIs')
+    check_is_not = check('assertIsNot')
+    def test_obvious_successes(self):
+        self.check_is(True, *range(200, 207))
+    def test_obvious_stops(self):
+        self.check_is(False, 424, 426, 428, 431,
+                      *range(400, 408) + range(410, 420))
+    def test_obvious_retries(self):
+        self.check_is(None, 500, 502, 503, 504)
+    def test_4xx_retries(self):
+        self.check_is(None, 408, 409, 422, 423)
+    def test_5xx_failures(self):
+        self.check_is(False, 501, *range(505, 512))
+    def test_1xx_not_retried(self):
+        self.check_is_not(None, 100, 101)
+    def test_redirects_not_retried(self):
+        self.check_is_not(None, *range(300, 309))
+    def test_wacky_code_retries(self):
+        self.check_is(None, 0, 99, 600, -200)
 if __name__ == '__main__':

commit a4359d15330c352ae0727fb326507a53c9ad0989
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Aug 21 13:24:39 2014 -0400

    3147: Add RetryLoop to the Python SDK.
    This provides a general-purpose mechanism for us to retry all kinds of
    operations.

diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
new file mode 100644
index 0000000..1254371
--- /dev/null
+++ b/sdk/python/arvados/retry.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+from collections import deque
+import arvados.errors
+class RetryLoop(object):
+    """Coordinate limited retries of code.
+    RetryLoop coordinates a loop that runs until it records a
+    successful result or tries too many times, whichever comes first.
+    Typical use looks like:
+        loop = RetryLoop(num_retries=2)
+        for tries_left in loop:
+            try:
+                result = do_something()
+            except TemporaryError as error:
+                log("error: {} ({} tries left)".format(error, tries_left))
+            else:
+                loop.save_result(result)
+        if loop.success():
+            return loop.last_result()
+    """
+    def __init__(self, num_retries, success_check=lambda r: True,
+                 save_results=1):
+        """Construct a new RetryLoop.
+        Arguments:
+        * num_retries: The maximum number of times to retry the loop if it
+          doesn't succeed.  This means the loop could run at most 1+N times.
+        * success_check: This is a function that will be called each
+          time the loop saves a result.  The function should return
+          True if the result indicates loop success, False if it
+          represents a permanent failure state, and None if the loop
+          should continue.  If no function is provided, the loop will
+          end as soon as it records any result.
+        * save_results: Specify a number to save the last N results
+          that the loop recorded.  These records are available through
+          the results attribute, oldest first.  Default 1.
+        """
+        self.tries_left = num_retries + 1
+        self.check_result = success_check
+        self.results = deque(maxlen=save_results)
+        # None until the first next() call, True while iterating, and
+        # False once next() has raised StopIteration.
+        self._running = None
+        # Most recent success_check result; None means keep going.
+        self._success = None
+    def __iter__(self):
+        return self
+    def running(self):
+        """Return a true value while the loop is iterating.
+        This is falsy before the first call to next(), after next() has
+        raised StopIteration, and once save_result() has recorded success
+        or permanent failure.
+        """
+        return self._running and (self._success is None)
+    def next(self):
+        """Start the next try and return how many tries remain after it.
+        Raises StopIteration when no tries are left, or when the loop has
+        already recorded success or permanent failure.
+        """
+        if self._running is None:
+            # First call: the loop has now started.
+            self._running = True
+        if (self.tries_left < 1) or not self.running():
+            self._running = False
+            raise StopIteration
+        self.tries_left -= 1
+        return self.tries_left
+    def save_result(self, result):
+        """Record a loop result.
+        Save the given result, and end the loop if it indicates
+        success or permanent failure.  See __init__'s documentation
+        about success_check to learn how to make that indication.
+        """
+        if not self.running():
+            raise arvados.errors.AssertionError(
+                "recorded a loop result after the loop finished")
+        self.results.append(result)
+        self._success = self.check_result(result)
+    def success(self):
+        """Return the loop's end state.
+        Returns True if the loop obtained a successful result, False if it
+        encountered permanent failure, or else None.
+        """
+        return self._success
+    def last_result(self):
+        """Return the most recent result the loop recorded."""
+        try:
+            return self.results[-1]
+        except IndexError:
+            raise arvados.errors.AssertionError(
+                "queried loop results before any were recorded")
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
new file mode 100644
index 0000000..fd68994
--- /dev/null
+++ b/sdk/python/tests/test_retry.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+import itertools
+import unittest
+import arvados.errors as arv_error
+import arvados.retry as arv_retry
+class RetryLoopTestCase(unittest.TestCase):
+    @staticmethod
+    def loop_success(result):
+        # During the tests, we use integers that look like HTTP status
+        # codes as loop results.  Then we define simplified HTTP
+        # heuristics here to decide whether the result is success (True),
+        # permanent failure (False), or temporary failure (None).
+        if result < 400:
+            return True
+        elif result < 500:
+            return False
+        else:
+            return None
+    def run_loop(self, num_retries, *results):
+        # Run a RetryLoop that records `results` in order (then None once
+        # they run out) and return the finished loop.
+        responses = itertools.chain(results, itertools.repeat(None))
+        retrier = arv_retry.RetryLoop(num_retries, self.loop_success)
+        for tries_left, response in itertools.izip(retrier, responses):
+            retrier.save_result(response)
+        return retrier
+    def check_result(self, retrier, expect_success, last_code):
+        # Assert the loop's end state and the last result it recorded.
+        self.assertIs(retrier.success(), expect_success,
+                      "loop success flag is incorrect")
+        self.assertEqual(last_code, retrier.last_result())
+    def test_zero_retries_and_success(self):
+        retrier = self.run_loop(0, 200)
+        self.check_result(retrier, True, 200)
+    def test_zero_retries_and_tempfail(self):
+        retrier = self.run_loop(0, 500, 501)
+        self.check_result(retrier, None, 500)
+    def test_zero_retries_and_permfail(self):
+        retrier = self.run_loop(0, 400, 201)
+        self.check_result(retrier, False, 400)
+    def test_one_retry_with_immediate_success(self):
+        retrier = self.run_loop(1, 200, 201)
+        self.check_result(retrier, True, 200)
+    def test_one_retry_with_delayed_success(self):
+        retrier = self.run_loop(1, 500, 201)
+        self.check_result(retrier, True, 201)
+    def test_one_retry_with_no_success(self):
+        retrier = self.run_loop(1, 500, 501, 502)
+        self.check_result(retrier, None, 501)
+    def test_one_retry_but_permfail(self):
+        retrier = self.run_loop(1, 400, 201)
+        self.check_result(retrier, False, 400)
+    def test_two_retries_with_immediate_success(self):
+        retrier = self.run_loop(2, 200, 201, 202)
+        self.check_result(retrier, True, 200)
+    def test_two_retries_with_success_after_one(self):
+        retrier = self.run_loop(2, 500, 201, 502)
+        self.check_result(retrier, True, 201)
+    def test_two_retries_with_success_after_two(self):
+        retrier = self.run_loop(2, 500, 501, 202, 503)
+        self.check_result(retrier, True, 202)
+    def test_two_retries_with_no_success(self):
+        retrier = self.run_loop(2, 500, 501, 502, 503)
+        self.check_result(retrier, None, 502)
+    def test_two_retries_with_permfail(self):
+        retrier = self.run_loop(2, 500, 401, 202)
+        self.check_result(retrier, False, 401)
+    def test_save_result_before_start_is_error(self):
+        retrier = arv_retry.RetryLoop(0)
+        self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+    def test_save_result_after_end_is_error(self):
+        retrier = arv_retry.RetryLoop(0)
+        for count in retrier:
+            pass
+        self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+if __name__ == '__main__':
+    unittest.main()

commit 3e24c54403f877a969bb4c3b439e393a16e287d9
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Aug 21 11:13:23 2014 -0400

    3147: Move fake Python HTTP response generation to arvados_testutil.
    Other tests can use this functionality.

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 369d561..77146db 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,11 +1,18 @@
 #!/usr/bin/env python
 import errno
+import httplib
+import httplib2
 import os
 import shutil
 import tempfile
 import unittest
+def fake_httplib2_response(code, **headers):
+    """Build an httplib2.Response for a fake HTTP request.
+    code is the HTTP status code.  The reason phrase comes from
+    httplib.responses ("Unknown Response" for unlisted codes).  Any extra
+    keyword arguments become response headers; status and reason always
+    override same-named keywords.
+    """
+    headers.update(status=str(code),
+                   reason=httplib.responses.get(code, "Unknown Response"))
+    return httplib2.Response(headers)
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 2fd709e..e9cb838 100644
--- a/sdk/python/tests/test_api.py
+++ b/sdk/python/tests/test_api.py
@@ -10,22 +10,17 @@ import run_test_server
 import unittest
 from apiclient.http import RequestMockBuilder
-from httplib import responses as HTTP_RESPONSES
+from arvados_testutil import fake_httplib2_response
 if not mimetypes.inited:
 class ArvadosApiClientTest(unittest.TestCase):
-    @classmethod
-    def response_from_code(cls, code):
-        return httplib2.Response(
-            {'status': code,
-             'reason': HTTP_RESPONSES.get(code, "Unknown Response"),
-             'Content-Type': mimetypes.types_map['.json']})
+    ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
     def api_error_response(cls, code, *errors):
-        return (cls.response_from_code(code),
+        return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
                 json.dumps({'errors': errors,
                             'error_token': '1234567890+12345678'}))
@@ -38,7 +33,9 @@ class ArvadosApiClientTest(unittest.TestCase):
         # FIXME: Figure out a better way to stub this out.
         mock_responses = {
-            'arvados.humans.delete': (cls.response_from_code(500), ""),
+            'arvados.humans.delete': (
+                fake_httplib2_response(500, **cls.ERROR_HEADERS),
+                ""),
             'arvados.humans.get': cls.api_error_response(
                 422, "Bad UUID format", "Bad output format"),
             'arvados.humans.list': (None, json.dumps(



More information about the arvados-commits mailing list