[ARVADOS] created: 441cb401a907ebca19042f73942a76d247bab847
Git user
git at public.curoverse.com
Thu May 5 17:26:08 EDT 2016
at 441cb401a907ebca19042f73942a76d247bab847 (commit)
commit 441cb401a907ebca19042f73942a76d247bab847
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 5 17:25:39 2016 -0400
9135: Make EventClient initialization more consistent.
* DRY up the setup code.
* Make the client a daemon thread, for consistency with PollClient.
* Always try to close the connection on failure. We do this in
_subscribe_websocket, so why not do it on reconnect as well?
* Remove the public connect() and close_connection() methods from
EventClient. They're no longer needed, and avoid leaking
implementation details.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 8f7ae69..96d7840 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,7 +84,17 @@ class EventClient(object):
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
self.is_closed = False
- self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+ self._setup_event_client()
+
+ def _setup_event_client(self):
+ self.ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
+ self.ec.daemon = True
+ try:
+ self.ec.connect()
+ except Exception:
+ self.ec.close_connection()
+ raise
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
@@ -112,8 +122,7 @@ class EventClient(object):
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
- self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- self.ec.connect()
+ self._setup_event_client()
break
except Exception as e:
_logger.warn("Error '%s' during websocket reconnect.", e)
@@ -123,10 +132,8 @@ class EventClient(object):
thread.interrupt_main()
return
- def _delegate_to_ec(attr_name):
- return property(lambda self: getattr(self.ec, attr_name))
- for _method_name in ['connect', 'close_connection', 'run_forever']:
- locals()[_method_name] = _delegate_to_ec(_method_name)
+ def run_forever(self):
+ return self.ec.run_forever()
class PollClient(threading.Thread):
@@ -249,18 +256,10 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
try:
- uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
client = EventClient(uri_with_token, filters, on_event, last_log_id)
- ok = False
- try:
- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
- except:
+ except Exception:
_logger.warn("Failed to connect to websockets on %s" % endpoint)
raise
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 81beba5..23bd024 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -26,6 +26,14 @@ class EventClientTestCase(unittest.TestCase):
yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
last_log_id), ws_mock
+ def test_client_init(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ ws_mock.assert_called_with(TEST_WS_URL, [[]], client.on_event, None,
+ client.on_closed)
+ ws_mock().connect.assert_called()
+ self.assertIs(ws_mock().daemon, True)
+
def test_subscribe_calls_ws(self):
ws_filter = ['kind', '=', 'arvados#test']
with self.mocked_client() as client_tuple:
@@ -41,19 +49,10 @@ class EventClientTestCase(unittest.TestCase):
client.unsubscribe(ws_filter)
ws_mock().unsubscribe.assert_called_with(ws_filter)
- # PollClient doesn't have this method, but for now you have to call it
- # for anything to work.
- def test_connect_calls_ws(self):
- with self.mocked_client() as client_tuple:
- client, ws_mock = client_tuple
- client.connect()
- ws_mock().connect.assert_called()
-
def test_close_calls_ws(self):
with self.mocked_client() as client_tuple:
client, ws_mock = client_tuple
ws_mock().close.side_effect = lambda *args: client.on_closed()
- client.connect()
client.close()
ws_mock().close.assert_called()
# Check on_closed did not try to reconnect.
@@ -62,6 +61,12 @@ class EventClientTestCase(unittest.TestCase):
def test_run_forever_calls_ws(self):
with self.mocked_client() as client_tuple:
client, ws_mock = client_tuple
- client.connect()
client.run_forever()
ws_mock().run_forever.assert_called()
+
+ def test_reconnect_rebuilds_and_reconnects_underlying_client(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.on_closed()
+ self.assertEqual(2, ws_mock.call_count)
+ self.assertEqual(2, ws_mock().connect.call_count)
commit 67bc8ded4ab0d00ca464d360d93d1eed29844e6f
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 5 17:08:27 2016 -0400
9135: EventClient.run_forever() delegates to underlying client.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d88897f..8f7ae69 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -86,12 +86,6 @@ class EventClient(object):
self.is_closed = False
self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
-
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.subscribe(f, last_log_id)
@@ -129,6 +123,11 @@ class EventClient(object):
thread.interrupt_main()
return
+ def _delegate_to_ec(attr_name):
+ return property(lambda self: getattr(self.ec, attr_name))
+ for _method_name in ['connect', 'close_connection', 'run_forever']:
+ locals()[_method_name] = _delegate_to_ec(_method_name)
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 401a1be..81beba5 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -58,3 +58,10 @@ class EventClientTestCase(unittest.TestCase):
ws_mock().close.assert_called()
# Check on_closed did not try to reconnect.
ws_mock().connect.assert_called_once()
+
+ def test_run_forever_calls_ws(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.connect()
+ client.run_forever()
+ ws_mock().run_forever.assert_called()
commit f3ec4b1597a43e8f19dbf7c40e4c90fd4510cc9d
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 5 16:55:31 2016 -0400
9135: Add basic tests for EventClient.
They're not much, but they're cheap and they at least make sure
communication happens between classes correctly.
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
new file mode 100644
index 0000000..401a1be
--- /dev/null
+++ b/sdk/python/tests/test_events.py
@@ -0,0 +1,60 @@
+from __future__ import absolute_import, print_function
+
+import contextlib
+import unittest
+
+from . import arvados_testutil
+import arvados.events
+import mock
+
+TEST_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
+
+class EventClientTestCase(unittest.TestCase):
+ def setUp(self):
+ self.recv_events = []
+
+ def callback(self, event):
+ self.recv_events.append(event)
+
+ @contextlib.contextmanager
+ def mocked_client(self, filters=None, callback=None, last_log_id=None):
+ if filters is None:
+ filters = []
+ if callback is None:
+ callback = self.callback
+ with mock.patch('arvados.events._EventClient') as ws_mock:
+ yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
+ last_log_id), ws_mock
+
+ def test_subscribe_calls_ws(self):
+ ws_filter = ['kind', '=', 'arvados#test']
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.subscribe(ws_filter)
+ ws_mock().subscribe.assert_called_with(ws_filter, None)
+
+ def test_unsubscribe_calls_ws(self):
+ ws_filter = ['kind', '=', 'arvados#test']
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.subscribe(ws_filter)
+ client.unsubscribe(ws_filter)
+ ws_mock().unsubscribe.assert_called_with(ws_filter)
+
+ # PollClient doesn't have this method, but for now you have to call it
+ # for anything to work.
+ def test_connect_calls_ws(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ client.connect()
+ ws_mock().connect.assert_called()
+
+ def test_close_calls_ws(self):
+ with self.mocked_client() as client_tuple:
+ client, ws_mock = client_tuple
+ ws_mock().close.side_effect = lambda *args: client.on_closed()
+ client.connect()
+ client.close()
+ ws_mock().close.assert_called()
+ # Check on_closed did not try to reconnect.
+ ws_mock().connect.assert_called_once()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list