[ARVADOS] updated: 84260dab5182907cae91849acd652c138c2d5095
git at public.curoverse.com
git at public.curoverse.com
Tue Nov 17 16:52:10 EST 2015
Summary of changes:
sdk/python/arvados/keep.py | 50 ++++++++-----
sdk/python/tests/keepstub.py | 69 +++++++++++++++--
sdk/python/tests/test_keep_client.py | 139 +++++++++++++++++++++++++++--------
3 files changed, 203 insertions(+), 55 deletions(-)
via 84260dab5182907cae91849acd652c138c2d5095 (commit)
via e9c78ef7855e7ae263fe461e069c89ff7fc0b798 (commit)
from 714c555bda26a6a27fad7caef382d1d6705ad215 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full, below.
commit 84260dab5182907cae91849acd652c138c2d5095
Merge: 714c555 e9c78ef
Author: sguthrie <sallyeguthrie at gmail.com>
Date: Tue Nov 17 16:49:04 2015 -0500
Closes #7235. Merge branch '7235-python-keep-client-timeout'
commit e9c78ef7855e7ae263fe461e069c89ff7fc0b798
Author: sguthrie <sallyeguthrie at gmail.com>
Date: Tue Nov 10 15:23:18 2015 -0500
Closes #7235. Instead of setting KeepService's pycurl.TIMEOUT_MS, set pycurl.LOW_SPEED_LIMIT and pycurl.LOW_SPEED_TIME.
Default LOW_SPEED_LIMIT is 32768 bytes per second. Default LOW_SPEED_TIME is 64 seconds.
If the user specifies a length-two tuple, the first item sets CONNECTTIMEOUT_MS, the second item sets LOW_SPEED_TIME,
and LOW_SPEED_LIMIT is set to 32768 bytes per second.
Added a bandwidth simulator to keepstub, which uses millisecond precision (like curl) to measure timeouts.
Added tests to test_keep_client and modified existing tests to only use integers.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index ec9f6f6..e01fec4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -213,11 +213,13 @@ class KeepBlockCache(object):
class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 300 seconds
+ # Default Keep server read timeout: 64 seconds
+ # Default Keep server bandwidth minimum: 32768 bytes per second
# Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 300 seconds
- DEFAULT_TIMEOUT = (2, 300)
- DEFAULT_PROXY_TIMEOUT = (20, 300)
+ # Default Keep proxy read timeout: 64 seconds
+ # Default Keep proxy bandwidth minimum: 32768 bytes per second
+ DEFAULT_TIMEOUT = (2, 64, 32768)
+ DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
class ThreadLimiter(object):
"""Limit the number of threads writing to Keep at once.
@@ -478,11 +480,17 @@ class KeepClient(object):
if not timeouts:
return
elif isinstance(timeouts, tuple):
- conn_t, xfer_t = timeouts
+ if len(timeouts) == 2:
+ conn_t, xfer_t = timeouts
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+ else:
+ conn_t, xfer_t, bandwidth_bps = timeouts
else:
conn_t, xfer_t = (timeouts, timeouts)
+ bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
header_line = header_line.decode('iso-8859-1')
@@ -586,20 +594,22 @@ class KeepClient(object):
:timeout:
The initial timeout (in seconds) for HTTP requests to Keep
- non-proxy servers. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout): see
- http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
- Because timeouts are often a result of transient server load, the
- actual connection timeout will be increased by a factor of two on
- each retry.
- Default: (2, 300).
+ non-proxy servers. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). A connection
+ will be aborted if the average traffic rate falls below
+ minimum_bandwidth bytes per second over an interval of read_timeout
+ seconds. Because timeouts are often a result of transient server
+ load, the actual connection timeout will be increased by a factor
+ of two on each retry.
+ Default: (2, 64, 32768).
:proxy_timeout:
The initial timeout (in seconds) for HTTP requests to
- Keep proxies. A tuple of two floats is interpreted as
- (connection_timeout, read_timeout). The behavior described
- above for adjusting connection timeouts on retry also applies.
- Default: (20, 300).
+ Keep proxies. A tuple of three floats is interpreted as
+ (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+ described above for adjusting connection timeouts on retry also
+ applies.
+ Default: (20, 64, 32768).
:api_token:
If you're not using an API client, but only talking
@@ -686,8 +696,10 @@ class KeepClient(object):
# TODO(twp): the timeout should be a property of a
# KeepService, not a KeepClient. See #4488.
t = self.proxy_timeout if self.using_proxy else self.timeout
- return (t[0] * (1 << attempt_number), t[1])
-
+ if len(t) == 2:
+ return (t[0] * (1 << attempt_number), t[1])
+ else:
+ return (t[0] * (1 << attempt_number), t[1], t[2])
def _any_nondisk_services(self, service_list):
return any(ks.get('service_type', 'disk') != 'disk'
for ks in service_list)
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index ef724ed..f074f8d 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -22,7 +22,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
'response_body': 0,
# before returning from handler (thus setting response EOF)
'response_close': 0,
+ # after writing over 1s worth of data at self.bandwidth
+ 'mid_write': 0,
+ # after reading over 1s worth of data at self.bandwidth
+ 'mid_read': 0,
}
+ self.bandwidth = None
super(Server, self).__init__(*args, **kwargs)
def setdelays(self, **kwargs):
@@ -31,6 +36,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
self.delays.get(k) # NameError if unknown key
self.delays[k] = v
+ def setbandwidth(self, bandwidth):
+ """For future requests, set the maximum bandwidth (number of bytes per
+ second) to operate at. If setbandwidth is never called, function at
+ maximum bandwidth possible"""
+ self.bandwidth = float(bandwidth)
+
def _sleep_at_least(self, seconds):
"""Sleep for given time, even if signals are received."""
wake = time.time() + seconds
@@ -44,6 +55,53 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+ def wfile_bandwidth_write(self, data_to_write):
+ if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
+ self.wfile.write(data_to_write)
+ else:
+ BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+ outage_happened = False
+ num_bytes = len(data_to_write)
+ num_sent_bytes = 0
+ target_time = time.time()
+ while num_sent_bytes < num_bytes:
+ if num_sent_bytes > self.server.bandwidth and not outage_happened:
+ self.server._do_delay('mid_write')
+ target_time += self.delays['mid_write']
+ outage_happened = True
+ num_write_bytes = min(BYTES_PER_WRITE,
+ num_bytes - num_sent_bytes)
+ self.wfile.write(data_to_write[
+ num_sent_bytes:num_sent_bytes+num_write_bytes])
+ num_sent_bytes += num_write_bytes
+ if self.server.bandwidth is not None:
+ target_time += num_write_bytes / self.server.bandwidth
+ self.server._sleep_at_least(target_time - time.time())
+ return None
+
+ def rfile_bandwidth_read(self, bytes_to_read):
+ if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
+ return self.rfile.read(bytes_to_read)
+ else:
+ BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+ data = ''
+ outage_happened = False
+ bytes_read = 0
+ target_time = time.time()
+ while bytes_to_read > bytes_read:
+ if bytes_read > self.server.bandwidth and not outage_happened:
+ self.server._do_delay('mid_read')
+ target_time += self.delays['mid_read']
+ outage_happened = True
+ next_bytes_to_read = min(BYTES_PER_READ,
+ bytes_to_read - bytes_read)
+ data += self.rfile.read(next_bytes_to_read)
+ bytes_read += next_bytes_to_read
+ if self.server.bandwidth is not None:
+ target_time += next_bytes_to_read / self.server.bandwidth
+ self.server._sleep_at_least(target_time - time.time())
+ return data
+
def handle(self, *args, **kwargs):
self.server._do_delay('request')
return super(Handler, self).handle(*args, **kwargs)
@@ -60,21 +118,18 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
self.send_header('Content-type', 'application/octet-stream')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(self.server.store[datahash])
+ self.wfile_bandwidth_write(self.server.store[datahash])
self.server._do_delay('response_close')
def do_PUT(self):
self.server._do_delay('request_body')
-
# The comments at https://bugs.python.org/issue1491 implies that Python
# 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
# reading the actual code that ships in Debian it clearly is not, so we
# need to send the response on the socket directly.
-
- self.wfile.write("%s %d %s\r\n\r\n" %
+ self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
(self.protocol_version, 100, "Continue"))
-
- data = self.rfile.read(int(self.headers.getheader('content-length')))
+ data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
datahash = hashlib.md5(data).hexdigest()
self.server.store[datahash] = data
self.server._do_delay('response')
@@ -82,7 +137,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.server._do_delay('response_body')
- self.wfile.write(datahash + '+' + str(len(data)))
+ self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
self.server._do_delay('response_close')
def log_request(self, *args, **kwargs):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ada0dac..52eb0e9 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -287,8 +287,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_put_timeout(self):
api_client = self.mock_keep_services(count=1)
@@ -301,8 +304,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
def test_proxy_get_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -315,8 +321,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def test_proxy_put_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -329,8 +338,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
- mock.responses[0].getopt(pycurl.TIMEOUT_MS),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+ mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ self.assertEqual(
+ mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+ int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def check_no_services_error(self, verb, exc_class):
api_client = mock.MagicMock(name='api_client')
@@ -570,7 +582,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
- DATA = 'x' * 2**10
+ # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
+ # 1s worth of data and then trigger bandwidth errors before running
+ # out of data.
+ DATA = 'x'*2**11
+ BANDWIDTH_LOW_LIM = 1024
+ TIMEOUT_TIME = 1.0
class assertTakesBetween(unittest.TestCase):
def __init__(self, tmin, tmax):
@@ -581,8 +598,22 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
self.t0 = time.time()
def __exit__(self, *args, **kwargs):
- self.assertGreater(time.time() - self.t0, self.tmin)
- self.assertLess(time.time() - self.t0, self.tmax)
+ # Round times to milliseconds, like CURL. Otherwise, we
+ # fail when CURL reaches a 1s timeout at 0.9998s.
+ delta = round(time.time() - self.t0, 3)
+ self.assertGreaterEqual(delta, self.tmin)
+ self.assertLessEqual(delta, self.tmax)
+
+ class assertTakesGreater(unittest.TestCase):
+ def __init__(self, tmin):
+ self.tmin = tmin
+
+ def __enter__(self):
+ self.t0 = time.time()
+
+ def __exit__(self, *args, **kwargs):
+ delta = round(time.time() - self.t0, 3)
+ self.assertGreaterEqual(delta, self.tmin)
def setUp(self):
sock = socket.socket()
@@ -602,7 +633,7 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
def tearDown(self):
self.server.shutdown()
- def keepClient(self, timeouts=(0.1, 1.0)):
+ def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
return arvados.KeepClient(
api_client=self.api_client,
timeout=timeouts)
@@ -617,39 +648,89 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
)
with self.assertTakesBetween(0.1, 0.5):
with self.assertRaises(arvados.errors.KeepWriteError):
- self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+ self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_no_delays_success(self):
+ self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+ def test_too_low_bandwidth_no_delays_failure(self):
+ # Check that lessening bandwidth corresponds to failing
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_with_server_response_delay_failure(self):
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+ self.server.setdelays(response=self.TIMEOUT_TIME)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
+
+ def test_low_bandwidth_with_server_mid_delay_failure(self):
+ kc = self.keepClient()
+ loc = kc.put(self.DATA, copies=1, num_retries=0)
+ self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+ self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepReadError) as e:
+ kc.get(loc, num_retries=0)
+ with self.assertTakesGreater(self.TIMEOUT_TIME):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ kc.put(self.DATA, copies=1, num_retries=0)
def test_timeout_slow_request(self):
- self.server.setdelays(request=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(request=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(request=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response(self):
- self.server.setdelays(response=0.2)
- self._test_200ms()
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response=2)
+ self._test_response_timeout_under_2s(loc)
def test_timeout_slow_response_body(self):
- self.server.setdelays(response_body=0.2)
- self._test_200ms()
-
- def _test_200ms(self):
- """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+ loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+ self.server.setdelays(response_body=.2)
+ self._test_connect_timeout_under_200ms(loc)
+ self.server.setdelays(response_body=2)
+ self._test_response_timeout_under_2s(loc)
+ def _test_connect_timeout_under_200ms(self, loc):
# Allow 100ms to connect, then 1s for response. Everything
# should work, and everything should take at least 200ms to
# return.
- kc = self.keepClient((.1, 1))
+ kc = self.keepClient(timeouts=(.1, 1))
with self.assertTakesBetween(.2, .3):
- loc = kc.put(self.DATA, copies=1, num_retries=0)
+ kc.put(self.DATA, copies=1, num_retries=0)
with self.assertTakesBetween(.2, .3):
self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
- # Allow 1s to connect, then 100ms for response. Nothing should
- # work, and everything should take at least 100ms to return.
- kc = self.keepClient((1, .1))
- with self.assertTakesBetween(.1, .2):
+ def _test_response_timeout_under_2s(self, loc):
+ # Allow 10s to connect, then 1s for response. Nothing should
+ # work, and everything should take at least 1s to return.
+ kc = self.keepClient(timeouts=(10, 1))
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
- with self.assertTakesBetween(.1, .2):
+ with self.assertTakesBetween(1, 1.9):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list