[ARVADOS] updated: 728ee9775de422840b81827c71c7b515708620a3
    git at public.curoverse.com 
    git at public.curoverse.com
       
    Tue Aug 26 11:30:43 EDT 2014
    
    
  
Summary of changes:
 sdk/python/arvados/errors.py         |   6 +-
 sdk/python/arvados/keep.py           | 431 ++++++++++++++++++++++-------------
 sdk/python/arvados/retry.py          | 140 ++++++++++++
 sdk/python/setup.py                  |   2 +-
 sdk/python/tests/arvados_testutil.py |   7 +
 sdk/python/tests/test_api.py         |  15 +-
 sdk/python/tests/test_keep_client.py | 112 ++++++++-
 sdk/python/tests/test_retry.py       | 198 ++++++++++++++++
 8 files changed, 735 insertions(+), 176 deletions(-)
 create mode 100644 sdk/python/arvados/retry.py
 create mode 100644 sdk/python/tests/test_retry.py
       via  728ee9775de422840b81827c71c7b515708620a3 (commit)
       via  aa1d003d5ab199498da50c231be3c9219546a692 (commit)
       via  5a043a14a8d321adeb1fc14c9cd6f479f8ea8216 (commit)
       via  dccec0cc48025830ac68cbb4a80f5ae73ace3fa7 (commit)
       via  7f5a0cafa4cc3d3155830243ab9df204b3b48a03 (commit)
       via  adbeb30cc27fa56a4550e28e1fd5e8ec4a1c7836 (commit)
       via  440d863f1643587d37bd21548410f13b655ac021 (commit)
      from  8cba6b1dd7307bb47c36a51c7aea8bfa78e76c8d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 728ee9775de422840b81827c71c7b515708620a3
Merge: 8cba6b1 aa1d003
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Aug 26 11:32:10 2014 -0400
    Merge branch '3147-pysdk-retries'
    
    Closes #3147.
commit aa1d003d5ab199498da50c231be3c9219546a692
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Aug 25 20:58:37 2014 -0400
    3147: Fix unwanted integer division in KeepClient logs.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e75d64e..651cec4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -243,9 +243,9 @@ class KeepClient(object):
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
                 content = result[1]
-                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024*1024))/t.secs)
+                             (len(content)/(1024.0*1024))/t.secs)
                 if self.success_flag:
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
commit 5a043a14a8d321adeb1fc14c9cd6f479f8ea8216
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Aug 22 13:38:40 2014 -0400
    3147: Add retry support to Python SDK's KeepClient.
diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 1d9c778..89910aa 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -17,12 +17,14 @@ class SyntaxError(Exception):
     pass
 class AssertionError(Exception):
     pass
-class NotFoundError(Exception):
-    pass
 class CommandFailedError(Exception):
     pass
+class KeepReadError(Exception):
+    pass
 class KeepWriteError(Exception):
     pass
+class NotFoundError(KeepReadError):
+    pass
 class NotImplementedError(Exception):
     pass
 class NoKeepServersError(Exception):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1f..e75d64e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,6 +27,7 @@ global_client_object = None
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 
 class KeepLocator(object):
@@ -200,15 +201,83 @@ class KeepClient(object):
                 return self._done
 
 
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            return self.success_flag is not False
+
+        def finished(self):
+            return self.success_flag is not None
+
+        def last_status(self):
+            try:
+                return int(self.last_result[0].status)
+            except (AttributeError, IndexError, ValueError):
+                return None
+
+        def get(self, http, locator):
+            # http is an httplib2.Http object.
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+            return None
+
+        def put(self, http, hash_s, body):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
+
     class KeepWriterThread(threading.Thread):
         """
         Write a blob of data to the given Keep server. On success, call
         save_response() of the given ThreadLimiter to save the returned
         locator.
         """
-        def __init__(self, api_token, **kwargs):
+        def __init__(self, keep_service, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
-            self._api_token = api_token
+            self.service = keep_service
             self.args = kwargs
             self._success = False
 
@@ -224,51 +293,35 @@ class KeepClient(object):
                 self.run_with_limiter(limiter)
 
         def run_with_limiter(self, limiter):
+            if self.service.finished():
+                return
             _logger.debug("KeepWriterThread %s proceeding %s %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
             h = httplib2.Http(timeout=self.args.get('timeout', None))
-            url = self.args['service_root'] + self.args['data_hash']
-            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
-            if self.args['using_proxy']:
-                # We're using a proxy, so tell the proxy how many copies we
-                # want it to store
-                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
-            try:
-                _logger.debug("Uploading to {}".format(url))
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=self.args['data'])
-                if re.match(r'^2\d\d$', resp['status']):
-                    self._success = True
-                    _logger.debug("KeepWriterThread %s succeeded %s %s",
-                                  str(threading.current_thread()),
-                                  self.args['data_hash'],
-                                  self.args['service_root'])
+            self._success = bool(self.service.put(
+                    h, self.args['data_hash'], self.args['data']))
+            status = self.service.last_status()
+            if self._success:
+                resp, body = self.service.last_result
+                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                              str(threading.current_thread()),
+                              self.args['data_hash'],
+                              self.args['service_root'])
+                # Tick the 'done' counter for the number of replica
+                # reported stored by the server, for the case that
+                # we're talking to a proxy or other backend that
+                # stores to multiple copies for us.
+                try:
+                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
                     replicas_stored = 1
-                    if 'x-keep-replicas-stored' in resp:
-                        # Tick the 'done' counter for the number of replica
-                        # reported stored by the server, for the case that
-                        # we're talking to a proxy or other backend that
-                        # stores to multiple copies for us.
-                        try:
-                            replicas_stored = int(resp['x-keep-replicas-stored'])
-                        except ValueError:
-                            pass
-                    limiter.save_response(content.strip(), replicas_stored)
-                else:
-                    _logger.debug("Request fail: PUT %s => %s %s",
-                                    url, resp['status'], content)
-            except (httplib2.HttpLib2Error,
-                    httplib.HTTPException,
-                    ssl.SSLError) as e:
-                # When using https, timeouts look like ssl.SSLError from here.
-                # "SSLError: The write operation timed out"
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                                url, type(e), str(e))
+                limiter.save_response(body.strip(), replicas_stored)
+            elif status is not None:
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.args['data_hash'], status,
+                              self.service.last_result[1])
 
 
     def __init__(self, api_client=None, proxy=None, timeout=60,
@@ -323,6 +376,7 @@ class KeepClient(object):
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -332,29 +386,35 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.static_service_roots = False
 
-    def shuffled_service_roots(self, hash):
-        if self.service_roots is None:
-            with self.lock:
-                try:
-                    keep_services = self.api_client.keep_services().accessible()
-                except Exception:  # API server predates Keep services.
-                    keep_services = self.api_client.keep_disks().list()
+    def build_service_roots(self, force_rebuild=False):
+        if (self.static_service_roots or
+              (self.service_roots and not force_rebuild)):
+            return
+        with self.lock:
+            try:
+                keep_services = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                keep_services = self.api_client.keep_disks().list()
 
-                keep_services = keep_services.execute().get('items')
-                if not keep_services:
-                    raise arvados.errors.NoKeepServersError()
+            keep_services = keep_services.execute().get('items')
+            if not keep_services:
+                raise arvados.errors.NoKeepServersError()
 
-                self.using_proxy = (keep_services[0].get('service_type') ==
-                                    'proxy')
+            self.using_proxy = (keep_services[0].get('service_type') ==
+                                'proxy')
 
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_services)
-                self.service_roots = sorted(set(roots))
-                _logger.debug(str(self.service_roots))
+            roots = (("http%s://%s:%d/" %
+                      ('s' if f['service_ssl_flag'] else '',
+                       f['service_host'],
+                       f['service_port']))
+                     for f in keep_services)
+            self.service_roots = sorted(set(roots))
+            _logger.debug(str(self.service_roots))
+
+    def shuffled_service_roots(self, hash, force_rebuild=False):
+        self.build_service_roots(force_rebuild)
 
         # Build an ordering with which to query the Keep servers based on the
         # contents of the hash.
@@ -454,113 +514,176 @@ class KeepClient(object):
         finally:
             self._cache_lock.release()
 
-    def get(self, loc_s):
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
+
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
+
+    def get(self, loc_s, num_retries=0):
+        """Get data from Keep.
+
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
+
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  Default 0.
+        """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
         slot, first = self.reserve_cache(expect_hash)
-
         if not first:
             v = slot.get()
             return v
 
-        try:
-            for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + loc_s
-                headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
-                           'Accept': 'application/octet-stream'}
-                blob = self.get_url(url, headers, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-
-            for hint in locator.hints:
-                if not hint.startswith('K@'):
-                    continue
-                url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
-                blob = self.get_url(url, {}, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-        except:
-            slot.set(None)
-            self.cap_cache()
-            raise
-
-        slot.set(None)
+        # See #3147 for a discussion of the loop implementation.  Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs their KeepService objects.
+        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        blob = None
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, expect_hash,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
+            services_to_try = [roots_map[root]
+                               for root in (local_roots + hint_roots)
+                               if roots_map[root].usable()]
+            http = httplib2.Http(timeout=self.timeout)
+            for keep_service in services_to_try:
+                blob = keep_service.get(http, locator)
+                if blob is not None:
+                    break
+            loop.save_result((blob, len(services_to_try)))
+
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
         self.cap_cache()
-        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+        if loop.success():
+            return blob
+
+        # No servers fulfilled the request.  Count how many responded
+        # "not found;" if the ratio is high enough (currently 75%), report
+        # Not Found; otherwise a generic error.
+        # Q: Including 403 is necessary for the Keep tests to continue
+        # passing, but maybe they should expect KeepReadError instead?
+        not_founds = sum(1 for ks in roots_map.values()
+                         if ks.last_status() in set([403, 404, 410]))
+        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+            raise arvados.errors.NotFoundError(loc_s)
+        else:
+            raise arvados.errors.KeepReadError(loc_s)
 
-    def get_url(self, url, headers, expect_hash):
-        h = httplib2.Http()
-        try:
-            _logger.debug("Request: GET %s", url)
-            with timer.Timer() as t:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
-                         len(content), t.msecs,
-                         (len(content)/(1024*1024))/t.secs)
-            if re.match(r'^2\d\d$', resp['status']):
-                md5 = hashlib.md5(content).hexdigest()
-                if md5 == expect_hash:
-                    return content
-                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
-        except Exception as e:
-            _logger.debug("Request fail: GET %s => %s: %s",
-                         url, type(e), str(e))
-        return None
-
-    def put(self, data, copies=2):
+    def put(self, data, copies=2, num_retries=0):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server, and
+        send the data to each one simultaneously in a new thread.  Once the
+        uploads are finished, if enough copies are saved, this method returns
+        the most recent HTTP response body.  If requests fail to upload
+        enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Default 0.
+        """
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+
+        headers = {}
+        if self.using_proxy:
+            # Tell the proxy how many copies we want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, data_hash,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.timeout)
+                t.start()
+                threads.append(t)
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                    t.args['service_root'],
-                                    t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
                 t.join()
-        have_copies = thread_limiter.done()
-        # If we're done, return the response from Keep
-        if have_copies >= want_copies:
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
-
+            (data_hash, copies, thread_limiter.done()))
 
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 6198919..4ac9df1 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1,12 +1,11 @@
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-
+import mock
 import os
 import unittest
 
 import arvados
+import arvados.retry
 import run_test_server
+from arvados_testutil import fake_httplib2_response
 
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -219,3 +218,108 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
                          'baz2',
                          'wrong content from Keep.get(md5("baz2"))')
         self.assertTrue(keep_client.using_proxy)
+
+
+class KeepClientRetryTestMixin(object):
+    # Testing with a local Keep store won't exercise the retry behavior.
+    # Instead, our strategy is:
+    # * Create a client with one proxy specified (pointed at a black
+    #   hole), so there's no need to instantiate an API client, and
+    #   all HTTP requests come from one place.
+    # * Mock httplib's request method to provide simulated responses.
+    # This lets us test the retry logic extensively without relying on any
+    # supporting servers, and prevents side effects in case something hiccups.
+    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+    # run_method().
+    PROXY_ADDR = 'http://[100::]/'
+    TEST_DATA = 'testdata'
+    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+
+    @staticmethod
+    def mock_responses(body, *codes):
+        return mock.patch('httplib2.Http.request', side_effect=(
+                (fake_httplib2_response(code), body) for code in codes))
+
+    def new_client(self):
+        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+
+    def run_method(self, *args, **kwargs):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def check_success(self, expected=None, *args, **kwargs):
+        if expected is None:
+            expected = self.DEFAULT_EXPECT
+        self.assertEqual(expected, self.run_method(*args, **kwargs))
+
+    def check_exception(self, error_class=None, *args, **kwargs):
+        if error_class is None:
+            error_class = self.DEFAULT_EXCEPTION
+        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+
+    def test_immediate_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_success()
+
+    def test_retry_then_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_success(num_retries=3)
+
+    def test_no_default_retry(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_exception()
+
+    def test_no_retry_after_permanent_error(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+            self.check_exception(num_retries=3)
+
+    def test_error_after_retries_exhausted(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+            self.check_exception(num_retries=1)
+
+
+# Don't delay from HTTPRetryLoop's exponential backoff.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        return self.new_client().get(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # get should raise a NotFoundError if no server returns the block,
+        # and a high threshold of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.get(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+
+    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+                   copies=1, *args, **kwargs):
+        return self.new_client().put(data, copies, *args, **kwargs)
+
+    def test_do_not_send_multiple_copies_to_same_server(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_exception(copies=2, num_retries=3)
commit dccec0cc48025830ac68cbb4a80f5ae73ace3fa7
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Aug 22 15:57:36 2014 -0400
    3147: Remove old Keep signing support from Python SDK.
    
    Per Tom.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 561d34c..909ee1f 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -242,15 +242,6 @@ class KeepClient(object):
                 resp, content = h.request(url.encode('utf-8'), 'PUT',
                                           headers=headers,
                                           body=self.args['data'])
-                if (resp['status'] == '401' and
-                    re.match(r'Timestamp verification failed', content)):
-                    body = KeepClient.sign_for_old_server(
-                        self.args['data_hash'],
-                        self.args['data'])
-                    h = httplib2.Http(timeout=self.args.get('timeout', None))
-                    resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                              headers=headers,
-                                              body=body)
                 if re.match(r'^2\d\d$', resp['status']):
                     self._success = True
                     _logger.debug("KeepWriterThread %s succeeded %s %s",
@@ -570,9 +561,6 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, want_copies, have_copies))
 
-    @staticmethod
-    def sign_for_old_server(data_hash, data):
-        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
 
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()
commit 7f5a0cafa4cc3d3155830243ab9df204b3b48a03
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Aug 26 09:46:46 2014 -0400
    3147: Add check_http_response_success to Python SDK.
    
    Other parts of the SDK need to end loops based on the result of an
    HTTP request.  This function puts that logic in one place.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index a5a303f..5dc31ae 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -6,6 +6,9 @@ from collections import deque
 
 import arvados.errors
 
+_HTTP_SUCCESSES = set(xrange(200, 300))
+_HTTP_CAN_RETRY = set([408, 409, 422, 423, 500, 502, 503, 504])
+
 class RetryLoop(object):
     """Coordinate limited retries of code.
 
@@ -102,3 +105,36 @@ class RetryLoop(object):
         except IndexError:
             raise arvados.errors.AssertionError(
                 "queried loop results before any were recorded")
+
+
+def check_http_response_success(result):
+    """Convert an httplib2 request result to a loop control flag.
+
+    Pass this method the 2-tuple returned by httplib2.Http.request.  It
+    returns True if the response indicates success, None if it indicates
+    temporary failure, and False otherwise.  You can use this as the
+    success_check for a RetryLoop.
+
+    Implementation details:
+    * Any 2xx result returns True.
+    * A select few status codes, or any malformed responses, return None.
+      422 Unprocessable Entity is in this category.  This may not meet the
+      letter of the HTTP specification, but the Arvados API server will
+      use it for various server-side problems like database connection
+      errors.
+    * Everything else returns False.  Note that this includes 1xx and
+      3xx status codes.  They don't indicate success, and you can't
+      retry those requests verbatim.
+    """
+    try:
+        status = int(result[0].status)
+    except Exception:
+        return None
+    if status in _HTTP_SUCCESSES:
+        return True
+    elif status in _HTTP_CAN_RETRY:
+        return None
+    elif 100 <= status < 600:
+        return False
+    else:
+        return None  # Get well soon, server.
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index 131872b..ed0a406 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -147,5 +147,52 @@ class RetryLoopBackoffTestCase(unittest.TestCase, RetryLoopTestMixin):
         self.check_backoff(sleep_mock, 5, 9)
 
 
+class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
+    def results_map(self, *codes):
+        for code in codes:
+            response = (fake_httplib2_response(code), None)
+            yield code, arv_retry.check_http_response_success(response)
+
+    def check(assert_name):
+        def check_method(self, expected, *codes):
+            assert_func = getattr(self, assert_name)
+            for code, actual in self.results_map(*codes):
+                assert_func(expected, actual,
+                            "{} status flagged {}".format(code, actual))
+                if assert_name != 'assertIs':
+                    self.assertTrue(
+                        actual is True or actual is False or actual is None,
+                        "{} status returned {}".format(code, actual))
+        return check_method
+
+    check_is = check('assertIs')
+    check_is_not = check('assertIsNot')
+
+    def test_obvious_successes(self):
+        self.check_is(True, *range(200, 207))
+
+    def test_obvious_stops(self):
+        self.check_is(False, 424, 426, 428, 431,
+                      *range(400, 408) + range(410, 420))
+
+    def test_obvious_retries(self):
+        self.check_is(None, 500, 502, 503, 504)
+
+    def test_4xx_retries(self):
+        self.check_is(None, 408, 409, 422, 423)
+
+    def test_5xx_failures(self):
+        self.check_is(False, 501, *range(505, 512))
+
+    def test_1xx_not_retried(self):
+        self.check_is_not(None, 100, 101)
+
+    def test_redirects_not_retried(self):
+        self.check_is_not(None, *range(300, 309))
+
+    def test_wacky_code_retries(self):
+        self.check_is(None, 0, 99, 600, -200)
+
+
 if __name__ == '__main__':
     unittest.main()
commit adbeb30cc27fa56a4550e28e1fd5e8ec4a1c7836
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Aug 26 09:46:17 2014 -0400
    3147: Add RetryLoop to the Python SDK.
    
    This provides a general-purpose mechanism for us to retry all kinds of
    operations.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
new file mode 100644
index 0000000..a5a303f
--- /dev/null
+++ b/sdk/python/arvados/retry.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+
+import time
+
+from collections import deque
+
+import arvados.errors
+
+class RetryLoop(object):
+    """Coordinate limited retries of code.
+
+    RetryLoop coordinates a loop that runs until it records a
+    successful result or tries too many times, whichever comes first.
+    Typical use looks like:
+
+        loop = RetryLoop(num_retries=2)
+        for tries_left in loop:
+            try:
+                result = do_something()
+            except TemporaryError as error:
+                log("error: {} ({} tries left)".format(error, tries_left))
+            else:
+                loop.save_result(result)
+        if loop.success():
+            return loop.last_result()
+    """
+    def __init__(self, num_retries, success_check=lambda r: True,
+                 backoff_start=0, backoff_growth=2, save_results=1):
+        """Construct a new RetryLoop.
+
+        Arguments:
+        * num_retries: The maximum number of times to retry the loop if it
+          doesn't succeed.  This means the loop could run at most 1+N times.
+        * success_check: This is a function that will be called each
+          time the loop saves a result.  The function should return
+          True if the result indicates loop success, False if it
+          represents a permanent failure state, and None if the loop
+          should continue.  If no function is provided, the loop will
+          end as soon as it records any result.
+        * backoff_start: The number of seconds that must pass before the
+          loop's second iteration.  Default 0, which disables all waiting.
+        * backoff_growth: The wait time multiplier after each iteration.
+          Default 2 (i.e., double the wait time each time).
+        * save_results: Specify a number to save the last N results
+          that the loop recorded.  These records are available through
+          the results attribute, oldest first.  Default 1.
+        """
+        self.tries_left = num_retries + 1
+        self.check_result = success_check
+        self.backoff_wait = backoff_start
+        self.backoff_growth = backoff_growth
+        self.next_start_time = 0
+        self.results = deque(maxlen=save_results)
+        self._running = None
+        self._success = None
+
+    def __iter__(self):
+        return self
+
+    def running(self):
+        return self._running and (self._success is None)
+
+    def next(self):
+        if self._running is None:
+            self._running = True
+        if (self.tries_left < 1) or not self.running():
+            self._running = False
+            raise StopIteration
+        else:
+            wait_time = max(0, self.next_start_time - time.time())
+            time.sleep(wait_time)
+            self.backoff_wait *= self.backoff_growth
+        self.next_start_time = time.time() + self.backoff_wait
+        self.tries_left -= 1
+        return self.tries_left
+
+    def save_result(self, result):
+        """Record a loop result.
+
+        Save the given result, and end the loop if it indicates
+        success or permanent failure.  See __init__'s documentation
+        about success_check to learn how to make that indication.
+        """
+        if not self.running():
+            raise arvados.errors.AssertionError(
+                "recorded a loop result after the loop finished")
+        self.results.append(result)
+        self._success = self.check_result(result)
+
+    def success(self):
+        """Return the loop's end state.
+
+        Returns True if the loop obtained a successful result, False if it
+        encountered permanent failure, or else None.
+        """
+        return self._success
+
+    def last_result(self):
+        """Return the most recent result the loop recorded."""
+        try:
+            return self.results[-1]
+        except IndexError:
+            raise arvados.errors.AssertionError(
+                "queried loop results before any were recorded")
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 2c51e8f..30cc779 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
         'ws4py'
         ],
       test_suite='tests',
-      tests_require=['PyYAML'],
+      tests_require=['mock', 'PyYAML'],
       zip_safe=False)
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
new file mode 100644
index 0000000..131872b
--- /dev/null
+++ b/sdk/python/tests/test_retry.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+
+import itertools
+import unittest
+
+import arvados.errors as arv_error
+import arvados.retry as arv_retry
+import mock
+
+from arvados_testutil import fake_httplib2_response
+
+class RetryLoopTestMixin(object):
+    @staticmethod
+    def loop_success(result):
+        # During the tests, we use integers that look like HTTP status
+        # codes as loop results.  Then we define simplified HTTP
+        # heuristics here to decide whether the result is success (True),
+        # permanent failure (False), or temporary failure (None).
+        if result < 400:
+            return True
+        elif result < 500:
+            return False
+        else:
+            return None
+
+    def run_loop(self, num_retries, *results, **kwargs):
+        responses = itertools.chain(results, itertools.repeat(None))
+        retrier = arv_retry.RetryLoop(num_retries, self.loop_success,
+                                      **kwargs)
+        for tries_left, response in itertools.izip(retrier, responses):
+            retrier.save_result(response)
+        return retrier
+
+    def check_result(self, retrier, expect_success, last_code):
+        self.assertIs(retrier.success(), expect_success,
+                      "loop success flag is incorrect")
+        self.assertEqual(last_code, retrier.last_result())
+
+
+class RetryLoopTestCase(unittest.TestCase, RetryLoopTestMixin):
+    def test_zero_retries_and_success(self):
+        retrier = self.run_loop(0, 200)
+        self.check_result(retrier, True, 200)
+
+    def test_zero_retries_and_tempfail(self):
+        retrier = self.run_loop(0, 500, 501)
+        self.check_result(retrier, None, 500)
+
+    def test_zero_retries_and_permfail(self):
+        retrier = self.run_loop(0, 400, 201)
+        self.check_result(retrier, False, 400)
+
+    def test_one_retry_with_immediate_success(self):
+        retrier = self.run_loop(1, 200, 201)
+        self.check_result(retrier, True, 200)
+
+    def test_one_retry_with_delayed_success(self):
+        retrier = self.run_loop(1, 500, 201)
+        self.check_result(retrier, True, 201)
+
+    def test_one_retry_with_no_success(self):
+        retrier = self.run_loop(1, 500, 501, 502)
+        self.check_result(retrier, None, 501)
+
+    def test_one_retry_but_permfail(self):
+        retrier = self.run_loop(1, 400, 201)
+        self.check_result(retrier, False, 400)
+
+    def test_two_retries_with_immediate_success(self):
+        retrier = self.run_loop(2, 200, 201, 202)
+        self.check_result(retrier, True, 200)
+
+    def test_two_retries_with_success_after_one(self):
+        retrier = self.run_loop(2, 500, 201, 502)
+        self.check_result(retrier, True, 201)
+
+    def test_two_retries_with_success_after_two(self):
+        retrier = self.run_loop(2, 500, 501, 202, 503)
+        self.check_result(retrier, True, 202)
+
+    def test_two_retries_with_no_success(self):
+        retrier = self.run_loop(2, 500, 501, 502, 503)
+        self.check_result(retrier, None, 502)
+
+    def test_two_retries_with_permfail(self):
+        retrier = self.run_loop(2, 500, 401, 202)
+        self.check_result(retrier, False, 401)
+
+    def test_save_result_before_start_is_error(self):
+        retrier = arv_retry.RetryLoop(0)
+        self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+
+    def test_save_result_after_end_is_error(self):
+        retrier = arv_retry.RetryLoop(0)
+        for count in retrier:
+            pass
+        self.assertRaises(arv_error.AssertionError, retrier.save_result, 1)
+
+
+@mock.patch('time.time', side_effect=itertools.count())
+@mock.patch('time.sleep')
+class RetryLoopBackoffTestCase(unittest.TestCase, RetryLoopTestMixin):
+    def run_loop(self, num_retries, *results, **kwargs):
+        kwargs.setdefault('backoff_start', 8)
+        return super(RetryLoopBackoffTestCase, self).run_loop(
+            num_retries, *results, **kwargs)
+
+    def check_backoff(self, sleep_mock, sleep_count, multiplier=1):
+        # Figure out how much time we actually spent sleeping.
+        sleep_times = [arglist[0][0] for arglist in sleep_mock.call_args_list
+                       if arglist[0][0] > 0]
+        self.assertEqual(sleep_count, len(sleep_times),
+                         "loop did not back off correctly")
+        last_wait = 0
+        for this_wait in sleep_times:
+            self.assertGreater(this_wait, last_wait * multiplier,
+                               "loop did not grow backoff times correctly")
+            last_wait = this_wait
+
+    def test_no_backoff_with_no_retries(self, sleep_mock, time_mock):
+        self.run_loop(0, 500, 201)
+        self.check_backoff(sleep_mock, 0)
+
+    def test_no_backoff_after_success(self, sleep_mock, time_mock):
+        self.run_loop(1, 200, 501)
+        self.check_backoff(sleep_mock, 0)
+
+    def test_no_backoff_after_permfail(self, sleep_mock, time_mock):
+        self.run_loop(1, 400, 201)
+        self.check_backoff(sleep_mock, 0)
+
+    def test_backoff_before_success(self, sleep_mock, time_mock):
+        self.run_loop(5, 500, 501, 502, 203, 504)
+        self.check_backoff(sleep_mock, 3)
+
+    def test_backoff_before_permfail(self, sleep_mock, time_mock):
+        self.run_loop(5, 500, 501, 502, 403, 504)
+        self.check_backoff(sleep_mock, 3)
+
+    def test_backoff_all_tempfail(self, sleep_mock, time_mock):
+        self.run_loop(3, 500, 501, 502, 503, 504)
+        self.check_backoff(sleep_mock, 3)
+
+    def test_backoff_multiplier(self, sleep_mock, time_mock):
+        self.run_loop(5, 500, 501, 502, 503, 504, 505,
+                      backoff_start=5, backoff_growth=10)
+        self.check_backoff(sleep_mock, 5, 9)
+
+
+if __name__ == '__main__':
+    unittest.main()
commit 440d863f1643587d37bd21548410f13b655ac021
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Aug 21 11:13:23 2014 -0400
    3147: Move fake Python HTTP response generation to arvados_testutil.
    
    Other tests can use this functionality.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 369d561..77146db 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -1,11 +1,18 @@
 #!/usr/bin/env python
 
 import errno
+import httplib
+import httplib2
 import os
 import shutil
 import tempfile
 import unittest
 
+def fake_httplib2_response(code, **headers):
+    headers.update(status=str(code),
+                   reason=httplib.responses.get(code, "Unknown Response"))
+    return httplib2.Response(headers)
+
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
 
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 2fd709e..e9cb838 100644
--- a/sdk/python/tests/test_api.py
+++ b/sdk/python/tests/test_api.py
@@ -10,22 +10,17 @@ import run_test_server
 import unittest
 
 from apiclient.http import RequestMockBuilder
-from httplib import responses as HTTP_RESPONSES
+from arvados_testutil import fake_httplib2_response
 
 if not mimetypes.inited:
     mimetypes.init()
 
 class ArvadosApiClientTest(unittest.TestCase):
-    @classmethod
-    def response_from_code(cls, code):
-        return httplib2.Response(
-            {'status': code,
-             'reason': HTTP_RESPONSES.get(code, "Unknown Response"),
-             'Content-Type': mimetypes.types_map['.json']})
+    ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
 
     @classmethod
     def api_error_response(cls, code, *errors):
-        return (cls.response_from_code(code),
+        return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
                 json.dumps({'errors': errors,
                             'error_token': '1234567890+12345678'}))
 
@@ -38,7 +33,9 @@ class ArvadosApiClientTest(unittest.TestCase):
         # FIXME: Figure out a better way to stub this out.
         run_test_server.run()
         mock_responses = {
-            'arvados.humans.delete': (cls.response_from_code(500), ""),
+            'arvados.humans.delete': (
+                fake_httplib2_response(500, **cls.ERROR_HEADERS),
+                ""),
             'arvados.humans.get': cls.api_error_response(
                 422, "Bad UUID format", "Bad output format"),
             'arvados.humans.list': (None, json.dumps(
-----------------------------------------------------------------------
hooks/post-receive
-- 
    
    
More information about the arvados-commits
mailing list