[ARVADOS] created: c12d9a869449778a616c116408c4798563c16360

git at public.curoverse.com git at public.curoverse.com
Thu Oct 16 15:03:54 EDT 2014


        at  c12d9a869449778a616c116408c4798563c16360 (commit)


commit c12d9a869449778a616c116408c4798563c16360
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 16 14:59:18 2014 -0400

    3609: Back out setting api_version, api_host, and api_insecure on the api client
    and instead document that callers of arvados.events.subscribe() should pass in a
    freshly created api client.  Add event variable to indicate when subscription
    is a success instead of sleeping.  Removed unused "import time".  Fixed "web
    sockets" -> "websockets".

diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index a8a74da..cb716f1 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -154,10 +154,7 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, **kwarg
     kwargs['http'] = credentials.authorize(kwargs['http'])
 
     svc = apiclient.discovery.build('arvados', version, **kwargs)
-    svc.api_version = version
-    svc.api_host = host
     svc.api_token = token
-    svc.api_insecure = insecure
     kwargs['http'].cache = None
     if cache:
         conncache[connprofile] = svc
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index d399eb5..674daad 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -5,7 +5,6 @@ import logging
 import argparse
 import arvados
 import json
-import time
 from arvados.events import subscribe
 import signal
 
@@ -78,7 +77,7 @@ def main(arguments=None):
             print json.dumps(ev)
 
     try:
-        ws = subscribe(api, filters, on_message, poll_fallback=args.poll_interval)
+        ws = subscribe(arvados.api('v1', cache=False), filters, on_message, poll_fallback=args.poll_interval)
         if ws:
             if args.pipeline:
                 c = api.pipeline_instances().get(uuid=args.pipeline).execute()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 4570cc0..b18d3b0 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -67,7 +67,7 @@ class PollClient(threading.Thread):
         self.on_event({'status': 200})
 
         while not self.stop.isSet():
-            max_id = 0
+            max_id = self.id
             for f in self.filters:
                 items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()['items']
                 for i in items:
@@ -76,6 +76,8 @@ class PollClient(threading.Thread):
                     self.on_event(i)
             self.id = max_id
             self.stop.wait(self.poll_time)
+    except Exception as e:
+        _logger.exception(e)
 
     def close(self):
         self.stop.set()
@@ -88,7 +90,14 @@ class PollClient(threading.Thread):
     def unsubscribe(self, filters):
         del self.filters[self.filters.index(filters)]
 
+
 def subscribe(api, filters, on_event, poll_fallback=15):
+    '''
+    api: Must be a newly created client from arvados.api(cache=False), not shared with the caller, as it may be used by a background thread.
+    filters: Initial subscription filters.
+    on_event: The callback when a message is received
+    poll_fallback: If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
+    '''
     ws = None
     if 'websocketUrl' in api._rootDesc:
         try:
@@ -97,12 +106,11 @@ def subscribe(api, filters, on_event, poll_fallback=15):
             ws.connect()
             return ws
         except Exception as e:
-            _logger.warn("Got exception %s trying to connect to web sockets at %s" % (e, api._rootDesc['websocketUrl']))
+            _logger.warn("Got exception %s trying to connect to websockets at %s" % (e, api._rootDesc['websocketUrl']))
             if ws:
                 ws.close_connection()
     if poll_fallback:
         _logger.warn("Websockets not available, falling back to log table polling")
-        api = arvados.api(version=api.api_version, cache=False, host=api.api_host, token=api.api_token, insecure=api.api_insecure)
         p = PollClient(api, filters, on_event, poll_fallback)
         p.start()
         return p
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index fe065e1..f6074ad 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -5,13 +5,12 @@ import arvados.events
 import time
 import threading
 
-class WebsocketTest(run_test_server.TestCaseWithServers):
-    MAIN_SERVER = {'websockets': True}
-
+class EventTestBase(object):
     def on_event(self, ev):
         if self.state == 1:
             self.assertEqual(200, ev['status'])
             self.state = 2
+            self.subscribed.set()
         elif self.state == 2:
             self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
             self.state = 3
@@ -21,40 +20,32 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
     def runTest(self):
         self.state = 1
+        self.subscribed = threading.Event()
         self.done = threading.Event()
 
         run_test_server.authorize_with("admin")
         api = arvados.api('v1', cache=False)
-        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event)
-        time.sleep(1)
+        self.ws = arvados.events.subscribe(arvados.api('v1', cache=False), [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
+        if not isinstance(self.ws, self.WS_TYPE):
+            self.fail()
+        self.subscribed.wait(10)
         self.h = api.humans().create(body={}).execute()
         self.done.wait(10)
         self.assertEqual(3, self.state)
-        ws.close()
 
-class PollClientTest(run_test_server.TestCaseWithServers):
-    MAIN_SERVER = {}
+class WebsocketTest(run_test_server.TestCaseWithServers, EventTestBase):
+    MAIN_SERVER = {'websockets': True}
+    WS_TYPE = arvados.events.EventClient
 
-    def on_event(self, ev):
-        if self.state == 1:
-            self.assertEqual(200, ev['status'])
-            self.state = 2
-        elif self.state == 2:
-            self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
-            self.state = 3
-            self.done.set()
-        elif self.state == 3:
-            self.fail()
+    def tearDown(self):
+        self.ws.close()
+        super(run_test_server.TestCaseWithServers, self).tearDown()
 
-    def runTest(self):
-        self.state = 1
-        self.done = threading.Event()
 
-        run_test_server.authorize_with("admin")
-        api = arvados.api('v1', cache=False)
-        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
-        time.sleep(1)
-        self.h = api.humans().create(body={}).execute()
-        self.done.wait(10)
-        self.assertEqual(3, self.state)
-        ws.close()
+class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
+    MAIN_SERVER = {}
+    WS_TYPE = arvados.events.PollClient
+
+    def tearDown(self):
+        self.ws.close()
+        super(run_test_server.TestCaseWithServers, self).tearDown()

commit 23796191a7efdae462ee7509b4641be4d63d584d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 16 13:20:12 2014 -0400

    3609: import signal; add_mutually_exclusive_group doesn't take any arguments.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 4ac870e..d399eb5 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -7,6 +7,7 @@ import arvados
 import json
 import time
 from arvados.events import subscribe
+import signal
 
 def main(arguments=None):
     logger = logging.getLogger('arvados.arv-ws')
@@ -15,11 +16,11 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
 
-    group = parser.add_mutually_exclusive_group('Polling fallback')
+    group = parser.add_mutually_exclusive_group()
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
     group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail")
 
-    group = parser.add_mutually_exclusive_group('Jobs and Pipelines')
+    group = parser.add_mutually_exclusive_group()
     group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
     group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
 

commit 723f0c8477dbd1d6a5e90a29dd3003c9b00d04aa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 16 12:23:41 2014 -0400

    3609: Add additional api_ fields to api client object so that subscribe() can
    create a new api client.  Change test to use event variable instead of
    sleeping.  Fix "Web sockets" -> "Websockets".  Make some command line options
    mutually exclusive.

diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index cb716f1..a8a74da 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -154,7 +154,10 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, **kwarg
     kwargs['http'] = credentials.authorize(kwargs['http'])
 
     svc = apiclient.discovery.build('arvados', version, **kwargs)
+    svc.api_version = version
+    svc.api_host = host
     svc.api_token = token
+    svc.api_insecure = insecure
     kwargs['http'].cache = None
     if cache:
         conncache[connprofile] = svc
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 457880e..4ac870e 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -15,11 +15,11 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
 
-    group = parser.add_argument_group('Polling fallback')
+    group = parser.add_mutually_exclusive_group('Polling fallback')
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
     group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail")
 
-    group = parser.add_argument_group('Jobs and Pipelines')
+    group = parser.add_mutually_exclusive_group('Jobs and Pipelines')
     group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
     group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
 
@@ -84,7 +84,7 @@ def main(arguments=None):
                 update_subscribed_components(c["components"])
 
             while True:
-                time.sleep(60)
+                signal.pause()
     except KeyboardInterrupt:
         pass
     except Exception as e:
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index df64b4e..4570cc0 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -7,6 +7,7 @@ import ssl
 import re
 import config
 import logging
+import arvados
 
 _logger = logging.getLogger('arvados.events')
 
@@ -48,18 +49,23 @@ class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time):
         super(PollClient, self).__init__()
         self.api = api
-        self.filters = [filters]
+        if filters:
+            self.filters = [filters]
+        else:
+            self.filters = []
         self.on_event = on_event
-        items = self.api.logs().list(limit=1, order="id desc", filters=filters).execute()['items']
+        self.poll_time = poll_time
+        self.stop = threading.Event()
+
+    def run(self):
+        items = self.api.logs().list(limit=1, order="id desc", filters=self.filters[0]).execute()['items']
         if len(items) > 0:
             self.id = items[0]["id"]
         else:
             self.id = 0
-        self.poll_time = poll_time
-        self.stop = threading.Event()
+
         self.on_event({'status': 200})
 
-    def run(self):
         while not self.stop.isSet():
             max_id = 0
             for f in self.filters:
@@ -95,10 +101,11 @@ def subscribe(api, filters, on_event, poll_fallback=15):
             if ws:
                 ws.close_connection()
     if poll_fallback:
-        _logger.warn("Web sockets not available, falling back to log table polling")
+        _logger.warn("Websockets not available, falling back to log table polling")
+        api = arvados.api(version=api.api_version, cache=False, host=api.api_host, token=api.api_token, insecure=api.api_insecure)
         p = PollClient(api, filters, on_event, poll_fallback)
         p.start()
         return p
     else:
-        _logger.error("Web sockets not available")
+        _logger.error("Websockets not available")
         return None
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 83b95b8..fe065e1 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -3,6 +3,7 @@ import unittest
 import arvados
 import arvados.events
 import time
+import threading
 
 class WebsocketTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {'websockets': True}
@@ -14,18 +15,20 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         elif self.state == 2:
             self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
             self.state = 3
+            self.done.set()
         elif self.state == 3:
             self.fail()
 
     def runTest(self):
         self.state = 1
+        self.done = threading.Event()
 
         run_test_server.authorize_with("admin")
         api = arvados.api('v1', cache=False)
-        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev))
+        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event)
         time.sleep(1)
         self.h = api.humans().create(body={}).execute()
-        time.sleep(2)
+        self.done.wait(10)
         self.assertEqual(3, self.state)
         ws.close()
 
@@ -39,17 +42,19 @@ class PollClientTest(run_test_server.TestCaseWithServers):
         elif self.state == 2:
             self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
             self.state = 3
+            self.done.set()
         elif self.state == 3:
             self.fail()
 
     def runTest(self):
         self.state = 1
+        self.done = threading.Event()
 
         run_test_server.authorize_with("admin")
         api = arvados.api('v1', cache=False)
-        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev), poll_fallback=2)
+        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
         time.sleep(1)
         self.h = api.humans().create(body={}).execute()
-        time.sleep(5)
+        self.done.wait(10)
         self.assertEqual(3, self.state)
         ws.close()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list