[ARVADOS] created: 54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a
git at public.curoverse.com
git at public.curoverse.com
Tue Dec 1 02:45:34 EST 2015
at 54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a (commit)
commit 54fc73b1e8f0bb7547e2cdfdb64d0e2c537b781a
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Dec 1 02:50:18 2015 -0500
7697: Avoid reusing long-idle HTTP connections. Avoid retrying non-idempotent operations.
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 5ec5ac2..ce4d3dd 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -1,10 +1,12 @@
import collections
+import httplib
import httplib2
import json
import logging
import os
import re
import socket
+import time
import types
import apiclient
@@ -16,6 +18,11 @@ import util
_logger = logging.getLogger('arvados.api')
+MAX_IDLE_CONNECTION_DURATION = 30
+RETRY_DELAY_INITIAL = 2
+RETRY_DELAY_BACKOFF = 2
+RETRY_COUNT = 2
+
class OrderedJsonModel(apiclient.model.JsonModel):
"""Model class for JSON that preserves the contents' order.
@@ -37,8 +44,6 @@ class OrderedJsonModel(apiclient.model.JsonModel):
def _intercept_http_request(self, uri, **kwargs):
- from httplib import BadStatusLine
-
if (self.max_request_size and
kwargs.get('body') and
self.max_request_size < len(kwargs['body'])):
@@ -51,32 +56,54 @@ def _intercept_http_request(self, uri, **kwargs):
kwargs['headers']['X-External-Client'] = '1'
kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
- try:
- return self.orig_http_request(uri, **kwargs)
- except BadStatusLine:
- # This is how httplib tells us that it tried to reuse an
- # existing connection but it was already closed by the
- # server. In that case, yes, we would like to retry.
- # Unfortunately, we are not absolutely certain that the
- # previous call did not succeed, so this is slightly
- # risky.
- return self.orig_http_request(uri, **kwargs)
- except socket.error:
- # This is the one case where httplib2 doesn't close the
- # underlying connection first. Close all open connections,
- # expecting this object only has the one connection to the API
- # server. This is safe because httplib2 reopens connections when
- # needed.
- _logger.debug("Retrying API request after socket error", exc_info=True)
+
+ retryable = kwargs.get('method', 'GET') in [
+ 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
+ retry_count = self._retry_count if retryable else 0
+
+ if (not retryable and
+ time.time() - self._last_request_time > self._max_keepalive_idle):
+ # High probability of failure due to connection atrophy. Make
+ # sure this request [re]opens a new connection by closing and
+ # forgetting all cached connections first.
for conn in self.connections.itervalues():
conn.close()
- return self.orig_http_request(uri, **kwargs)
+ self.connections.clear()
+
+ delay = self._retry_delay_initial
+ for _ in range(retry_count):
+ self._last_request_time = time.time()
+ try:
+ return self.orig_http_request(uri, **kwargs)
+ except (httplib.BadStatusLine, httplib.HTTPException):
+ _logger.debug("Retrying API request in %d s after HTTP error",
+ delay, exc_info=True)
+ except socket.error:
+ # This is the one case where httplib2 doesn't close the
+ # underlying connection first. Close all open
+ # connections, expecting this object only has the one
+ # connection to the API server. This is safe because
+ # httplib2 reopens connections when needed.
+ _logger.debug("Retrying API request in %d s after socket error",
+ delay, exc_info=True)
+ for conn in self.connections.itervalues():
+ conn.close()
+ time.sleep(delay)
+ delay = delay * self._retry_delay_backoff
+
+ self._last_request_time = time.time()
+ return self.orig_http_request(uri, **kwargs)
def _patch_http_request(http, api_token):
http.arvados_api_token = api_token
http.max_request_size = 0
http.orig_http_request = http.request
http.request = types.MethodType(_intercept_http_request, http)
+ http._last_request_time = 0
+ http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
+ http._retry_delay_initial = RETRY_DELAY_INITIAL
+ http._retry_delay_backoff = RETRY_DELAY_BACKOFF
+ http._retry_count = RETRY_COUNT
return http
# Monkey patch discovery._cast() so objects and arrays get serialized
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 6d1e979..1a5c1ff 100644
--- a/sdk/python/tests/test_api.py
+++ b/sdk/python/tests/test_api.py
@@ -3,9 +3,11 @@
import arvados
import collections
import httplib2
+import itertools
import json
import mimetypes
import os
+import Queue
import socket
import string
import unittest
@@ -15,8 +17,8 @@ import run_test_server
from apiclient import errors as apiclient_errors
from apiclient import http as apiclient_http
-from arvados.api import OrderedJsonModel
-from arvados_testutil import fake_httplib2_response
+from arvados.api import OrderedJsonModel, RETRY_DELAY_INITIAL, RETRY_DELAY_BACKOFF, RETRY_COUNT
+from arvados_testutil import fake_httplib2_response, queue_with
if not mimetypes.inited:
mimetypes.init()
@@ -106,20 +108,78 @@ class ArvadosApiTest(run_test_server.TestCaseWithServers):
result = api.humans().get(uuid='test').execute()
self.assertEqual(string.hexdigits, ''.join(result.keys()))
- def test_socket_errors_retried(self):
- api = arvados.api('v1')
- self.assertTrue(hasattr(api._http, 'orig_http_request'),
+class RetryREST(unittest.TestCase):
+ def setUp(self):
+ self.api = arvados.api('v1')
+ self.assertTrue(hasattr(self.api._http, 'orig_http_request'),
"test doesn't know how to intercept HTTP requests")
- api._http.orig_http_request = mock.MagicMock()
- mock_response = {'user': 'person'}
- api._http.orig_http_request.side_effect = [
- socket.error("mock error"),
- (fake_httplib2_response(200), json.dumps(mock_response))
- ]
- actual_response = api.users().current().execute()
- self.assertEqual(mock_response, actual_response)
- self.assertGreater(api._http.orig_http_request.call_count, 1,
+ self.mock_response = {'user': 'person'}
+ self.request_success = (fake_httplib2_response(200),
+ json.dumps(self.mock_response))
+ self.api._http.orig_http_request = mock.MagicMock()
+ # All requests succeed by default. Tests override as needed.
+ self.api._http.orig_http_request.side_effect = lambda x, *a, **k: self.request_success
+
+ @mock.patch('time.sleep')
+ def test_socket_error_retry_get(self, sleep):
+ self.api._http.orig_http_request.side_effect = (
+ socket.error('mock error'),
+ self.request_success,
+ )
+ self.assertEqual(self.api.users().current().execute(),
+ self.mock_response)
+ self.assertGreater(self.api._http.orig_http_request.call_count, 1,
"client got the right response without retrying")
+ self.assertEqual(sleep.call_args_list,
+ [mock.call(RETRY_DELAY_INITIAL)])
+
+ @mock.patch('time.sleep')
+ def test_socket_error_retry_delay(self, sleep):
+ self.api._http.orig_http_request.side_effect = socket.error('mock')
+ self.api._http._retry_count = 3
+ with self.assertRaises(socket.error):
+ self.api.users().current().execute()
+ self.assertEqual(self.api._http.orig_http_request.call_count, 4)
+ self.assertEqual(sleep.call_args_list, [
+ mock.call(RETRY_DELAY_INITIAL),
+ mock.call(RETRY_DELAY_INITIAL * RETRY_DELAY_BACKOFF),
+ mock.call(RETRY_DELAY_INITIAL * RETRY_DELAY_BACKOFF**2),
+ ])
+
+ @mock.patch('time.time', side_effect=[i*2**20 for i in range(99)])
+ def test_close_old_connections_non_retryable(self, sleep):
+ self._test_connection_close(expect=1)
+
+ @mock.patch('time.time', side_effect=itertools.count())
+ def test_no_close_fresh_connections_non_retryable(self, sleep):
+ self._test_connection_close(expect=0)
+
+ @mock.patch('time.time', side_effect=itertools.count())
+ def test_override_max_idle_time(self, sleep):
+ self.api._http._max_keepalive_idle = 0
+ self._test_connection_close(expect=1)
+
+ def _test_connection_close(self, expect=0):
+ # Do two POST requests. The second one must close all
+ # connections +expect+ times.
+ self.api.users().create(body={}).execute()
+ mock_conns = [mock.MagicMock() for i in range(2)]
+ self.api._http.connections = {str(i): mock_conns[i] for i in range(2)}
+ self.api.users().create(body={}).execute()
+ for c in mock_conns:
+ self.assertEqual(c.close.call_count, expect)
+
+ @mock.patch('time.sleep')
+ def test_socket_error_no_retry_post(self, sleep):
+ self.api._http.orig_http_request.side_effect = (
+ socket.error('mock error'),
+ self.request_success,
+ )
+ with self.assertRaises(socket.error):
+ self.api.users().create(body={}).execute()
+ self.assertEqual(self.api._http.orig_http_request.call_count, 1,
+ "client should try non-retryable method exactly once")
+ self.assertEqual(sleep.call_args_list, [])
if __name__ == '__main__':
commit 1c354a633235acc54b3e9f73528bd0907ec5d427
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 25 09:26:10 2015 -0500
7697: Move diag messages from stdout to stderr.
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index adb8652..d6febdd 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -98,7 +98,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
# Pidfile exists, but we can't parse it. Perhaps the
# server has created the file but hasn't written its PID
# yet?
- print("Parse error reading pidfile {}: {}".format(pidfile, error))
+ print("Parse error reading pidfile {}: {}".format(pidfile, error),
+ file=sys.stderr)
time.sleep(0.1)
now = time.time()
@@ -113,7 +114,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
try:
if now >= startTERM:
os.kill(server_pid, signal.SIGTERM)
- print("Sent SIGTERM to {} ({})".format(server_pid, pidfile))
+ print("Sent SIGTERM to {} ({})".format(server_pid, pidfile),
+ file=sys.stderr)
except OSError as error:
if error.errno == errno.ESRCH:
# Thrown by os.getpgid() or os.kill() if the process
@@ -124,7 +126,8 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
now = time.time()
print("Server PID {} ({}) did not exit, giving up after {}s".
- format(server_pid, pidfile, wait))
+ format(server_pid, pidfile, wait),
+ file=sys.stderr)
def find_available_port():
"""Return an IPv4 port number that is not in use right now.
@@ -155,7 +158,8 @@ def _wait_until_port_listens(port, timeout=10):
subprocess.check_output(['which', 'lsof'])
except subprocess.CalledProcessError:
print("WARNING: No `lsof` -- cannot wait for port to listen. "+
- "Sleeping 0.5 and hoping for the best.")
+ "Sleeping 0.5 and hoping for the best.",
+ file=sys.stderr)
time.sleep(0.5)
return
deadline = time.time() + timeout
@@ -695,7 +699,9 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.action not in actions:
- print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions), file=sys.stderr)
+ print("Unrecognized action '{}'. Actions are: {}.".
+ format(args.action, actions),
+ file=sys.stderr)
sys.exit(1)
if args.action == 'start':
stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list