[ARVADOS] created: 6215b2e1e6373ae2b1f21aaf566c2b31d52a2ff7
git at public.curoverse.com
git at public.curoverse.com
Mon Feb 1 15:57:36 EST 2016
at 6215b2e1e6373ae2b1f21aaf566c2b31d52a2ff7 (commit)
commit 6215b2e1e6373ae2b1f21aaf566c2b31d52a2ff7
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Feb 1 01:58:34 2016 -0500
8288: Add timeout option to close() method of event clients.
Previously in EventClient, close() didn't wait for anything. Now, if a
timeout is given, it waits up to that many seconds for ws4py to call the
closed() callback to indicate the connection has closed.
Previously in PollClient, close() waited indefinitely for the polling
thread to terminate. This can take a very long time if, for example,
there are multiple subscriptions and the "get logs" API transaction is
slow.
The only apparent reason a caller would want to wait here at all is to
guarantee the simplifying assumption that the on_event() callback is never
called after close(). Now, instead of letting the thread run until
all events are received and handled, PollClient achieves this the same
way EventClient does: ignore events that arrive after close().
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 3132da3..94b8a9d 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -13,6 +13,7 @@ from ws4py.client.threadedclient import WebSocketClient
_logger = logging.getLogger('arvados.events')
+
class EventClient(WebSocketClient):
def __init__(self, url, filters, on_event, last_log_id):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
@@ -29,23 +30,33 @@ class EventClient(WebSocketClient):
self.filters = filters
self.on_event = on_event
self.last_log_id = last_log_id
- self._closed_lock = threading.RLock()
- self._closed = False
+ self._closing_lock = threading.RLock()
+ self._closing = False
+ self._closed = threading.Event()
def opened(self):
self.subscribe(self.filters, self.last_log_id)
+ def closed(self, code, reason=None):
+ self._closed.set()
+
def received_message(self, m):
- with self._closed_lock:
- if not self._closed:
+ with self._closing_lock:
+ if not self._closing:
self.on_event(json.loads(str(m)))
- def close(self, code=1000, reason=''):
- """Close event client and wait for it to finish."""
+ def close(self, code=1000, reason='', timeout=0):
+ """Close event client and optionally wait for it to finish.
+
+ :timeout: is the number of seconds to wait for ws4py to
+ indicate that the connection has closed.
+ """
super(EventClient, self).close(code, reason)
- with self._closed_lock:
+ with self._closing_lock:
# make sure we don't process any more messages.
- self._closed = True
+ self._closing = True
+ # wait for ws4py to tell us the connection is closed.
+ self._closed.wait(timeout=timeout)
def subscribe(self, filters, last_log_id=None):
m = {"method": "subscribe", "filters": filters}
@@ -56,6 +67,7 @@ class EventClient(WebSocketClient):
def unsubscribe(self, filters):
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time, last_log_id):
super(PollClient, self).__init__()
@@ -67,8 +79,9 @@ class PollClient(threading.Thread):
self.on_event = on_event
self.poll_time = poll_time
self.daemon = True
- self.stop = threading.Event()
self.last_log_id = last_log_id
+ self._closing = threading.Event()
+ self._closing_lock = threading.RLock()
def run(self):
self.id = 0
@@ -83,7 +96,7 @@ class PollClient(threading.Thread):
self.on_event({'status': 200})
- while not self.stop.isSet():
+ while not self._closing.is_set():
max_id = self.id
moreitems = False
for f in self.filters:
@@ -91,24 +104,38 @@ class PollClient(threading.Thread):
for i in items["items"]:
if i['id'] > max_id:
max_id = i['id']
- self.on_event(i)
+ with self._closing_lock:
+ if self._closing.is_set():
+ return
+ self.on_event(i)
if items["items_available"] > len(items["items"]):
moreitems = True
self.id = max_id
if not moreitems:
- self.stop.wait(self.poll_time)
+ self._closing.wait(self.poll_time)
def run_forever(self):
# Have to poll here, otherwise KeyboardInterrupt will never get processed.
- while not self.stop.is_set():
- self.stop.wait(1)
+ while not self._closing.is_set():
+ self._closing.wait(1)
+
+ def close(self, code=None, reason=None, timeout=0):
+ """Close poll client and optionally wait for it to finish.
+
+ If an :on_event: handler is running in a different thread,
+ first wait (indefinitely) for it to return.
+
+ After closing, wait up to :timeout: seconds for the thread to
+ finish the poll request in progress (if any).
- def close(self):
- """Close poll client and wait for it to finish."""
+ :code: and :reason: are ignored. They are present for
+ interface compatibility with EventClient.
+ """
- self.stop.set()
+ with self._closing_lock:
+ self._closing.set()
try:
- self.join()
+ self.join(timeout=timeout)
except RuntimeError:
# "join() raises a RuntimeError if an attempt is made to join the
# current thread as that would cause a deadlock. It is also an
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list