[ARVADOS] updated: 84260dab5182907cae91849acd652c138c2d5095

git at public.curoverse.com git at public.curoverse.com
Tue Nov 17 16:52:10 EST 2015


Summary of changes:
 sdk/python/arvados/keep.py           |  50 ++++++++-----
 sdk/python/tests/keepstub.py         |  69 +++++++++++++++--
 sdk/python/tests/test_keep_client.py | 139 +++++++++++++++++++++++++++--------
 3 files changed, 203 insertions(+), 55 deletions(-)

       via  84260dab5182907cae91849acd652c138c2d5095 (commit)
       via  e9c78ef7855e7ae263fe461e069c89ff7fc0b798 (commit)
      from  714c555bda26a6a27fad7caef382d1d6705ad215 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 84260dab5182907cae91849acd652c138c2d5095
Merge: 714c555 e9c78ef
Author: sguthrie <sallyeguthrie at gmail.com>
Date:   Tue Nov 17 16:49:04 2015 -0500

    Closes #7235. Merge branch '7235-python-keep-client-timeout'


commit e9c78ef7855e7ae263fe461e069c89ff7fc0b798
Author: sguthrie <sallyeguthrie at gmail.com>
Date:   Tue Nov 10 15:23:18 2015 -0500

    Closes #7235. Instead of setting KeepService's pycurl.TIMEOUT_MS, set pycurl.LOW_SPEED_LIMIT and pycurl.LOW_SPEED_TIME.
    Default LOW_SPEED_LIMIT is 32768 bytes per second. Default LOW_SPEED_TIME is 64 seconds.
    If the user specifies a length-two tuple, the first item sets CONNECTTIMEOUT_MS, the second item sets LOW_SPEED_TIME,
    and LOW_SPEED_LIMIT is set to 32768 bytes per second.
    
    Added a bandwidth simulator to keepstub, which uses millisecond precision (like curl) to measure timeouts.
    Added tests to test_keep_client and modified existing tests to only use integers.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index ec9f6f6..e01fec4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -213,11 +213,13 @@ class KeepBlockCache(object):
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       64 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        64 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 64, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
 
     class ThreadLimiter(object):
         """Limit the number of threads writing to Keep at once.
@@ -478,11 +480,17 @@ class KeepClient(object):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
-                conn_t, xfer_t = timeouts
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
             else:
                 conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             header_line = header_line.decode('iso-8859-1')
@@ -586,20 +594,22 @@ class KeepClient(object):
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Because timeouts are often a result of transient server load, the
-          actual connection timeout will be increased by a factor of two on
-          each retry.
-          Default: (2, 300).
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 64, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). The behavior described
-          above for adjusting connection timeouts on retry also applies.
-          Default: (20, 300).
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 64, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -686,8 +696,10 @@ class KeepClient(object):
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         t = self.proxy_timeout if self.using_proxy else self.timeout
-        return (t[0] * (1 << attempt_number), t[1])
-
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
     def _any_nondisk_services(self, service_list):
         return any(ks.get('service_type', 'disk') != 'disk'
                    for ks in service_list)
diff --git a/sdk/python/tests/keepstub.py b/sdk/python/tests/keepstub.py
index ef724ed..f074f8d 100644
--- a/sdk/python/tests/keepstub.py
+++ b/sdk/python/tests/keepstub.py
@@ -22,7 +22,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             'response_body': 0,
             # before returning from handler (thus setting response EOF)
             'response_close': 0,
+            # after writing over 1s worth of data at self.bandwidth
+            'mid_write': 0,
+            # after reading over 1s worth of data at self.bandwidth
+            'mid_read': 0,
         }
+        self.bandwidth = None
         super(Server, self).__init__(*args, **kwargs)
 
     def setdelays(self, **kwargs):
@@ -31,6 +36,12 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
             self.delays.get(k) # NameError if unknown key
             self.delays[k] = v
 
+    def setbandwidth(self, bandwidth):
+        """For future requests, set the maximum bandwidth (number of bytes per
+        second) to operate at. If setbandwidth is never called, function at
+        maximum bandwidth possible"""
+        self.bandwidth = float(bandwidth)
+
     def _sleep_at_least(self, seconds):
         """Sleep for given time, even if signals are received."""
         wake = time.time() + seconds
@@ -44,6 +55,53 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer, object):
 
 
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
+    def wfile_bandwidth_write(self, data_to_write):
+        if self.server.bandwidth == None and self.server.delays['mid_write'] == 0:
+            self.wfile.write(data_to_write)
+        else:
+            BYTES_PER_WRITE = int(self.server.bandwidth/4.0) or 32768
+            outage_happened = False
+            num_bytes = len(data_to_write)
+            num_sent_bytes = 0
+            target_time = time.time()
+            while num_sent_bytes < num_bytes:
+                if num_sent_bytes > self.server.bandwidth and not outage_happened:
+                    self.server._do_delay('mid_write')
+                    target_time += self.delays['mid_write']
+                    outage_happened = True
+                num_write_bytes = min(BYTES_PER_WRITE,
+                    num_bytes - num_sent_bytes)
+                self.wfile.write(data_to_write[
+                    num_sent_bytes:num_sent_bytes+num_write_bytes])
+                num_sent_bytes += num_write_bytes
+                if self.server.bandwidth is not None:
+                    target_time += num_write_bytes / self.server.bandwidth
+                    self.server._sleep_at_least(target_time - time.time())
+        return None
+
+    def rfile_bandwidth_read(self, bytes_to_read):
+        if self.server.bandwidth == None and self.server.delays['mid_read'] == 0:
+            return self.rfile.read(bytes_to_read)
+        else:
+            BYTES_PER_READ = int(self.server.bandwidth/4.0) or 32768
+            data = ''
+            outage_happened = False
+            bytes_read = 0
+            target_time = time.time()
+            while bytes_to_read > bytes_read:
+                if bytes_read > self.server.bandwidth and not outage_happened:
+                    self.server._do_delay('mid_read')
+                    target_time += self.delays['mid_read']
+                    outage_happened = True
+                next_bytes_to_read = min(BYTES_PER_READ,
+                    bytes_to_read - bytes_read)
+                data += self.rfile.read(next_bytes_to_read)
+                bytes_read += next_bytes_to_read
+                if self.server.bandwidth is not None:
+                    target_time += next_bytes_to_read / self.server.bandwidth
+                    self.server._sleep_at_least(target_time - time.time())
+        return data
+
     def handle(self, *args, **kwargs):
         self.server._do_delay('request')
         return super(Handler, self).handle(*args, **kwargs)
@@ -60,21 +118,18 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'application/octet-stream')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(self.server.store[datahash])
+        self.wfile_bandwidth_write(self.server.store[datahash])
         self.server._do_delay('response_close')
 
     def do_PUT(self):
         self.server._do_delay('request_body')
-
         # The comments at https://bugs.python.org/issue1491 implies that Python
         # 2.7 BaseHTTPRequestHandler was patched to support 100 Continue, but
         # reading the actual code that ships in Debian it clearly is not, so we
         # need to send the response on the socket directly.
-
-        self.wfile.write("%s %d %s\r\n\r\n" %
+        self.wfile_bandwidth_write("%s %d %s\r\n\r\n" %
                          (self.protocol_version, 100, "Continue"))
-
-        data = self.rfile.read(int(self.headers.getheader('content-length')))
+        data = self.rfile_bandwidth_read(int(self.headers.getheader('content-length')))
         datahash = hashlib.md5(data).hexdigest()
         self.server.store[datahash] = data
         self.server._do_delay('response')
@@ -82,7 +137,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler, object):
         self.send_header('Content-type', 'text/plain')
         self.end_headers()
         self.server._do_delay('response_body')
-        self.wfile.write(datahash + '+' + str(len(data)))
+        self.wfile_bandwidth_write(datahash + '+' + str(len(data)))
         self.server._do_delay('response_close')
 
     def log_request(self, *args, **kwargs):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ada0dac..52eb0e9 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -287,8 +287,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_put_timeout(self):
         api_client = self.mock_keep_services(count=1)
@@ -301,8 +304,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -315,8 +321,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -329,8 +338,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def check_no_services_error(self, verb, exc_class):
         api_client = mock.MagicMock(name='api_client')
@@ -570,7 +582,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 
 class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
-    DATA = 'x' * 2**10
+    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
+    # 1s worth of data and then trigger bandwidth errors before running
+    # out of data.
+    DATA = 'x'*2**11
+    BANDWIDTH_LOW_LIM = 1024
+    TIMEOUT_TIME = 1.0
 
     class assertTakesBetween(unittest.TestCase):
         def __init__(self, tmin, tmax):
@@ -581,8 +598,22 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
             self.t0 = time.time()
 
         def __exit__(self, *args, **kwargs):
-            self.assertGreater(time.time() - self.t0, self.tmin)
-            self.assertLess(time.time() - self.t0, self.tmax)
+            # Round times to milliseconds, like CURL. Otherwise, we
+            # fail when CURL reaches a 1s timeout at 0.9998s.
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
+            self.assertLessEqual(delta, self.tmax)
+
+    class assertTakesGreater(unittest.TestCase):
+        def __init__(self, tmin):
+            self.tmin = tmin
+
+        def __enter__(self):
+            self.t0 = time.time()
+
+        def __exit__(self, *args, **kwargs):
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
 
     def setUp(self):
         sock = socket.socket()
@@ -602,7 +633,7 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
     def tearDown(self):
         self.server.shutdown()
 
-    def keepClient(self, timeouts=(0.1, 1.0)):
+    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,
             timeout=timeouts)
@@ -617,39 +648,89 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
         )
         with self.assertTakesBetween(0.1, 0.5):
             with self.assertRaises(arvados.errors.KeepWriteError):
-                self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+                self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_no_delays_success(self):
+        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+    def test_too_low_bandwidth_no_delays_failure(self):
+        # Check that lessening bandwidth corresponds to failing
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_response_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(response=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_mid_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
 
     def test_timeout_slow_request(self):
-        self.server.setdelays(request=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(request=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(request=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response(self):
-        self.server.setdelays(response=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response_body(self):
-        self.server.setdelays(response_body=0.2)
-        self._test_200ms()
-
-    def _test_200ms(self):
-        """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response_body=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response_body=2)
+        self._test_response_timeout_under_2s(loc)
 
+    def _test_connect_timeout_under_200ms(self, loc):
         # Allow 100ms to connect, then 1s for response. Everything
         # should work, and everything should take at least 200ms to
         # return.
-        kc = self.keepClient((.1, 1))
+        kc = self.keepClient(timeouts=(.1, 1))
         with self.assertTakesBetween(.2, .3):
-            loc = kc.put(self.DATA, copies=1, num_retries=0)
+            kc.put(self.DATA, copies=1, num_retries=0)
         with self.assertTakesBetween(.2, .3):
             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
 
-        # Allow 1s to connect, then 100ms for response. Nothing should
-        # work, and everything should take at least 100ms to return.
-        kc = self.keepClient((1, .1))
-        with self.assertTakesBetween(.1, .2):
+    def _test_response_timeout_under_2s(self, loc):
+        # Allow 10s to connect, then 1s for response. Nothing should
+        # work, and everything should take at least 1s to return.
+        kc = self.keepClient(timeouts=(10, 1))
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
-        with self.assertTakesBetween(.1, .2):
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list