[ARVADOS] created: 54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a

git at public.curoverse.com git at public.curoverse.com
Tue Dec 1 02:45:34 EST 2015


        at  54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a (commit)


commit 54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Dec 1 02:50:18 2015 -0500

    7697: Avoid reusing long-idle HTTP connections. Avoid retrying non-idempotent operations.

diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 5ec5ac2..ce4d3dd 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -1,10 +1,12 @@
 import collections
+import httplib
 import httplib2
 import json
 import logging
 import os
 import re
 import socket
+import time
 import types
 
 import apiclient
@@ -16,6 +18,11 @@ import util
 
 _logger = logging.getLogger('arvados.api')
 
+MAX_IDLE_CONNECTION_DURATION = 30
+RETRY_DELAY_INITIAL = 2
+RETRY_DELAY_BACKOFF = 2
+RETRY_COUNT = 2
+
 class OrderedJsonModel(apiclient.model.JsonModel):
     """Model class for JSON that preserves the contents' order.
 
@@ -37,8 +44,6 @@ class OrderedJsonModel(apiclient.model.JsonModel):
 
 
 def _intercept_http_request(self, uri, **kwargs):
-    from httplib import BadStatusLine
-
     if (self.max_request_size and
         kwargs.get('body') and
         self.max_request_size < len(kwargs['body'])):
@@ -51,32 +56,54 @@ def _intercept_http_request(self, uri, **kwargs):
         kwargs['headers']['X-External-Client'] = '1'
 
     kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
-    try:
-        return self.orig_http_request(uri, **kwargs)
-    except BadStatusLine:
-        # This is how httplib tells us that it tried to reuse an
-        # existing connection but it was already closed by the
-        # server. In that case, yes, we would like to retry.
-        # Unfortunately, we are not absolutely certain that the
-        # previous call did not succeed, so this is slightly
-        # risky.
-        return self.orig_http_request(uri, **kwargs)
-    except socket.error:
-        # This is the one case where httplib2 doesn't close the
-        # underlying connection first.  Close all open connections,
-        # expecting this object only has the one connection to the API
-        # server.  This is safe because httplib2 reopens connections when
-        # needed.
-        _logger.debug("Retrying API request after socket error", exc_info=True)
+
+    retryable = kwargs.get('method', 'GET') in [
+        'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
+    retry_count = self._retry_count if retryable else 0
+
+    if (not retryable and
+        time.time() - self._last_request_time > self._max_keepalive_idle):
+        # High probability of failure due to connection atrophy. Make
+        # sure this request [re]opens a new connection by closing and
+        # forgetting all cached connections first.
         for conn in self.connections.itervalues():
             conn.close()
-        return self.orig_http_request(uri, **kwargs)
+        self.connections.clear()
+
+    delay = self._retry_delay_initial
+    for _ in range(retry_count):
+        self._last_request_time = time.time()
+        try:
+            return self.orig_http_request(uri, **kwargs)
+        except (httplib.BadStatusLine, httplib.HTTPException):
+            _logger.debug("Retrying API request in %d s after HTTP error",
+                          delay, exc_info=True)
+        except socket.error:
+            # This is the one case where httplib2 doesn't close the
+            # underlying connection first.  Close all open
+            # connections, expecting this object only has the one
+            # connection to the API server.  This is safe because
+            # httplib2 reopens connections when needed.
+            _logger.debug("Retrying API request in %d s after socket error",
+                          delay, exc_info=True)
+            for conn in self.connections.itervalues():
+                conn.close()
+        time.sleep(delay)
+        delay = delay * self._retry_delay_backoff
+
+    self._last_request_time = time.time()
+    return self.orig_http_request(uri, **kwargs)
 
 def _patch_http_request(http, api_token):
     http.arvados_api_token = api_token
     http.max_request_size = 0
     http.orig_http_request = http.request
     http.request = types.MethodType(_intercept_http_request, http)
+    http._last_request_time = 0
+    http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
+    http._retry_delay_initial = RETRY_DELAY_INITIAL
+    http._retry_delay_backoff = RETRY_DELAY_BACKOFF
+    http._retry_count = RETRY_COUNT
     return http
 
 # Monkey patch discovery._cast() so objects and arrays get serialized
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 6d1e979..1a5c1ff 100644
--- a/sdk/python/tests/test_api.py
+++ b/sdk/python/tests/test_api.py
@@ -3,9 +3,11 @@
 import arvados
 import collections
 import httplib2
+import itertools
 import json
 import mimetypes
 import os
+import Queue
 import socket
 import string
 import unittest
@@ -15,8 +17,8 @@ import run_test_server
 
 from apiclient import errors as apiclient_errors
 from apiclient import http as apiclient_http
-from arvados.api import OrderedJsonModel
-from arvados_testutil import fake_httplib2_response
+from arvados.api import OrderedJsonModel, RETRY_DELAY_INITIAL, RETRY_DELAY_BACKOFF, RETRY_COUNT
+from arvados_testutil import fake_httplib2_response, queue_with
 
 if not mimetypes.inited:
     mimetypes.init()
@@ -106,20 +108,78 @@ class ArvadosApiTest(run_test_server.TestCaseWithServers):
         result = api.humans().get(uuid='test').execute()
         self.assertEqual(string.hexdigits, ''.join(result.keys()))
 
-    def test_socket_errors_retried(self):
-        api = arvados.api('v1')
-        self.assertTrue(hasattr(api._http, 'orig_http_request'),
+class RetryREST(unittest.TestCase):
+    def setUp(self):
+        self.api = arvados.api('v1')
+        self.assertTrue(hasattr(self.api._http, 'orig_http_request'),
                         "test doesn't know how to intercept HTTP requests")
-        api._http.orig_http_request = mock.MagicMock()
-        mock_response = {'user': 'person'}
-        api._http.orig_http_request.side_effect = [
-            socket.error("mock error"),
-            (fake_httplib2_response(200), json.dumps(mock_response))
-            ]
-        actual_response = api.users().current().execute()
-        self.assertEqual(mock_response, actual_response)
-        self.assertGreater(api._http.orig_http_request.call_count, 1,
+        self.mock_response = {'user': 'person'}
+        self.request_success = (fake_httplib2_response(200),
+                                json.dumps(self.mock_response))
+        self.api._http.orig_http_request = mock.MagicMock()
+        # All requests succeed by default. Tests override as needed.
+        self.api._http.orig_http_request.side_effect = lambda x, *a, **k: self.request_success
+
+    @mock.patch('time.sleep')
+    def test_socket_error_retry_get(self, sleep):
+        self.api._http.orig_http_request.side_effect = (
+            socket.error('mock error'),
+            self.request_success,
+        )
+        self.assertEqual(self.api.users().current().execute(),
+                         self.mock_response)
+        self.assertGreater(self.api._http.orig_http_request.call_count, 1,
                            "client got the right response without retrying")
+        self.assertEqual(sleep.call_args_list,
+                         [mock.call(RETRY_DELAY_INITIAL)])
+
+    @mock.patch('time.sleep')
+    def test_socket_error_retry_delay(self, sleep):
+        self.api._http.orig_http_request.side_effect = socket.error('mock')
+        self.api._http._retry_count = 3
+        with self.assertRaises(socket.error):
+            self.api.users().current().execute()
+        self.assertEqual(self.api._http.orig_http_request.call_count, 4)
+        self.assertEqual(sleep.call_args_list, [
+            mock.call(RETRY_DELAY_INITIAL),
+            mock.call(RETRY_DELAY_INITIAL * RETRY_DELAY_BACKOFF),
+            mock.call(RETRY_DELAY_INITIAL * RETRY_DELAY_BACKOFF**2),
+        ])
+
+    @mock.patch('time.time', side_effect=[i*2**20 for i in range(99)])
+    def test_close_old_connections_non_retryable(self, sleep):
+        self._test_connection_close(expect=1)
+
+    @mock.patch('time.time', side_effect=itertools.count())
+    def test_no_close_fresh_connections_non_retryable(self, sleep):
+        self._test_connection_close(expect=0)
+
+    @mock.patch('time.time', side_effect=itertools.count())
+    def test_override_max_idle_time(self, sleep):
+        self.api._http._max_keepalive_idle = 0
+        self._test_connection_close(expect=1)
+
+    def _test_connection_close(self, expect=0):
+        # Do two POST requests. The second one must close all
+        # connections +expect+ times.
+        self.api.users().create(body={}).execute()
+        mock_conns = [mock.MagicMock() for i in range(2)]
+        self.api._http.connections = {str(i): mock_conns[i] for i in range(2)}
+        self.api.users().create(body={}).execute()
+        for c in mock_conns:
+            self.assertEqual(c.close.call_count, expect)
+
+    @mock.patch('time.sleep')
+    def test_socket_error_no_retry_post(self, sleep):
+        self.api._http.orig_http_request.side_effect = (
+            socket.error('mock error'),
+            self.request_success,
+        )
+        with self.assertRaises(socket.error):
+            self.api.users().create(body={}).execute()
+        self.assertEqual(self.api._http.orig_http_request.call_count, 1,
+                         "client should try non-retryable method exactly once")
+        self.assertEqual(sleep.call_args_list, [])
 
 
 if __name__ == '__main__':

commit 1c354a633235acc54b3e9f73528bd0907ec5d427
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 25 09:26:10 2015 -0500

    7697: Move diag messages from stdout to stderr.

diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index adb8652..d6febdd 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -98,7 +98,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
             # Pidfile exists, but we can't parse it. Perhaps the
             # server has created the file but hasn't written its PID
             # yet?
-            print("Parse error reading pidfile {}: {}".format(pidfile, error))
+            print("Parse error reading pidfile {}: {}".format(pidfile, error),
+                  file=sys.stderr)
             time.sleep(0.1)
             now = time.time()
 
@@ -113,7 +114,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
         try:
             if now >= startTERM:
                 os.kill(server_pid, signal.SIGTERM)
-                print("Sent SIGTERM to {} ({})".format(server_pid, pidfile))
+                print("Sent SIGTERM to {} ({})".format(server_pid, pidfile),
+                      file=sys.stderr)
         except OSError as error:
             if error.errno == errno.ESRCH:
                 # Thrown by os.getpgid() or os.kill() if the process
@@ -124,7 +126,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
         now = time.time()
 
     print("Server PID {} ({}) did not exit, giving up after {}s".
-          format(server_pid, pidfile, wait))
+          format(server_pid, pidfile, wait),
+          file=sys.stderr)
 
 def find_available_port():
     """Return an IPv4 port number that is not in use right now.
@@ -155,7 +158,8 @@ def _wait_until_port_listens(port, timeout=10):
         subprocess.check_output(['which', 'lsof'])
     except subprocess.CalledProcessError:
         print("WARNING: No `lsof` -- cannot wait for port to listen. "+
-              "Sleeping 0.5 and hoping for the best.")
+              "Sleeping 0.5 and hoping for the best.",
+              file=sys.stderr)
         time.sleep(0.5)
         return
     deadline = time.time() + timeout
@@ -695,7 +699,9 @@ if __name__ == "__main__":
     args = parser.parse_args()
 
     if args.action not in actions:
-        print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions), file=sys.stderr)
+        print("Unrecognized action '{}'. Actions are: {}.".
+              format(args.action, actions),
+              file=sys.stderr)
         sys.exit(1)
     if args.action == 'start':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list