[ARVADOS] updated: 7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b
Git user
git at public.curoverse.com
Fri Apr 8 16:15:03 EDT 2016
Summary of changes:
sdk/python/arvados/events.py | 50 ++++++++++++++++++-------------------
sdk/python/tests/test_websockets.py | 2 +-
2 files changed, 25 insertions(+), 27 deletions(-)
via 7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b (commit)
from 7c607284a048225b86b210c8c5397ff8a983d820 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 7ddbf6b763f15bb84d7d897d373e37e85a8c3d6b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 8 16:14:59 2016 -0400
7658: Clean up & handle subscription filters consistently across EventClient,
_EventClient and PollClient.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 942fb76..79960c4 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -28,10 +28,7 @@ class _EventClient(WebSocketClient):
# ws4py's WebSocketBaseClient.__init__.
super(_EventClient, self).__init__(url, ssl_options=ssl_options)
- if filters:
- self.filters = filters
- else:
- self.filters = [[]]
+ self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
self._closing_lock = threading.RLock()
@@ -40,7 +37,8 @@ class _EventClient(WebSocketClient):
self.on_closed = on_closed
def opened(self):
- self.subscribe(self.filters, self.last_log_id)
+ for f in self.filters:
+ self.subscribe(f, self.last_log_id)
def closed(self, code, reason=None):
self._closed.set()
@@ -64,25 +62,27 @@ class _EventClient(WebSocketClient):
# wait for ws4py to tell us the connection is closed.
self._closed.wait(timeout=timeout)
- def subscribe(self, filters, last_log_id=None):
- m = {"method": "subscribe", "filters": filters}
+ def subscribe(self, f, last_log_id=None):
+ m = {"method": "subscribe", "filters": f}
if last_log_id is not None:
m["last_log_id"] = last_log_id
self.send(json.dumps(m))
- def unsubscribe(self, filters):
- self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+ def unsubscribe(self, f):
+ self.send(json.dumps({"method": "unsubscribe", "filters": f}))
class EventClient(object):
def __init__(self, url, filters, on_event_cb, last_log_id):
self.url = url
- self.filters = filters
+ if filters:
+ self.filters = [filters]
+ else:
+ self.filters = [[]]
self.on_event_cb = on_event_cb
self.last_log_id = last_log_id
self.is_closed = False
- self.subscriptions = {}
- self.ec = _EventClient(url, filters, self.on_event, last_log_id, self.on_closed)
+ self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
def connect(self):
self.ec.connect()
@@ -90,13 +90,13 @@ class EventClient(object):
def close_connection(self):
self.ec.close_connection()
- def subscribe(self, filters, last_log_id=None):
- self.subscriptions[str(filters)] = filters
- self.ec.subscribe(filters, last_log_id)
+ def subscribe(self, f, last_log_id=None):
+ self.filters.append(f)
+ self.ec.subscribe(f, last_log_id)
- def unsubscribe(self, filters):
- del self.subscriptions[str(filters)]
- self.ec.unsubscribe(filters)
+ def unsubscribe(self, f):
+ del self.filters[self.filters.index(f)]
+ self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
self.is_closed = True
@@ -114,11 +114,9 @@ class EventClient(object):
while True:
try:
self.ec.connect()
- for s in self.subscriptions:
- self.ec.subscribe(self.subscriptions[s], self.last_log_id)
break
- except:
- _logger.warn("Error during websocket reconnect. Will retry after 5s.")
+ except Exception as e:
+ _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e, exc_info=e)
time.sleep(5)
@@ -197,12 +195,12 @@ class PollClient(threading.Thread):
# to do so raises the same exception."
pass
- def subscribe(self, filters):
+ def subscribe(self, f):
self.on_event({'status': 200})
- self.filters.append(filters)
+ self.filters.append(f)
- def unsubscribe(self, filters):
- del self.filters[self.filters.index(filters)]
+ def unsubscribe(self, f):
+ del self.filters[self.filters.index(f)]
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index f3f82fe..1ed138c 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -221,6 +221,6 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
# verify log messages to ensure retry happened
log_messages = log_file.read()
- found = log_messages.find("Error during websocket reconnect. Will retry")
+ found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect. Will retry")
self.assertNotEqual(found, -1)
rootLogger.removeHandler(fileHandler)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list