[ARVADOS] updated: 7f7ed63314bec77bf93edc233514c50a1030cac4

Git user git at public.curoverse.com
Fri May 13 10:56:00 EDT 2016


Summary of changes:
 sdk/python/arvados/events.py                       |  46 +++----
 .../tests/{test_websockets.py => test_events.py}   | 143 +++++++++++++++++++--
 2 files changed, 158 insertions(+), 31 deletions(-)
 rename sdk/python/tests/{test_websockets.py => test_events.py} (65%)

       via  7f7ed63314bec77bf93edc233514c50a1030cac4 (commit)
       via  839a409f51114316c0683fbd00d8174fc24b4a9f (commit)
       via  3dd2a1957ae4106bfc2bd5405662c47c087eb79c (commit)
       via  ec38d4ae1a6597e1f35d5cd0b25f6384666c55be (commit)
       via  c60995ff0471ce31f0858d1f19a6788014112438 (commit)
      from  a7f30c913114a5e22d5e37b3fba67332f56ce6d9 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7f7ed63314bec77bf93edc233514c50a1030cac4
Merge: a7f30c9 839a409
Author: Brett Smith <brett at curoverse.com>
Date:   Fri May 13 10:55:31 2016 -0400

    Merge branch '9135-eventclient-run-forever-wip'
    
    Closes #9135, #9157.


commit 839a409f51114316c0683fbd00d8174fc24b4a9f
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:54:23 2016 -0400

    9135: Bring EventClient's public interface closer to PollClient's.
    
    * Restore the run_forever method, which was previously inherited from
      WebSocketClient.
    * Remove the connect and close_connection methods, which are
      WebSocketClient implementation details that don't make sense as part
      of the public interface.  (A running EventClient will just reconnect
      if you call close_connection on it.)

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 68cfa21..81a9b36 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -83,7 +83,7 @@ class EventClient(object):
             self.filters = [[]]
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
-        self.is_closed = False
+        self.is_closed = threading.Event()
         self._setup_event_client()
 
     def _setup_event_client(self):
@@ -96,12 +96,6 @@ class EventClient(object):
             self.ec.close_connection()
             raise
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
-
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
         self.ec.subscribe(f, last_log_id)
@@ -111,7 +105,7 @@ class EventClient(object):
         self.ec.unsubscribe(f)
 
     def close(self, code=1000, reason='', timeout=0):
-        self.is_closed = True
+        self.is_closed.set()
         self.ec.close(code, reason, timeout)
 
     def on_event(self, m):
@@ -124,7 +118,7 @@ class EventClient(object):
             thread.interrupt_main()
 
     def on_closed(self):
-        if self.is_closed == False:
+        if not self.is_closed.is_set():
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
@@ -134,10 +128,15 @@ class EventClient(object):
                     _logger.warn("Error '%s' during websocket reconnect.", e)
             if tries_left == 0:
                 _logger.exception("EventClient thread could not contact websocket server.")
-                self.is_closed = True
+                self.is_closed.set()
                 thread.interrupt_main()
                 return
 
+    def run_forever(self):
+        # Have to poll here to let KeyboardInterrupt get raised.
+        while not self.is_closed.wait(1):
+            pass
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 7b69fa2..f2cdba2 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -176,7 +176,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         # close (im)properly
         if close_unexpected:
-            self.ws.close_connection()
+            self.ws.ec.close_connection()
         else:
             self.ws.close()
 
@@ -260,13 +260,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         client.unsubscribe(filters[:])
         websocket_client().unsubscribe.assert_called_with(filters)
 
-    @unittest.expectedFailure
     @mock.patch('arvados.events._EventClient')
-    def test_run_forever(self, websocket_client):
+    def test_run_forever_survives_reconnects(self, websocket_client):
+        connection_cond = threading.Condition()
+        def ws_connect():
+            with connection_cond:
+                connection_cond.notify_all()
+        websocket_client().connect.side_effect = ws_connect
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
-        client.run_forever()
-        websocket_client().run_forever.assert_called_with()
+        with connection_cond:
+            forever_thread = threading.Thread(target=client.run_forever)
+            forever_thread.start()
+            # Simulate an unexpected disconnect, and wait for reconnect.
+            close_thread = threading.Thread(target=client.on_closed)
+            close_thread.start()
+            connection_cond.wait()
+        close_thread.join()
+        run_forever_alive = forever_thread.is_alive()
+        client.close()
+        forever_thread.join()
+        self.assertTrue(run_forever_alive)
+        self.assertEqual(2, websocket_client().connect.call_count)
 
 
 class PollClientTestCase(unittest.TestCase):

commit 3dd2a1957ae4106bfc2bd5405662c47c087eb79c
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:57:42 2016 -0400

    9135: Make EventClient initialization more consistent.
    
    * DRY up the setup code.  This includes always trying to close the
      conenction after failure, since we were doing that in the initial
      connection.
    * Make the client a daemon thread, for consistency with PollClient.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d88897f..68cfa21 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,7 +84,17 @@ class EventClient(object):
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self._setup_event_client()
+
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def connect(self):
         self.ec.connect()
@@ -118,8 +128,7 @@ class EventClient(object):
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
@@ -250,20 +259,14 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
+    else:
+        return client
 
 
 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):

commit ec38d4ae1a6597e1f35d5cd0b25f6384666c55be
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:40:10 2016 -0400

    9135: Clean imports in test_events.

diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index c637efe..7b69fa2 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -1,14 +1,9 @@
 import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
 import logging
-import logging.handlers
 import mock
 import Queue
 import run_test_server
-import StringIO
-import tempfile
 import threading
 import time
 import unittest
@@ -156,7 +151,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with('active')
         events = Queue.Queue(100)
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -224,7 +219,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def test_websocket_reconnect_retry(self, event_client_connect):
         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)

commit c60995ff0471ce31f0858d1f19a6788014112438
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:18:28 2016 -0400

    9135: Add basic tests for Python events listeners.
    
    These ensure that both classes have the core methods subscribe,
    unsubscribe, run_forever, and close.
    
    Rename the test file to test_events, to better match other test
    patterns, and account for the fact it tests both classes in the
    module.

diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_events.py
similarity index 69%
rename from sdk/python/tests/test_websockets.py
rename to sdk/python/tests/test_events.py
index d122a1c..c637efe 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_events.py
@@ -13,11 +13,14 @@ import threading
 import time
 import unittest
 
+import arvados_testutil
+
 class WebsocketTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
 
     TIME_PAST = time.time()-3600
     TIME_FUTURE = time.time()+3600
+    MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
 
     def setUp(self):
         self.ws = None
@@ -245,3 +248,115 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
         self.assertNotEqual(found, -1)
         rootLogger.removeHandler(streamHandler)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_subscribe_method(self, websocket_client):
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        client.subscribe(filters[:], 99)
+        websocket_client().subscribe.assert_called_with(filters, 99)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_unsubscribe(self, websocket_client):
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, filters[:], lambda event: None, None)
+        client.unsubscribe(filters[:])
+        websocket_client().unsubscribe.assert_called_with(filters)
+
+    @unittest.expectedFailure
+    @mock.patch('arvados.events._EventClient')
+    def test_run_forever(self, websocket_client):
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        client.run_forever()
+        websocket_client().run_forever.assert_called_with()
+
+
+class PollClientTestCase(unittest.TestCase):
+    class MockLogs(object):
+        def __init__(self):
+            self.logs = []
+            self.lock = threading.Lock()
+
+        def add(self, log):
+            with self.lock:
+                self.logs.append(log)
+
+        def return_list(self, num_retries=None):
+            with self.lock:
+                retval = self.logs
+                self.logs = []
+            return {'items': retval, 'items_available': len(retval)}
+
+
+    def setUp(self):
+        self.logs = self.MockLogs()
+        self.arv = mock.MagicMock(name='arvados.api()')
+        self.arv.logs().list().execute.side_effect = self.logs.return_list
+        self.callback_cond = threading.Condition()
+        self.recv_events = []
+
+    def tearDown(self):
+        if hasattr(self, 'client'):
+            self.client.close(timeout=None)
+
+    def callback(self, event):
+        with self.callback_cond:
+            self.recv_events.append(event)
+            self.callback_cond.notify_all()
+
+    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+        if filters is None:
+            filters = []
+        if callback is None:
+            callback = self.callback
+        self.client = arvados.events.PollClient(
+            self.arv, filters, callback, poll_time, last_log_id)
+
+    def was_filter_used(self, target):
+        return any(target in call[-1].get('filters', [])
+                   for call in self.arv.logs().list.call_args_list)
+
+    def test_callback(self):
+        test_log = {'id': 12345, 'testkey': 'testtext'}
+        self.logs.add({'id': 123})
+        self.build_client(poll_time=.01)
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+            self.logs.add(test_log.copy())
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertIn(test_log, self.recv_events)
+
+    def test_subscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertTrue(self.was_filter_used(client_filter))
+
+    def test_unsubscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        self.client.unsubscribe([client_filter[:]])
+        self.client.start()
+        self.client.close(timeout=None)
+        self.assertFalse(self.was_filter_used(client_filter))
+
+    def test_run_forever(self):
+        self.build_client()
+        with self.callback_cond:
+            self.client.start()
+            forever_thread = threading.Thread(target=self.client.run_forever)
+            forever_thread.start()
+            self.callback_cond.wait()
+        self.assertTrue(forever_thread.is_alive())
+        self.client.close()
+        forever_thread.join()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list