[ARVADOS] updated: 355b142ceed7f992d8469a79455767461bf41ace
Git user
git at public.curoverse.com
Wed May 11 18:19:07 EDT 2016
Summary of changes:
sdk/python/arvados/events.py | 42 +++++++++++++++++++++++++-----------
sdk/python/tests/test_events.py | 48 +++++++++++++++++++++++++++++++++++++----
2 files changed, 73 insertions(+), 17 deletions(-)
via 355b142ceed7f992d8469a79455767461bf41ace (commit)
via a5c86079a8d500a0826da173866c88e995ab23d9 (commit)
from 48a69b458cb781972a981b1b11430d50a3c3c759 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 355b142ceed7f992d8469a79455767461bf41ace
Author: Brett Smith <brett at curoverse.com>
Date: Wed May 11 16:41:51 2016 -0400
9135: EventClient.run_forever() delegates to successive _EventClients.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d72906c..88a7f98 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,17 +84,22 @@ class EventClient(object):
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
self.is_closed = False
+ self._event_client_setup_cond = threading.Condition(threading.RLock())
self._setup_event_client()
def _setup_event_client(self):
- self.ec = _EventClient(self.url, self.filters, self.on_event,
- self.last_log_id, self.on_closed)
- self.ec.daemon = True
- try:
- self.ec.connect()
- except Exception:
- self.ec.close_connection()
- raise
+ with self._event_client_setup_cond:
+ ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
+ ec.daemon = True
+ try:
+ ec.connect()
+ except Exception:
+ ec.close_connection()
+ raise
+ else:
+ self.ec = ec
+ self._event_client_setup_cond.notify_all()
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
@@ -118,7 +123,10 @@ class EventClient(object):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ with self._event_client_setup_cond:
+ if self.is_closed:
+ self._event_client_setup_cond.notify_all()
+ return
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
@@ -129,13 +137,21 @@ class EventClient(object):
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
self.is_closed = True
+ self._event_client_setup_cond.notify_all()
thread.interrupt_main()
return
- def _delegate_to_ec(attr_name):
- return property(lambda self: getattr(self.ec, attr_name))
- for _method_name in ['connect', 'close_connection', 'run_forever']:
- locals()[_method_name] = _delegate_to_ec(_method_name)
+ def run_forever(self):
+ last_ec = None
+ while not self.is_closed:
+ with self._event_client_setup_cond:
+ # Wait until a new _EventClient is ready after disconnecting.
+ if self.ec is last_ec:
+ self._event_client_setup_cond.wait()
+ if self.is_closed:
+ break
+ last_ec = self.ec
+ last_ec.run_forever()
class PollClient(threading.Thread):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 6733f1d..45d68ea 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -1,5 +1,6 @@
import arvados
import io
+import itertools
import logging
import mock
import Queue
@@ -176,7 +177,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
# close (im)properly
if close_unexpected:
- self.ws.close_connection()
+ self.ws.ec.close_connection()
else:
self.ws.close()
@@ -264,9 +265,41 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
def test_run_forever(self, websocket_client):
client = arvados.events.EventClient(
self.MOCK_WS_URL, [], lambda event: None, None)
+ websocket_client().run_forever.side_effect = lambda: client.close()
client.run_forever()
websocket_client().run_forever.assert_called_with()
+ @mock.patch('arvados.events._EventClient')
+ def test_run_forever_survives_reconnects(self, websocket_client):
+ run_forever_cond = threading.Condition()
+ run_forever_counter = itertools.count(1)
+ def _run_forever():
+ with run_forever_cond:
+ next(run_forever_counter)
+ run_forever_cond.notify_all()
+ def _build_client(*args, **kwargs):
+ new_client = mock.MagicMock(name="websocket_client_object")
+ new_client.run_forever.side_effect = _run_forever
+ return new_client
+ # Normal MagicMock behavior means
+ # `websocket_client() is websocket_client()`,
+ # but _EventClient calls need to return a new object every time.
+ websocket_client.side_effect = _build_client
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ with run_forever_cond:
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ run_forever_cond.wait()
+ # Simulate an unexpected disconnect.
+ client.on_closed()
+ run_forever_cond.wait()
+ client.close()
+ # Simulate _EventClient calling the callback after normal close.
+ client.on_closed()
+ forever_thread.join()
+ self.assertEqual(3, next(run_forever_counter))
+
class PollClientTestCase(unittest.TestCase):
class MockLogs(object):
commit a5c86079a8d500a0826da173866c88e995ab23d9
Author: Brett Smith <brett at curoverse.com>
Date: Wed May 11 16:09:36 2016 -0400
Fixup test commit: add test_subscribe_method, small patches.
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 0aed754..6733f1d 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -245,11 +245,18 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
rootLogger.removeHandler(streamHandler)
@mock.patch('arvados.events._EventClient')
+ def test_subscribe_method(self, websocket_client):
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ client.subscribe(filters[:], 99)
+ websocket_client().subscribe.assert_called_with(filters, 99)
+
+ @mock.patch('arvados.events._EventClient')
def test_unsubscribe(self, websocket_client):
- events = Queue.Queue(10)
filters = [['object_uuid', 'is_a', 'arvados#human']]
client = arvados.events.EventClient(
- self.MOCK_WS_URL, filters[:], events.put_nowait, None)
+ self.MOCK_WS_URL, filters[:], lambda event: None, None)
client.unsubscribe(filters[:])
websocket_client().unsubscribe.assert_called_with(filters)
@@ -292,7 +299,7 @@ class PollClientTestCase(unittest.TestCase):
def callback(self, event):
with self.callback_cond:
self.recv_events.append(event)
- self.callback_cond.notifyAll()
+ self.callback_cond.notify_all()
def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
if filters is None:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list