[ARVADOS] updated: 48a69b458cb781972a981b1b11430d50a3c3c759

Git user git at public.curoverse.com
Mon May 9 13:19:51 EDT 2016


Summary of changes:
 sdk/python/arvados/events.py        |   4 +-
 sdk/python/tests/test_events.py     | 333 +++++++++++++++++++++++++++---------
 sdk/python/tests/test_websockets.py | 247 --------------------------
 3 files changed, 259 insertions(+), 325 deletions(-)
 delete mode 100644 sdk/python/tests/test_websockets.py

  discards  aae3db0804ac50aaf3e6b731166d934258cbf671 (commit)
  discards  319f99b37bb82f455bf21edfdba3f3aa506c7751 (commit)
  discards  29bcac8896081c2eda515a79a339e82c48aebaaf (commit)
  discards  bdd942dadfc6bb9530f99d9c9bdcf9a3be50ee3f (commit)
       via  48a69b458cb781972a981b1b11430d50a3c3c759 (commit)
       via  4a22f4f44fd95bc3b622ce8a5c01d86f81750619 (commit)
       via  14fd99a4dd62774143e38d4bb79d4732e9bb293d (commit)
       via  14a4652df680db98cfc8c40ed2b981a1fb7dd238 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (aae3db0804ac50aaf3e6b731166d934258cbf671)
            \
             N -- N -- N (48a69b458cb781972a981b1b11430d50a3c3c759)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 48a69b458cb781972a981b1b11430d50a3c3c759
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:57:42 2016 -0400

    9135: Make EventClient initialization more consistent.
    
    * DRY up the setup code.  This includes always trying to close the
      connection after failure, since we were doing that in the initial
      connection.
    * Make the client a daemon thread, for consistency with PollClient.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 8f7ae69..d72906c 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -84,7 +84,17 @@ class EventClient(object):
         self.on_event_cb = on_event_cb
         self.last_log_id = last_log_id
         self.is_closed = False
-        self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
+        self._setup_event_client()
+
+    def _setup_event_client(self):
+        self.ec = _EventClient(self.url, self.filters, self.on_event,
+                               self.last_log_id, self.on_closed)
+        self.ec.daemon = True
+        try:
+            self.ec.connect()
+        except Exception:
+            self.ec.close_connection()
+            raise
 
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
@@ -112,8 +122,7 @@ class EventClient(object):
             _logger.warn("Unexpected close. Reconnecting.")
             for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                 try:
-                    self.ec = _EventClient(self.url, self.filters, self.on_event, self.last_log_id, self.on_closed)
-                    self.ec.connect()
+                    self._setup_event_client()
                     break
                 except Exception as e:
                     _logger.warn("Error '%s' during websocket reconnect.", e)
@@ -249,20 +258,14 @@ def _subscribe_websocket(api, filters, on_event, last_log_id=None):
     if not endpoint:
         raise errors.FeatureNotEnabledError(
             "Server does not advertise a websocket endpoint")
+    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
     try:
-        uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
         client = EventClient(uri_with_token, filters, on_event, last_log_id)
-        ok = False
-        try:
-            client.connect()
-            ok = True
-            return client
-        finally:
-            if not ok:
-                client.close_connection()
-    except:
+    except Exception:
         _logger.warn("Failed to connect to websockets on %s" % endpoint)
         raise
+    else:
+        return client
 
 
 def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):

commit 4a22f4f44fd95bc3b622ce8a5c01d86f81750619
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:54:23 2016 -0400

    9135: EventClient.run_forever() delegates to the underlying client.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index d88897f..8f7ae69 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -86,12 +86,6 @@ class EventClient(object):
         self.is_closed = False
         self.ec = _EventClient(url, self.filters, self.on_event, last_log_id, self.on_closed)
 
-    def connect(self):
-        self.ec.connect()
-
-    def close_connection(self):
-        self.ec.close_connection()
-
     def subscribe(self, f, last_log_id=None):
         self.filters.append(f)
         self.ec.subscribe(f, last_log_id)
@@ -129,6 +123,11 @@ class EventClient(object):
                 thread.interrupt_main()
                 return
 
+    def _delegate_to_ec(attr_name):
+        return property(lambda self: getattr(self.ec, attr_name))
+    for _method_name in ['connect', 'close_connection', 'run_forever']:
+        locals()[_method_name] = _delegate_to_ec(_method_name)
+
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 1ed9aff..0aed754 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -253,7 +253,6 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         client.unsubscribe(filters[:])
         websocket_client().unsubscribe.assert_called_with(filters)
 
-    @unittest.expectedFailure
     @mock.patch('arvados.events._EventClient')
     def test_run_forever(self, websocket_client):
         client = arvados.events.EventClient(

commit 14fd99a4dd62774143e38d4bb79d4732e9bb293d
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:40:10 2016 -0400

    9135: Clean imports in test_events.

diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index db361dd..1ed9aff 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -1,14 +1,9 @@
 import arvados
-import arvados.events
-import arvados.errors
-from datetime import datetime, timedelta, tzinfo
+import io
 import logging
-import logging.handlers
 import mock
 import Queue
 import run_test_server
-import StringIO
-import tempfile
 import threading
 import time
 import unittest
@@ -156,7 +151,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         run_test_server.authorize_with('active')
         events = Queue.Queue(100)
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)
@@ -224,7 +219,7 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     def test_websocket_reconnect_retry(self, event_client_connect):
         event_client_connect.side_effect = [None, Exception('EventClient.connect error'), None]
 
-        logstream = StringIO.StringIO()
+        logstream = io.BytesIO()
         rootLogger = logging.getLogger()
         streamHandler = logging.StreamHandler(logstream)
         rootLogger.addHandler(streamHandler)

commit 14a4652df680db98cfc8c40ed2b981a1fb7dd238
Author: Brett Smith <brett at curoverse.com>
Date:   Mon May 9 12:18:28 2016 -0400

    9135: Add basic tests for Python events listeners.
    
    These ensure that both classes have the core methods subscribe,
    unsubscribe, run_forever, and close.
    
    Rename the test file to test_events, to better match other test
    patterns, and account for the fact it tests both classes in the
    module.

diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_events.py
similarity index 71%
rename from sdk/python/tests/test_websockets.py
rename to sdk/python/tests/test_events.py
index d122a1c..db361dd 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_events.py
@@ -13,11 +13,14 @@ import threading
 import time
 import unittest
 
+import arvados_testutil
+
 class WebsocketTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
 
     TIME_PAST = time.time()-3600
     TIME_FUTURE = time.time()+3600
+    MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
 
     def setUp(self):
         self.ws = None
@@ -245,3 +248,108 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         found = log_messages.find("Error 'EventClient.connect error' during websocket reconnect.")
         self.assertNotEqual(found, -1)
         rootLogger.removeHandler(streamHandler)
+
+    @mock.patch('arvados.events._EventClient')
+    def test_unsubscribe(self, websocket_client):
+        events = Queue.Queue(10)
+        filters = [['object_uuid', 'is_a', 'arvados#human']]
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, filters[:], events.put_nowait, None)
+        client.unsubscribe(filters[:])
+        websocket_client().unsubscribe.assert_called_with(filters)
+
+    @unittest.expectedFailure
+    @mock.patch('arvados.events._EventClient')
+    def test_run_forever(self, websocket_client):
+        client = arvados.events.EventClient(
+            self.MOCK_WS_URL, [], lambda event: None, None)
+        client.run_forever()
+        websocket_client().run_forever.assert_called_with()
+
+
+class PollClientTestCase(unittest.TestCase):
+    class MockLogs(object):
+        def __init__(self):
+            self.logs = []
+            self.lock = threading.Lock()
+
+        def add(self, log):
+            with self.lock:
+                self.logs.append(log)
+
+        def return_list(self, num_retries=None):
+            with self.lock:
+                retval = self.logs
+                self.logs = []
+            return {'items': retval, 'items_available': len(retval)}
+
+
+    def setUp(self):
+        self.logs = self.MockLogs()
+        self.arv = mock.MagicMock(name='arvados.api()')
+        self.arv.logs().list().execute.side_effect = self.logs.return_list
+        self.callback_cond = threading.Condition()
+        self.recv_events = []
+
+    def tearDown(self):
+        if hasattr(self, 'client'):
+            self.client.close(timeout=None)
+
+    def callback(self, event):
+        with self.callback_cond:
+            self.recv_events.append(event)
+            self.callback_cond.notifyAll()
+
+    def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
+        if filters is None:
+            filters = []
+        if callback is None:
+            callback = self.callback
+        self.client = arvados.events.PollClient(
+            self.arv, filters, callback, poll_time, last_log_id)
+
+    def was_filter_used(self, target):
+        return any(target in call[-1].get('filters', [])
+                   for call in self.arv.logs().list.call_args_list)
+
+    def test_callback(self):
+        test_log = {'id': 12345, 'testkey': 'testtext'}
+        self.logs.add({'id': 123})
+        self.build_client(poll_time=.01)
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+            self.logs.add(test_log.copy())
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertIn(test_log, self.recv_events)
+
+    def test_subscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        with self.callback_cond:
+            self.client.start()
+            self.callback_cond.wait()
+        self.client.close(timeout=None)
+        self.assertTrue(self.was_filter_used(client_filter))
+
+    def test_unsubscribe(self):
+        client_filter = ['kind', '=', 'arvados#test']
+        self.build_client()
+        self.client.subscribe([client_filter[:]])
+        self.client.unsubscribe([client_filter[:]])
+        self.client.start()
+        self.client.close(timeout=None)
+        self.assertFalse(self.was_filter_used(client_filter))
+
+    def test_run_forever(self):
+        self.build_client()
+        with self.callback_cond:
+            self.client.start()
+            forever_thread = threading.Thread(target=self.client.run_forever)
+            forever_thread.start()
+            self.callback_cond.wait()
+        self.assertTrue(forever_thread.is_alive())
+        self.client.close()
+        forever_thread.join()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list