[ARVADOS] updated: db2aa9fbecde5e972b7c8d340c171f8b5646be97
Git user
git at public.curoverse.com
Wed May 11 18:57:56 EDT 2016
Summary of changes:
sdk/python/arvados/events.py | 46 +++++++++++++---------------------------
sdk/python/tests/test_events.py | 47 +++++++++++++----------------------------
2 files changed, 30 insertions(+), 63 deletions(-)
discards 355b142ceed7f992d8469a79455767461bf41ace (commit)
via db2aa9fbecde5e972b7c8d340c171f8b5646be97 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (355b142ceed7f992d8469a79455767461bf41ace)
            \
             N -- N -- N (db2aa9fbecde5e972b7c8d340c171f8b5646be97)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit db2aa9fbecde5e972b7c8d340c171f8b5646be97
Author: Brett Smith <brett at curoverse.com>
Date: Wed May 11 18:40:31 2016 -0400
9135: Restore EventClient.run_forever().
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d72906c..81a9b36 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -83,7 +83,7 @@ class EventClient(object):
self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
+ self.is_closed = threading.Event()
self._setup_event_client()
def _setup_event_client(self):
@@ -105,7 +105,7 @@ class EventClient(object):
self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
@@ -118,7 +118,7 @@ class EventClient(object):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ if not self.is_closed.is_set():
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
@@ -128,14 +128,14 @@ class EventClient(object):
_logger.warn("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
- self.is_closed = True
+ self.is_closed.set()
thread.interrupt_main()
return
- def _delegate_to_ec(attr_name):
- return property(lambda self: getattr(self.ec, attr_name))
- for _method_name in ['connect', 'close_connection', 'run_forever']:
- locals()[_method_name] = _delegate_to_ec(_method_name)
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
class PollClient(threading.Thread):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 6733f1d..f2cdba2 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -176,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
# close (im)properly
if close_unexpected:
- self.ws.close_connection()
+ self.ws.ec.close_connection()
else:
self.ws.close()
@@ -261,11 +261,27 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
websocket_client().unsubscribe.assert_called_with(filters)
@mock.patch('arvados.events._EventClient')
- def test_run_forever(self, websocket_client):
+ def test_run_forever_survives_reconnects(self, websocket_client):
+ connection_cond = threading.Condition()
+ def ws_connect():
+ with connection_cond:
+ connection_cond.notify_all()
+ websocket_client().connect.side_effect = ws_connect
client = arvados.events.EventClient(
self.MOCK_WS_URL, [], lambda event: None, None)
- client.run_forever()
- websocket_client().run_forever.assert_called_with()
+ with connection_cond:
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ # Simulate an unexpected disconnect, and wait for reconnect.
+ close_thread = threading.Thread(target=client.on_closed)
+ close_thread.start()
+ connection_cond.wait()
+ close_thread.join()
+ run_forever_alive = forever_thread.is_alive()
+ client.close()
+ forever_thread.join()
+ self.assertTrue(run_forever_alive)
+ self.assertEqual(2, websocket_client().connect.call_count)
class PollClientTestCase(unittest.TestCase):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list