[ARVADOS] created: 7b2cd50f499a1f9c5b21ab6aa641f14701402885
Git user
git at public.curoverse.com
Mon Apr 4 15:42:49 EDT 2016
at 7b2cd50f499a1f9c5b21ab6aa641f14701402885 (commit)
commit 7b2cd50f499a1f9c5b21ab6aa641f14701402885
Author: radhika <radhika at curoverse.com>
Date: Mon Apr 4 15:41:56 2016 -0400
7658: add reconnect logic when a websocket is closed unexpectedly.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index df824a3..fcc3feb 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -14,8 +14,8 @@ from ws4py.client.threadedclient import WebSocketClient
_logger = logging.getLogger('arvados.events')
-class EventClient(WebSocketClient):
- def __init__(self, url, filters, on_event, last_log_id):
+class _EventClient(WebSocketClient):
+ def __init__(self, url, filters, on_event, last_log_id, on_closed=None):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
ssl_options['cert_reqs'] = ssl.CERT_NONE
@@ -26,19 +26,21 @@ class EventClient(WebSocketClient):
# IPv4 addresses (common with "localhost"), only one of them
# will be attempted -- and it might not be the right one. See
# ws4py's WebSocketBaseClient.__init__.
- super(EventClient, self).__init__(url, ssl_options=ssl_options)
+ super(_EventClient, self).__init__(url, ssl_options=ssl_options)
self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
self._closing_lock = threading.RLock()
self._closing = False
self._closed = threading.Event()
+ self.on_closed = on_closed
def opened(self):
self.subscribe(self.filters, self.last_log_id)
def closed(self, code, reason=None):
    self._closed.set()
+    # on_closed defaults to None in __init__, so only notify when a
+    # callback was actually supplied.
+    if self.on_closed is not None:
+        self.on_closed()
def received_message(self, m):
with self._closing_lock:
@@ -51,7 +53,7 @@ class EventClient(WebSocketClient):
:timeout: is the number of seconds to wait for ws4py to
indicate that the connection has closed.
"""
- super(EventClient, self).close(code, reason)
+ super(_EventClient, self).close(code, reason)
with self._closing_lock:
# make sure we don't process any more messages.
self._closing = True
@@ -68,6 +70,49 @@ class EventClient(WebSocketClient):
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+class EventClient(object):
+    """Wrapper around _EventClient that reconnects after an unexpected close.
+
+    Keeps track of the filters subscribed through this object and of
+    the 'id' of the most recent message seen by on_event(), so that a
+    replacement _EventClient can be created when the current one
+    reports that it closed without close() having been called here.
+    """
+
+    def __init__(self, url, filters, on_event_cb, last_log_id):
+        """Create the underlying _EventClient.
+
+        :url: passed through to _EventClient.
+        :filters: the initial filters to subscribe to.
+        :on_event_cb: callable invoked with every message given to
+          on_event().
+        :last_log_id: passed through to _EventClient; replaced by the
+          'id' of later messages as they arrive.
+        """
+        self.url = url
+        self.filters = filters
+        self.on_event_cb = on_event_cb
+        self.last_log_id = last_log_id
+        # Set by close() so on_closed() can tell a requested close
+        # from an unexpected one.
+        self.is_closed = False
+        # Active subscriptions, keyed by str(filters).
+        self.subscriptions = {str(filters): filters}
+        self.ec = _EventClient(url, filters, self.on_event,
+                               last_log_id, self.on_closed)
+
+    def connect(self):
+        """Connect the underlying _EventClient."""
+        self.ec.connect()
+
+    def close_connection(self):
+        """Call close_connection() on the underlying _EventClient.
+
+        Unlike close(), this does not mark the client as closed, so
+        on_closed() will attempt to reconnect.
+        """
+        self.ec.close_connection()
+
+    def subscribe(self, filters, last_log_id=None):
+        """Remember filters and subscribe to them on the current connection."""
+        self.subscriptions[str(filters)] = filters
+        self.ec.subscribe(filters, last_log_id)
+
+    def unsubscribe(self, filters):
+        """Forget filters and unsubscribe from them on the current connection."""
+        # pop() rather than del: unsubscribing from filters that are
+        # not tracked here should not raise KeyError.
+        self.subscriptions.pop(str(filters), None)
+        self.ec.unsubscribe(filters)
+
+    def close(self, code=1000, reason='', timeout=0):
+        """Close the connection for good; no reconnect will be attempted.
+
+        :code:, :reason: and :timeout: are passed through to
+        _EventClient.close().
+        """
+        # Set the flag first so the resulting on_closed() callback
+        # does not reconnect.
+        self.is_closed = True
+        self.ec.close(code, reason, timeout)
+
+    def on_event(self, m):
+        """Record the message's 'id', if it has one, then call on_event_cb."""
+        event_id = m.get('id')
+        if event_id is not None:
+            self.last_log_id = event_id
+        self.on_event_cb(m)
+
+    def on_closed(self):
+        """Reconnect unless close() was called on this object.
+
+        Passed to _EventClient as its on_closed callback. A new
+        _EventClient is created starting from last_log_id, and every
+        tracked subscription is restored on it.
+        """
+        if self.is_closed:
+            return
+        # Restore every tracked subscription, not only the filters
+        # given to __init__: one is handed to the new _EventClient and
+        # the rest are re-sent once it is connected.
+        pending = list(self.subscriptions.values())
+        primary = pending.pop(0) if pending else self.filters
+        self.ec = _EventClient(self.url, primary, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.connect()
+        for filters in pending:
+            self.ec.subscribe(filters, self.last_log_id)
+
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
super(PollClient, self).__init__()
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 37b644a..2049f56 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -120,3 +120,56 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
def isotz(self, offset):
"""Convert minutes-east-of-UTC to ISO8601 time zone designator"""
return '{:+03d}{:02d}'.format(offset/60, offset%60)
+
+ # Test websocket reconnection on unexpected close
+    def _test_websocket_reconnect(self, close_unexpected):
+        # Shared body for the reconnect tests. close_unexpected selects
+        # between close_connection() (events should keep arriving) and
+        # close() (no further events should arrive).
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        filters = [
+            ['object_uuid', 'is_a', 'arvados#human'],
+            ['created_at', '>=', self.localiso(self.TIME_PAST)],
+        ]
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+        self.assertEqual(200, events.get(True, 5)['status'])
+
+        # The first object's event must arrive, and nothing after it.
+        human = arvados.api('v1').humans().create(body={}).execute()
+        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
+        with self.assertRaises(Queue.Empty):
+            self.assertEqual(events.get(True, 2), None)
+
+        # The second object is created before the connection is closed.
+        human2 = arvados.api('v1').humans().create(body={}).execute()
+
+        if close_unexpected:
+            # Improper close: two more messages are expected, then silence.
+            self.ws.close_connection()
+            received = [events.get(True, 5) for _ in range(2)]
+            seen_uuids = [msg['object_uuid'] for msg in received
+                          if msg.get('object_uuid') is not None]
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+            self.assertNotIn(human['uuid'], seen_uuids)
+            self.assertIn(human2['uuid'], seen_uuids)
+        else:
+            # Proper close: no further events are expected.
+            self.ws.close()
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+
+    def test_websocket_reconnect_on_unexpected_close(self):
+        # Closing via close_connection() should still deliver later events.
+        self._test_websocket_reconnect(close_unexpected=True)
+
+    def test_websocket_no_reconnect_on_close_by_user(self):
+        # Closing via close() should deliver no further events.
+        self._test_websocket_reconnect(close_unexpected=False)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list