[ARVADOS] created: aae3db0804ac50aaf3e6b731166d934258cbf671

Git user git at public.curoverse.com
Fri May 6 17:59:35 EDT 2016


        at  aae3db0804ac50aaf3e6b731166d934258cbf671 (commit)


commit aae3db0804ac50aaf3e6b731166d934258cbf671
Author: Brett Smith <brett at curoverse.com>
Date:   Fri May 6 17:47:09 2016 -0400

    9135: Fix how filters are passed in PollClient.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 39e1567..074abf3 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -186,7 +186,7 @@ class PollClient(threading.Thread):
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+                        items = self.api.logs().list(order="id asc", filters=[f, ["id", ">", str(self.id)]]).execute()
                         break
                     except errors.ApiError as error:
                         pass
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index ee0d0ce..9403add 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -139,7 +139,6 @@ class PollClientTestCase(unittest.TestCase):
         self.client.close(timeout=None)
         self.assertIn(test_log, self.recv_events)
 
-    @unittest.skip("method is buggy")
     def test_subscribe(self):
         client_filter = ['kind', '=', 'arvados#test']
         self.build_client()

commit 319f99b37bb82f455bf21edfdba3f3aa506c7751
Author: Brett Smith <brett at curoverse.com>
Date:   Fri May 6 17:52:01 2016 -0400

    9135: Make EventClient initialization more consistent.
    
    * DRY up the setup code.
    * Make the client a daemon thread, for consistency with PollClient.
    * Always try to close the connection on failure.  We do this in
      _subscribe_websocket, so why not do it on reconnect as well?

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 8f7ae69..39e1567 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,7 +84,17 @@ class EventClient(object):
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self._setup_event_client()
+
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -112,8 +122,7 @@ class EventClient(object):
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
@@ -249,18 +258,10 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
 
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 6ad8e05..ee0d0ce 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -29,6 +29,14 @@ class EventClientTestCase(unittest.TestCase):
             yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
                                              last_log_id), ws_mock
 
+    def test_client_init(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            ws_mock.assert_called_with(TEST_WS_URL, [[]], client.on_event, None,
+                                       client.on_closed)
+            ws_mock().connect.assert_called()
+            self.assertIs(ws_mock().daemon, True)
+
     def test_subscribe_calls_ws(self):
         ws_filter = ['kind', '=', 'arvados#test']
         with self.mocked_client() as client_tuple:
@@ -54,15 +62,20 @@ class EventClientTestCase(unittest.TestCase):
     def test_run_forever_calls_ws(self):
         with self.mocked_client() as client_tuple:
             client, ws_mock = client_tuple
-            client.connect()
             client.run_forever()
             ws_mock().run_forever.assert_called()
 
+    def test_reconnect_rebuilds_and_reconnects_underlying_client(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.on_closed()
+            self.assertEqual(2, ws_mock.call_count)
+            self.assertEqual(2, ws_mock().connect.call_count)
+
     def test_close_calls_ws(self):
         with self.mocked_client() as client_tuple:
             client, ws_mock = client_tuple
             ws_mock().close.side_effect = lambda *args: client.on_closed()
-            client.connect()
             client.close()
             ws_mock().close.assert_called()
             # Check on_closed did not try to reconnect.

commit 29bcac8896081c2eda515a79a339e82c48aebaaf
Author: Brett Smith <brett at curoverse.com>
Date:   Fri May 6 17:57:29 2016 -0400

    9135: EventClient.run_forever() delegates to underlying client.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d88897f..8f7ae69 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -86,12 +86,6 @@ class EventClient(object):
         self.is_closed = False
         self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
-
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
         self.ec.subscribe(f, last_log_id)
@@ -129,6 +123,11 @@ class EventClient(object):
                 thread.interrupt_main()
                 return
 
+    def _delegate_to_ec(attr_name):
+        return property(lambda self: getattr(self.ec, attr_name))
+    for _method_name in ['connect', 'close_connection', 'run_forever']:
+        locals()[_method_name] = _delegate_to_ec(_method_name)
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 3ec8bf2..6ad8e05 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -51,7 +51,6 @@ class EventClientTestCase(unittest.TestCase):
             client.connect()
             ws_mock().connect.assert_called()
 
-    @unittest.skip("method missing")
     def test_run_forever_calls_ws(self):
         with self.mocked_client() as client_tuple:
             client, ws_mock = client_tuple

commit bdd942dadfc6bb9530f99d9c9bdcf9a3be50ee3f
Author: Brett Smith <brett at curoverse.com>
Date:   Thu May 5 16:55:31 2016 -0400

    9135: Add basic tests for Python events listeners.
    
    These ensure that both classes have the core methods subscribe,
    unsubscribe, run_forever, and close.

diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
new file mode 100644
index 0000000..3ec8bf2
--- /dev/null
+++ b/sdk/python/tests/test_events.py
@@ -0,0 +1,159 @@
+from __future__ import absolute_import, print_function
+
+import contextlib
+import threading
+import unittest
+
+from . import arvados_testutil
+import arvados.events
+import mock
+
+TEST_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
+
+class EventClientTestCase(unittest.TestCase):
+    def setUp(self):
+        self.recv_events = []
+
+    def callback(self, event):
+        self.recv_events.append(event)
+
+    @contextlib.contextmanager
+    def mocked_client(self, filters=None, callback=None, last_log_id=None):
+        if filters is None:
+            filters = []
+        if callback is None:
+            callback = self.callback
+        # It would be nice to just patch WebSocketClient, but that's not
+        # sufficient to patch methods that _EventClient inherits from it.
+        with mock.patch('arvados.events._EventClient') as ws_mock:
+            yield arvados.events.EventClient(TEST_WS_URL, filters, callback,
+                                             last_log_id), ws_mock
+
+    def test_subscribe_calls_ws(self):
+        ws_filter = ['kind', '=', 'arvados#test']
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.subscribe(ws_filter)
+            ws_mock().subscribe.assert_called_with(ws_filter, None)
+
+    def test_unsubscribe_calls_ws(self):
+        ws_filter = ['kind', '=', 'arvados#test']
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.subscribe(ws_filter[:])
+            client.unsubscribe(ws_filter[:])
+            ws_mock().unsubscribe.assert_called_with(ws_filter)
+
+    # PollClient doesn't have this method, but we do for historical reasons.
+    def test_connect_calls_ws(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.connect()
+            ws_mock().connect.assert_called()
+
+    @unittest.skip("method missing")
+    def test_run_forever_calls_ws(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            client.connect()
+            client.run_forever()
+            ws_mock().run_forever.assert_called()
+
+    def test_close_calls_ws(self):
+        with self.mocked_client() as client_tuple:
+            client, ws_mock = client_tuple
+            ws_mock().close.side_effect = lambda *args: client.on_closed()
+            client.connect()
+            client.close()
+            ws_mock().close.assert_called()
+            # Check on_closed did not try to reconnect.
+            ws_mock().connect.assert_called_once()
+
+
+class PollClientTestCase(unittest.TestCase):
+    class MockLogs(object):
+        def __init__(self):
+            self.logs = []
+            self.lock = threading.Lock()
+
+        def add(self, log):
+            with self.lock:
+                self.logs.append(log)
+
+        def return_list(self, num_retries=None):
+            with self.lock:
+                retval = self.logs
+                self.logs = []
+            return {'items': retval, 'items_available': len(retval)}
+
+
+    def setUp(self):
+        self.logs = self.MockLogs()
+        self.arv = mock.MagicMock(name='arvados.api()')
+        self.arv.logs().list().execute.side_effect = self.logs.return_list
+        self.callback_cond = threading.Condition()
+        self.recv_events = []
+
+    def tearDown(self):
+        if hasattr(self, 'client'):
+            self.client.close(timeout=None)
+
+    def callback(self, event):
+        with self.callback_cond:
+            self.recv_events.append(event)
+            self.callback_cond.notifyAll()
+
+    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+        if filters is None:
+            filters = []
+        if callback is None:
+            callback = self.callback
+        self.client = arvados.events.PollClient(
+            self.arv, filters, callback, poll_time, last_log_id)
+
+    def was_filter_used(self, target):
+        return any(target in call[-1].get('filters', [])
+                   for call in self.arv.logs().list.call_args_list)
+
+    def test_callback(self):
+        test_log = {'id': 12345, 'testkey': 'testtext'}
+        self.logs.add({'id': 123})
+        self.build_client(poll_time=.01)
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+            self.logs.add(test_log.copy())
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertIn(test_log, self.recv_events)
+
+    @unittest.skip("method is buggy")
+    def test_subscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe(client_filter[:])
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertTrue(self.was_filter_used(client_filter))
+
+    def test_unsubscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe(client_filter[:])
+        self.client.unsubscribe(client_filter[:])
+        self.client.start()
+        self.client.close(timeout=None)
+        self.assertFalse(self.was_filter_used(client_filter))
+
+    def test_run_forever(self):
+        self.build_client()
+        with self.callback_cond:
+            self.client.start()
+            forever_thread = threading.Thread(target=self.client.run_forever)
+            forever_thread.start()
+            self.callback_cond.wait()
+        self.assertTrue(forever_thread.is_alive())
+        self.client.close()
+        forever_thread.join()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list