[ARVADOS] updated: 355b142ceed7f992d8469a79455767461bf41ace

Git user git at public.curoverse.com
Wed May 11 18:19:07 EDT 2016


Summary of changes:
 sdk/python/arvados/events.py    | 42 +++++++++++++++++++++++++-----------
 sdk/python/tests/test_events.py | 48 +++++++++++++++++++++++++++++++++++++----
 2 files changed, 73 insertions(+), 17 deletions(-)

       via  355b142ceed7f992d8469a79455767461bf41ace (commit)
       via  a5c86079a8d500a0826da173866c88e995ab23d9 (commit)
      from  48a69b458cb781972a981b1b11430d50a3c3c759 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 355b142ceed7f992d8469a79455767461bf41ace
Author: Brett Smith <brett at curoverse.com>
Date:   Wed May 11 16:41:51 2016 -0400

    9135: EventClient.run_forever() delegates to successive _EventClients.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d72906c..88a7f98 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,17 +84,22 @@ class EventClient(object):
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
+        self._event_client_setup_cond = threading.Condition(threading.RLock())
         self._setup_event_client()
 
     def _setup_event_client(self):
-        self.ec = _EventClient(self.url, self.filters, self.on_event,
-                               self.last_log_id, self.on_closed)
-        self.ec.daemon = True
-        try:
-            self.ec.connect()
-        except Exception:
-            self.ec.close_connection()
-            raise
+        with self._event_client_setup_cond:
+            ec = _EventClient(self.url, self.filters, self.on_event,
+                              self.last_log_id, self.on_closed)
+            ec.daemon = True
+            try:
+                ec.connect()
+            except Exception:
+                ec.close_connection()
+                raise
+            else:
+                self.ec = ec
+                self._event_client_setup_cond.notify_all()
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -118,7 +123,10 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        with self._event_client_setup_cond:
+            if self.is_closed:
+                self._event_client_setup_cond.notify_all()
+                return
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
@@ -129,13 +137,21 @@ class EventClient(object):
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
                 self.is_closed = True
+                self._event_client_setup_cond.notify_all()
                 thread.interrupt_main()
                 return
 
-    def _delegate_to_ec(attr_name):
-        return property(lambda self: getattr(self.ec, attr_name))
-    for _method_name in ['connect', 'close_connection', 'run_forever']:
-        locals()[_method_name] = _delegate_to_ec(_method_name)
+    def run_forever(self):
+        last_ec = None
+        while not self.is_closed:
+            with self._event_client_setup_cond:
+                # Wait until a new _EventClient is ready after disconnecting.
+                if self.ec is last_ec:
+                    self._event_client_setup_cond.wait()
+                    if self.is_closed:
+                        break
+                last_ec = self.ec
+            last_ec.run_forever()
 
 
 class PollClient(threading.Thread):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 6733f1d..45d68ea 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -1,5 +1,6 @@
 import arvados
 import io
+import itertools
 import logging
 import mock
 import Queue
@@ -176,7 +177,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.close_connection()
+            self.ws.ec.close_connection()
         else:
             self.ws.close()
 
@@ -264,9 +265,41 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def test_run_forever(self, websocket_client):
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
+        websocket_client().run_forever.side_effect = lambda: client.close()
         client.run_forever()
         websocket_client().run_forever.assert_called_with()
 
+    @mock.patch('arvados.events._EventClient')
+    def test_run_forever_survives_reconnects(self, websocket_client):
+        run_forever_cond = threading.Condition()
+        run_forever_counter = itertools.count(1)
+        def _run_forever():
+            with run_forever_cond:
+                next(run_forever_counter)
+                run_forever_cond.notify_all()
+        def _build_client(*args, **kwargs):
+            new_client = mock.MagicMock(name="websocket_client_object")
+            new_client.run_forever.side_effect = _run_forever
+            return new_client
+        # Normal MagicMock behavior means
+        # `websocket_client() is websocket_client()`,
+        # but _EventClient calls need to return a new object every time.
+        websocket_client.side_effect = _build_client
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        with run_forever_cond:
+            forever_thread = threading.Thread(target=client.run_forever)
+            forever_thread.start()
+            run_forever_cond.wait()
+            # Simulate an unexpected disconnect.
+            client.on_closed()
+            run_forever_cond.wait()
+            client.close()
+            # Simulate _EventClient calling the callback after normal close.
+            client.on_closed()
+        forever_thread.join()
+        self.assertEqual(3, next(run_forever_counter))
+
 
 class PollClientTestCase(unittest.TestCase):
     class MockLogs(object):

commit a5c86079a8d500a0826da173866c88e995ab23d9
Author: Brett Smith <brett at curoverse.com>
Date:   Wed May 11 16:09:36 2016 -0400

    Fixup test commit: add test_subscribe_method, small patches.

diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 0aed754..6733f1d 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -245,11 +245,18 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         rootLogger.removeHandler(streamHandler)
 
     @mock.patch('arvados.events._EventClient')
+    def test_subscribe_method(self, websocket_client):
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        client.subscribe(filters[:], 99)
+        websocket_client().subscribe.assert_called_with(filters, 99)
+
+    @mock.patch('arvados.events._EventClient')
     def test_unsubscribe(self, websocket_client):
-        events = Queue.Queue(10)
         filters = [['object_uuid', 'is_a', 'arvados#human']]
         client = arvados.events.EventClient(
-            self.MOCK_WS_URL, filters[:], events.put_nowait, None)
+            self.MOCK_WS_URL, filters[:], lambda event: None, None)
         client.unsubscribe(filters[:])
         websocket_client().unsubscribe.assert_called_with(filters)
 
@@ -292,7 +299,7 @@ class PollClientTestCase(unittest.TestCase):
     def callback(self, event):
         with self.callback_cond:
             self.recv_events.append(event)
-            self.callback_cond.notifyAll()
+            self.callback_cond.notify_all()
 
     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
         if filters is None:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list