[ARVADOS] updated: 758d59141102b29371f44512671b9efe0cf62821

git at public.curoverse.com git at public.curoverse.com
Wed Oct 15 16:22:34 EDT 2014


Summary of changes:
 sdk/python/arvados/commands/ws.py                  | 19 ++++-----
 sdk/python/arvados/events.py                       | 46 +++++++++++-----------
 sdk/python/tests/test_websockets.py                | 29 +++++++++++++-
 .../controllers/arvados/v1/schema_controller.rb    |  4 +-
 4 files changed, 62 insertions(+), 36 deletions(-)

       via  758d59141102b29371f44512671b9efe0cf62821 (commit)
       via  7509b9e08acebd9e28ab2cea7d8b2e383c46859c (commit)
      from  55888e63181a0847d3e00344fa9c7c5e747082ae (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 758d59141102b29371f44512671b9efe0cf62821
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 15 16:22:26 2014 -0400

    3609: Add test case for PollClient.  arvados.event.subscribe() starts a new
    polling thread by default so as to provide the same behavior as websockets.
    Add documentation strings to ws.py command line parameters.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index b657b96..c27503c 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -11,16 +11,16 @@ def main(arguments=None):
     logger = logging.getLogger('arvados.arv-ws')
 
     parser = argparse.ArgumentParser()
-    parser.add_argument('-u', '--uuid', type=str, default="")
-    parser.add_argument('-f', '--filters', type=str, default="")
+    parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
+    parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
 
     group = parser.add_argument_group('Polling fallback')
-    group.add_argument('--poll-fallback', default=15)
-    group.add_argument('--no-poll-fallback', action='store_false', dest='poll_fallback')
+    group.add_argument('--poll-interval', default=15, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
+    group.add_argument('--no-poll', action='store_false', dest='poll_fallback', help="Do not poll if websockets are not available, just fail")
 
     group = parser.add_argument_group('Jobs and Pipelines')
-    group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
-    group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+    group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
+    group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
 
     args = parser.parse_args(arguments)
 
@@ -55,9 +55,6 @@ def main(arguments=None):
             if ev['event_type'] in ('stderr', 'stdout'):
                 sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
-                #if args.job or ev["object_kind"] == "arvados#pipelineInstance":
-                #    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                #        ws.close()
                 if ev["object_kind"] == "arvados#pipelineInstance":
                     pipeline_jobs = set()
                     for c in ev["properties"]["new_attributes"]["components"]:
@@ -72,7 +69,7 @@ def main(arguments=None):
             print json.dumps(ev)
 
     try:
-        ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
+        ws = subscribe(api, filters, on_message, poll_fallback=args.poll_fallback)
         ws.run_forever()
     except KeyboardInterrupt:
         pass
@@ -80,4 +77,4 @@ def main(arguments=None):
         logger.exception('')
     finally:
         if ws:
-            ws.close_connection()
+            ws.close()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index efdde94..489be5a 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -46,29 +46,33 @@ class EventClient(WebSocketClient):
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time):
+        super(PollClient, self).__init__()
         self.api = api
         self.filters = filters
         self.on_event = on_event
-        items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+        items = self.api.logs().list(limit=1, order=["id desc"], filters=filters).execute()['items']
         if len(items) > 0:
             self.id = items[0]["id"]
         else:
             self.id = 0
         self.poll_time = poll_time
         self.loop = True
+        self.on_event({'status': 200})
 
-    def run_forever(self):
+    def run(self):
         while self.loop:
             time.sleep(self.poll_time)
-            items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+            items = self.api.logs().list(order=["id asc"], filters=self.filters+[["id", ">", str(self.id)]]).execute()['items']
             for i in items:
                 self.id = i['id']
                 self.on_event(i)
 
-    def close_connection(self):
+    def close(self):
         self.loop = False
+        self.join()
 
     def subscribe(self, filters):
+        self.on_event({'status': 200})
         self.filters += filters
 
     def unsubscribe(self, filters):
@@ -76,23 +80,21 @@ class PollClient(threading.Thread):
 
 def subscribe(api, filters, on_event, poll_fallback=15):
     ws = None
-    try:
-        if 'websocketUrl' in api._rootDesc:
-            url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+    if 'websocketUrl' in api._rootDesc:
+        try:
+            url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], api.api_token)
             ws = EventClient(url, filters, on_event)
             ws.connect()
-        elif poll_fallback:
-            _logger.warn("Web sockets not available, falling back to log table polling")
-            ws = PollClient(api, filters, on_event, poll_fallback)
-        else:
-            _logger.error("Web sockets not available")
-            return None
-        return ws
-    except Exception:
-        if ws:
-            ws.close_connection()
-        if poll_fallback:
-            return PollClient(api, filters, on_event, poll_fallback)
-        else:
-            _logger.error("Web sockets not available at %s" % api._rootDesc['websocketUrl'])
-            raise
+            return ws
+        except Exception as e:
+            _logger.warn("Got exception %s trying to connect to web sockets" % e)
+            if ws:
+                ws.close()
+    if poll_fallback:
+        _logger.warn("Web sockets not available, falling back to log table polling")
+        p = PollClient(api, filters, on_event, poll_fallback)
+        p.start()
+        return p
+    else:
+        _logger.error("Web sockets not available")
+        return None
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 1dae978..83b95b8 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -22,7 +22,34 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
         run_test_server.authorize_with("admin")
         api = arvados.api('v1', cache=False)
-        arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev))
+        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev))
         time.sleep(1)
         self.h = api.humans().create(body={}).execute()
+        time.sleep(2)
+        self.assertEqual(3, self.state)
+        ws.close()
+
+class PollClientTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+
+    def on_event(self, ev):
+        if self.state == 1:
+            self.assertEqual(200, ev['status'])
+            self.state = 2
+        elif self.state == 2:
+            self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
+            self.state = 3
+        elif self.state == 3:
+            self.fail()
+
+    def runTest(self):
+        self.state = 1
+
+        run_test_server.authorize_with("admin")
+        api = arvados.api('v1', cache=False)
+        ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev), poll_fallback=2)
         time.sleep(1)
+        self.h = api.humans().create(body={}).execute()
+        time.sleep(5)
+        self.assertEqual(3, self.state)
+        ws.close()

commit 7509b9e08acebd9e28ab2cea7d8b2e383c46859c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 15 16:20:04 2014 -0400

    3609: Fix schema so 'order' parameter of 'list' is an array instead of a string.

diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index c5b2bcf..eef8e65 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -230,8 +230,8 @@ class Arvados::V1::SchemaController < ApplicationController
                   location: "query"
                 },
                 order: {
-                  type: "string",
-                  description: "Order in which to return matching #{k.to_s.underscore.pluralize}.",
+                  type: "array",
+                  description: "Fields to use to determine order for returning #{k.to_s.underscore.pluralize} object matches.",
                   location: "query"
                 },
                 select: {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list