[ARVADOS] updated: 7f7ed63314bec77bf93edc233514c50a1030cac4
Git user
git at public.curoverse.com
Fri May 13 10:56:00 EDT 2016
Summary of changes:
sdk/python/arvados/events.py | 46 +++----
.../tests/{test_websockets.py => test_events.py} | 143 +++++++++++++++++++--
2 files changed, 158 insertions(+), 31 deletions(-)
rename sdk/python/tests/{test_websockets.py => test_events.py} (65%)
via 7f7ed63314bec77bf93edc233514c50a1030cac4 (commit)
via 839a409f51114316c0683fbd00d8174fc24b4a9f (commit)
via 3dd2a1957ae4106bfc2bd5405662c47c087eb79c (commit)
via ec38d4ae1a6597e1f35d5cd0b25f6384666c55be (commit)
via c60995ff0471ce31f0858d1f19a6788014112438 (commit)
from a7f30c913114a5e22d5e37b3fba67332f56ce6d9 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 7f7ed63314bec77bf93edc233514c50a1030cac4
Merge: a7f30c9 839a409
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 13 10:55:31 2016 -0400
Merge branch '9135-eventclient-run-forever-wip'
Closes #9135, #9157.
commit 839a409f51114316c0683fbd00d8174fc24b4a9f
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 9 12:54:23 2016 -0400
9135: Bring EventClient's public interface closer to PollClient's.
* Restore the run_forever method, which was previously inherited from
WebSocketClient.
* Remove the connect and close_connection methods, which are
WebSocketClient implementation details that don't make sense as part
of the public interface. (A running EventClient will just reconnect
if you call close_connection on it.)
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 68cfa21..81a9b36 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -83,7 +83,7 @@ class EventClient(object):
self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
- self.is_closed = False
+ self.is_closed = threading.Event()
self._setup_event_client()
def _setup_event_client(self):
@@ -96,12 +96,6 @@ class EventClient(object):
self.ec.close_connection()
raise
- def connect(self):
- self.ec.connect()
-
- def close_connection(self):
- self.ec.close_connection()
-
def subscribe(self, f, last_log_id=None):
self.filters.append(f)
self.ec.subscribe(f, last_log_id)
@@ -111,7 +105,7 @@ class EventClient(object):
self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
- self.is_closed = True
+ self.is_closed.set()
self.ec.close(code, reason, timeout)
def on_event(self, m):
@@ -124,7 +118,7 @@ class EventClient(object):
thread.interrupt_main()
def on_closed(self):
- if self.is_closed == False:
+ if not self.is_closed.is_set():
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
@@ -134,10 +128,15 @@ class EventClient(object):
_logger.warn("Error '%s' during websocket reconnect.", e)
if tries_left == 0:
_logger.exception("EventClient thread could not contact websocket server.")
- self.is_closed = True
+ self.is_closed.set()
thread.interrupt_main()
return
+ def run_forever(self):
+ # Have to poll here to let KeyboardInterrupt get raised.
+ while not self.is_closed.wait(1):
+ pass
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 7b69fa2..f2cdba2 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -176,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
# close (im)properly
if close_unexpected:
- self.ws.close_connection()
+ self.ws.ec.close_connection()
else:
self.ws.close()
@@ -260,13 +260,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
client.unsubscribe(filters[:])
websocket_client().unsubscribe.assert_called_with(filters)
- @unittest.expectedFailure
@mock.patch('arvados.events._EventClient')
- def test_run_forever(self, websocket_client):
+ def test_run_forever_survives_reconnects(self, websocket_client):
+ connection_cond = threading.Condition()
+ def ws_connect():
+ with connection_cond:
+ connection_cond.notify_all()
+ websocket_client().connect.side_effect = ws_connect
client = arvados.events.EventClient(
self.MOCK_WS_URL, [], lambda event: None, None)
- client.run_forever()
- websocket_client().run_forever.assert_called_with()
+ with connection_cond:
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ # Simulate an unexpected disconnect, and wait for reconnect.
+ close_thread = threading.Thread(target=client.on_closed)
+ close_thread.start()
+ connection_cond.wait()
+ close_thread.join()
+ run_forever_alive = forever_thread.is_alive()
+ client.close()
+ forever_thread.join()
+ self.assertTrue(run_forever_alive)
+ self.assertEqual(2, websocket_client().connect.call_count)
class PollClientTestCase(unittest.TestCase):
commit 3dd2a1957ae4106bfc2bd5405662c47c087eb79c
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 9 12:57:42 2016 -0400
9135: Make EventClient initialization more consistent.
* DRY up the setup code. This includes always trying to close the
conenction after failure, since we were doing that in the initial
connection.
* Make the client a daemon thread, for consistency with PollClient.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d88897f..68cfa21 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,7 +84,17 @@ class EventClient(object):
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
self.is_closed = False
- self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+ self._setup_event_client()
+
+ def _setup_event_client(self):
+ self.ec = _EventClient(self.url, self.filters, self.on_event,
+ self.last_log_id, self.on_closed)
+ self.ec.daemon = True
+ try:
+ self.ec.connect()
+ except Exception:
+ self.ec.close_connection()
+ raise
def connect(self):
self.ec.connect()
@@ -118,8 +128,7 @@ class EventClient(object):
_logger.warn("Unexpected close. Reconnecting.")
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
try:
- self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
- self.ec.connect()
+ self._setup_event_client()
break
except Exception as e:
_logger.warn("Error '%s' during websocket reconnect.", e)
@@ -250,20 +259,14 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
+ uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
try:
- uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
client = EventClient(uri_with_token, filters, on_event, last_log_id)
- ok = False
- try:
- client.connect()
- ok = True
- return client
- finally:
- if not ok:
- client.close_connection()
- except:
+ except Exception:
_logger.warn("Failed to connect to websockets on %s" % endpoint)
raise
+ else:
+ return client
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
commit ec38d4ae1a6597e1f35d5cd0b25f6384666c55be
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 9 12:40:10 2016 -0400
9135: Clean imports in test_events.
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index c637efe..7b69fa2 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -1,14 +1,9 @@
import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
import logging
-import logging.handlers
import mock
import Queue
import run_test_server
-import StringIO
-import tempfile
import threading
import time
import unittest
@@ -156,7 +151,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
run_test_server.authorize_with('active')
events = Queue.Queue(100)
- logstream = StringIO.StringIO()
+ logstream = io.BytesIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
@@ -224,7 +219,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
def test_websocket_reconnect_retry(self, event_client_connect):
event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
- logstream = StringIO.StringIO()
+ logstream = io.BytesIO()
rootLogger = logging.getLogger()
streamHandler = logging.StreamHandler(logstream)
rootLogger.addHandler(streamHandler)
commit c60995ff0471ce31f0858d1f19a6788014112438
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 9 12:18:28 2016 -0400
9135: Add basic tests for Python events listeners.
These ensure that both classes have the core methods subscribe,
unsubscribe, run_forever, and close.
Rename the test file to test_events, to better match other test
patterns, and account for the fact it tests both classes in the
module.
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_events.py
similarity index 69%
rename from sdk/python/tests/test_websockets.py
rename to sdk/python/tests/test_events.py
index d122a1c..c637efe 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_events.py
@@ -13,11 +13,14 @@ import threading
import time
import unittest
+import arvados_testutil
+
class WebsocketTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
TIME_PAST = time.time()-3600
TIME_FUTURE = time.time()+3600
+ MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
def setUp(self):
self.ws = None
@@ -245,3 +248,115 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
self.assertNotEqual(found, -1)
rootLogger.removeHandler(streamHandler)
+
+ @mock.patch('arvados.events._EventClient')
+ def test_subscribe_method(self, websocket_client):
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ client.subscribe(filters[:], 99)
+ websocket_client().subscribe.assert_called_with(filters, 99)
+
+ @mock.patch('arvados.events._EventClient')
+ def test_unsubscribe(self, websocket_client):
+ filters = [['object_uuid', 'is_a', 'arvados#human']]
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, filters[:], lambda event: None, None)
+ client.unsubscribe(filters[:])
+ websocket_client().unsubscribe.assert_called_with(filters)
+
+ @unittest.expectedFailure
+ @mock.patch('arvados.events._EventClient')
+ def test_run_forever(self, websocket_client):
+ client = arvados.events.EventClient(
+ self.MOCK_WS_URL, [], lambda event: None, None)
+ client.run_forever()
+ websocket_client().run_forever.assert_called_with()
+
+
+class PollClientTestCase(unittest.TestCase):
+ class MockLogs(object):
+ def __init__(self):
+ self.logs = []
+ self.lock = threading.Lock()
+
+ def add(self, log):
+ with self.lock:
+ self.logs.append(log)
+
+ def return_list(self, num_retries=None):
+ with self.lock:
+ retval = self.logs
+ self.logs = []
+ return {'items': retval, 'items_available': len(retval)}
+
+
+ def setUp(self):
+ self.logs = self.MockLogs()
+ self.arv = mock.MagicMock(name='arvados.api()')
+ self.arv.logs().list().execute.side_effect = self.logs.return_list
+ self.callback_cond = threading.Condition()
+ self.recv_events = []
+
+ def tearDown(self):
+ if hasattr(self, 'client'):
+ self.client.close(timeout=None)
+
+ def callback(self, event):
+ with self.callback_cond:
+ self.recv_events.append(event)
+ self.callback_cond.notify_all()
+
+ def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+ if filters is None:
+ filters = []
+ if callback is None:
+ callback = self.callback
+ self.client = arvados.events.PollClient(
+ self.arv, filters, callback, poll_time, last_log_id)
+
+ def was_filter_used(self, target):
+ return any(target in call[-1].get('filters', [])
+ for call in self.arv.logs().list.call_args_list)
+
+ def test_callback(self):
+ test_log = {'id': 12345, 'testkey': 'testtext'}
+ self.logs.add({'id': 123})
+ self.build_client(poll_time=.01)
+ with self.callback_cond:
+ self.client.start()
+ self.callback_cond.wait()
+ self.logs.add(test_log.copy())
+ self.callback_cond.wait()
+ self.client.close(timeout=None)
+ self.assertIn(test_log, self.recv_events)
+
+ def test_subscribe(self):
+ client_filter = ['kind', '=', 'arvados#test']
+ self.build_client()
+ self.client.subscribe([client_filter[:]])
+ with self.callback_cond:
+ self.client.start()
+ self.callback_cond.wait()
+ self.client.close(timeout=None)
+ self.assertTrue(self.was_filter_used(client_filter))
+
+ def test_unsubscribe(self):
+ client_filter = ['kind', '=', 'arvados#test']
+ self.build_client()
+ self.client.subscribe([client_filter[:]])
+ self.client.unsubscribe([client_filter[:]])
+ self.client.start()
+ self.client.close(timeout=None)
+ self.assertFalse(self.was_filter_used(client_filter))
+
+ def test_run_forever(self):
+ self.build_client()
+ with self.callback_cond:
+ self.client.start()
+ forever_thread = threading.Thread(target=self.client.run_forever)
+ forever_thread.start()
+ self.callback_cond.wait()
+ self.assertTrue(forever_thread.is_alive())
+ self.client.close()
+ forever_thread.join()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list