[ARVADOS] created: 7b2cd50f499a1f9c5b21ab6aa641f14701402885

Git user git at public.curoverse.com
Mon Apr 4 15:42:49 EDT 2016


        at  7b2cd50f499a1f9c5b21ab6aa641f14701402885 (commit)


commit 7b2cd50f499a1f9c5b21ab6aa641f14701402885
Author: radhika <radhika at curoverse.com>
Date:   Mon Apr 4 15:41:56 2016 -0400

    7658: add reconnect logic when a websocket is closed unexpectedly.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index df824a3..fcc3feb 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -14,8 +14,8 @@ from ws4py.client.threadedclient import WebSocketClient
 _logger = logging.getLogger('arvados.events')
 
 
-class EventClient(WebSocketClient):
-    def __init__(self, url, filters, on_event, last_log_id):
+class _EventClient(WebSocketClient):
+    def __init__(self, url, filters, on_event, last_log_id, on_closed=None):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
             ssl_options['cert_reqs'] = ssl.CERT_NONE
@@ -26,19 +26,21 @@ class EventClient(WebSocketClient):
         # IPv4 addresses (common with "localhost"), only one of them
         # will be attempted -- and it might not be the right one. See
         # ws4py's WebSocketBaseClient.__init__.
-        super(EventClient, self).__init__(url, ssl_options=ssl_options)
+        super(_EventClient, self).__init__(url, ssl_options=ssl_options)
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
         self._closing_lock = threading.RLock()
         self._closing = False
         self._closed = threading.Event()
+        self.on_closed = on_closed
 
     def opened(self):
         self.subscribe(self.filters, self.last_log_id)
 
     def closed(self, code, reason=None):
+        """Mark the connection as closed and notify the owner, if any.
+
+        :code: close status reported for the connection (unused here).
+        :reason: optional close reason (unused here).
+        """
         self._closed.set()
+        # on_closed defaults to None in __init__, so only call it when a
+        # callback was actually supplied.
+        if self.on_closed is not None:
+            self.on_closed()
 
     def received_message(self, m):
         with self._closing_lock:
@@ -51,7 +53,7 @@ class EventClient(WebSocketClient):
         :timeout: is the number of seconds to wait for ws4py to
         indicate that the connection has closed.
         """
-        super(EventClient, self).close(code, reason)
+        super(_EventClient, self).close(code, reason)
         with self._closing_lock:
             # make sure we don't process any more messages.
             self._closing = True
@@ -68,6 +70,49 @@ class EventClient(WebSocketClient):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
 
+class EventClient(object):
+    """Websocket event client that reconnects after an unexpected close.
+
+    Wraps an _EventClient and remembers the URL, the active subscription
+    filters and the id of the last event seen, so that a replacement
+    connection can be built when the current one closes without close()
+    having been called on this object.
+    """
+
+    def __init__(self, url, filters, on_event_cb, last_log_id):
+        """Build the underlying client; call connect() to open it.
+
+        :url: websocket endpoint, passed through to _EventClient.
+        :filters: filters for the initial subscription.
+        :on_event_cb: callable invoked with every received event.
+        :last_log_id: log id passed along with the initial subscription.
+        """
+        self.url = url
+        self.filters = filters
+        self.on_event_cb = on_event_cb
+        self.last_log_id = last_log_id
+        self.is_closed = False
+        # Active subscriptions, keyed by str(filters) because the filter
+        # lists themselves are not hashable.
+        self.subscriptions = {str(filters): filters}
+        self.ec = _EventClient(url, filters, self.on_event, last_log_id, self.on_closed)
+
+    def connect(self):
+        """Open the underlying websocket connection."""
+        self.ec.connect()
+
+    def close_connection(self):
+        """Drop the underlying connection without marking this client closed.
+
+        is_closed stays False, so a subsequent on_closed() will reconnect.
+        """
+        self.ec.close_connection()
+
+    def subscribe(self, filters, last_log_id=None):
+        """Add a subscription and remember it for future reconnects."""
+        self.subscriptions[str(filters)] = filters
+        self.ec.subscribe(filters, last_log_id)
+
+    def unsubscribe(self, filters):
+        """Remove a subscription so it is not restored on reconnect.
+
+        Filters that are not currently tracked are still forwarded to the
+        server instead of raising KeyError.
+        """
+        self.subscriptions.pop(str(filters), None)
+        self.ec.unsubscribe(filters)
+
+    def close(self, code=1000, reason='', timeout=0):
+        """Close on purpose: flag it first so on_closed() does not reconnect."""
+        self.is_closed = True
+        self.ec.close(code, reason, timeout)
+
+    def on_event(self, m):
+        """Track the id of the latest event, then hand it to the user callback."""
+        event_id = m.get('id')
+        if event_id is not None:
+            self.last_log_id = event_id
+        self.on_event_cb(m)
+
+    def on_closed(self):
+        """Reconnect and restore subscriptions unless close() was requested."""
+        if self.is_closed:
+            return
+        active = list(self.subscriptions.values())
+        # With nothing left subscribed, fall back to the initial filters,
+        # which is what every reconnect used before.
+        first = active[0] if active else self.filters
+        self.ec = _EventClient(self.url, first, self.on_event, self.last_log_id, self.on_closed)
+        self.ec.connect()
+        # _EventClient only subscribes to the filters it was constructed
+        # with, so the remaining subscriptions are re-sent explicitly.
+        for extra in active[1:]:
+            self.ec.subscribe(extra, self.last_log_id)
+
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 37b644a..2049f56 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -120,3 +120,56 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def isotz(self, offset):
         """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
         return '{:+03d}{:02d}'.format(offset/60, offset%60)
+
+    # Test websocket reconnection on unexpected close
+    def _test_websocket_reconnect(self, close_unexpected):
+        """Subscribe, close the websocket, and check which events still arrive.
+
+        :close_unexpected: when true, the connection is dropped with
+        close_connection() and the event for the object created just
+        before the close must still be delivered; when false, close() is
+        called and no further events may arrive.
+        """
+        run_test_server.authorize_with('active')
+        events = Queue.Queue(100)
+
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        filters.append(['created_at', '>=', self.localiso(self.TIME_PAST)])
+        self.ws = arvados.events.subscribe(
+            arvados.api('v1'), filters,
+            events.put_nowait,
+            poll_fallback=False,
+            last_log_id=None)
+        self.assertIsInstance(self.ws, arvados.events.EventClient)
+        self.assertEqual(200, events.get(True, 5)['status'])
+
+        # create obj
+        human = arvados.api('v1').humans().create(body={}).execute()
+
+        # expect an event, and nothing after it
+        self.assertIn(human['uuid'], events.get(True, 5)['object_uuid'])
+        with self.assertRaises(Queue.Empty):
+            self.assertEqual(events.get(True, 2), None)
+
+        # create one more obj
+        human2 = arvados.api('v1').humans().create(body={}).execute()
+
+        if not close_unexpected:
+            # closed properly by the user: no more events may show up
+            self.ws.close()
+            with self.assertRaises(Queue.Empty):
+                self.assertEqual(events.get(True, 2), None)
+            return
+
+        # closed improperly: read the next two messages and keep the
+        # object_uuid of each one that carries it
+        self.ws.close_connection()
+        received = [events.get(True, 5) for _ in range(2)]
+        log_object_uuids = [event['object_uuid'] for event in received
+                            if event.get('object_uuid') is not None]
+        with self.assertRaises(Queue.Empty):
+            self.assertEqual(events.get(True, 2), None)
+        self.assertNotIn(human['uuid'], log_object_uuids)
+        self.assertIn(human2['uuid'], log_object_uuids)
+
+    def test_websocket_reconnect_on_unexpected_close(self):
+        """Run the reconnect scenario with the connection dropped via close_connection()."""
+        self._test_websocket_reconnect(True)
+
+    def test_websocket_no_reconnect_on_close_by_user(self):
+        """Run the scenario with a regular close() and expect no further events."""
+        self._test_websocket_reconnect(False)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list