[ARVADOS] updated: 7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b

Git user git at public.curoverse.com
Fri Apr 8 16:15:03 EDT 2016


Summary of changes:
 sdk/python/arvados/events.py        | 50 ++++++++++++++++++-------------------
 sdk/python/tests/test_websockets.py |  2 +-
 2 files changed, 25 insertions(+), 27 deletions(-)

       via  7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b (commit)
      from  7c607284a048225b86b210c8c5397ff8a983d820 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 8 16:14:59 2016 -0400

    7658: Clean up & handle subscription filters consistently across EventClient,
    _EventClient and PollClient.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 942fb76..79960c4 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -28,10 +28,7 @@ class _EventClient(WebSocketClient):
         # ws4py's WebSocketBaseClient.__init__.
         super(_EventClient, self).__init__(url, ssl_options=ssl_options)
 
-        if filters:
-            self.filters = filters
-        else:
-            self.filters = [[]]
+        self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
         self._closing_lock = threading.RLock()
@@ -40,7 +37,8 @@ class _EventClient(WebSocketClient):
         self.on_closed = on_closed
 
     def opened(self):
-        self.subscribe(self.filters, self.last_log_id)
+        for f in self.filters:
+            self.subscribe(f, self.last_log_id)
 
     def closed(self, code, reason=None):
         self._closed.set()
@@ -64,25 +62,27 @@ class _EventClient(WebSocketClient):
         # wait for ws4py to tell us the connection is closed.
         self._closed.wait(timeout=timeout)
 
-    def subscribe(self, filters, last_log_id=None):
-        m = {"method": "subscribe", "filters": filters}
+    def subscribe(self, f, last_log_id=None):
+        m = {"method": "subscribe", "filters": f}
         if last_log_id is not None:
             m["last_log_id"] = last_log_id
         self.send(json.dumps(m))
 
-    def unsubscribe(self, filters):
-        self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+    def unsubscribe(self, f):
+        self.send(json.dumps({"method": "unsubscribe", "filters": f}))
 
 
 class EventClient(object):
     def __init__(self, url, filters, on_event_cb, last_log_id):
         self.url = url
-        self.filters = filters
+        if filters:
+            self.filters = [filters]
+        else:
+            self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
-        self.subscriptions = {}
-        self.ec = _EventClient(url, filters, self.on_event, last_log_id, self.on_closed)
+        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
 
     def connect(self):
         self.ec.connect()
@@ -90,13 +90,13 @@ class EventClient(object):
     def close_connection(self):
         self.ec.close_connection()
 
-    def subscribe(self, filters, last_log_id=None):
-        self.subscriptions[str(filters)] = filters
-        self.ec.subscribe(filters, last_log_id)
+    def subscribe(self, f, last_log_id=None):
+        self.filters.append(f)
+        self.ec.subscribe(f, last_log_id)
 
-    def unsubscribe(self, filters):
-        del self.subscriptions[str(filters)]
-        self.ec.unsubscribe(filters)
+    def unsubscribe(self, f):
+        del self.filters[self.filters.index(f)]
+        self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
         self.is_closed = True
@@ -114,11 +114,9 @@ class EventClient(object):
             while True:
               try:
                   self.ec.connect()
-                  for s in self.subscriptions:
-                      self.ec.subscribe(self.subscriptions[s], self.last_log_id)
                   break
-              except:
-                  _logger.warn("Error during websocket reconnect. Will retry after 5s.")
+              except Exception as e:
+                  _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=e)
                   time.sleep(5)
 
 
@@ -197,12 +195,12 @@ class PollClient(threading.Thread):
             # to do so raises the same exception."
             pass
 
-    def subscribe(self, filters):
+    def subscribe(self, f):
         self.on_event({'status': 200})
-        self.filters.append(filters)
+        self.filters.append(f)
 
-    def unsubscribe(self, filters):
-        del self.filters[self.filters.index(filters)]
+    def unsubscribe(self, f):
+        del self.filters[self.filters.index(f)]
 
 
 def _subscribe_websocket(api, filters, on_event, last_log_id=None):
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index f3f82fe..1ed138c 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -221,6 +221,6 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # verify log messages to ensure retry happened
         log_messages = log_file.read()
-        found = log_messages.find("Error during websocket reconnect. Will retry")
+        found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
         self.assertNotEqual(found, -1)
         rootLogger.removeHandler(fileHandler)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list