[ARVADOS] updated: 758d59141102b29371f44512671b9efe0cf62821
git at public.curoverse.com
git at public.curoverse.com
Wed Oct 15 16:22:34 EDT 2014
Summary of changes:
sdk/python/arvados/commands/ws.py | 19 ++++-----
sdk/python/arvados/events.py | 46 +++++++++++-----------
sdk/python/tests/test_websockets.py | 29 +++++++++++++-
.../controllers/arvados/v1/schema_controller.rb | 4 +-
4 files changed, 62 insertions(+), 36 deletions(-)
via 758d59141102b29371f44512671b9efe0cf62821 (commit)
via 7509b9e08acebd9e28ab2cea7d8b2e383c46859c (commit)
from 55888e63181a0847d3e00344fa9c7c5e747082ae (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 758d59141102b29371f44512671b9efe0cf62821
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 15 16:22:26 2014 -0400
3609: Add test case for PollClient. arvados.events.subscribe() starts a new
polling thread by default so as to provide the same behavior as websockets.
Add documentation strings to ws.py command line parameters.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index b657b96..c27503c 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -11,16 +11,16 @@ def main(arguments=None):
logger = logging.getLogger('arvados.arv-ws')
parser = argparse.ArgumentParser()
- parser.add_argument('-u', '--uuid', type=str, default="")
- parser.add_argument('-f', '--filters', type=str, default="")
+ parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
+ parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
group = parser.add_argument_group('Polling fallback')
- group.add_argument('--poll-fallback', default=15)
- group.add_argument('--no-poll-fallback', action='store_false', dest='poll_fallback')
+ group.add_argument('--poll-interval', default=15, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
+ group.add_argument('--no-poll', action='store_false', dest='poll_fallback', help="Do not poll if websockets are not available, just fail")
group = parser.add_argument_group('Jobs and Pipelines')
- group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
- group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+ group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
+ group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
args = parser.parse_args(arguments)
@@ -55,9 +55,6 @@ def main(arguments=None):
if ev['event_type'] in ('stderr', 'stdout'):
sys.stdout.write(ev["properties"]["text"])
elif ev["event_type"] in ("create", "update"):
- #if args.job or ev["object_kind"] == "arvados#pipelineInstance":
- # if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- # ws.close()
if ev["object_kind"] == "arvados#pipelineInstance":
pipeline_jobs = set()
for c in ev["properties"]["new_attributes"]["components"]:
@@ -72,7 +69,7 @@ def main(arguments=None):
print json.dumps(ev)
try:
- ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
+ ws = subscribe(api, filters, on_message, poll_fallback=args.poll_fallback)
ws.run_forever()
except KeyboardInterrupt:
pass
@@ -80,4 +77,4 @@ def main(arguments=None):
logger.exception('')
finally:
if ws:
- ws.close_connection()
+ ws.close()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index efdde94..489be5a 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -46,29 +46,33 @@ class EventClient(WebSocketClient):
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event, poll_time):
+ super(PollClient, self).__init__()
self.api = api
self.filters = filters
self.on_event = on_event
- items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+ items = self.api.logs().list(limit=1, order=["id desc"], filters=filters).execute()['items']
if len(items) > 0:
self.id = items[0]["id"]
else:
self.id = 0
self.poll_time = poll_time
self.loop = True
+ self.on_event({'status': 200})
- def run_forever(self):
+ def run(self):
while self.loop:
time.sleep(self.poll_time)
- items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+ items = self.api.logs().list(order=["id asc"], filters=self.filters+[["id", ">", str(self.id)]]).execute()['items']
for i in items:
self.id = i['id']
self.on_event(i)
- def close_connection(self):
+ def close(self):
self.loop = False
+ self.join()
def subscribe(self, filters):
+ self.on_event({'status': 200})
self.filters += filters
def unsubscribe(self, filters):
@@ -76,23 +80,21 @@ class PollClient(threading.Thread):
def subscribe(api, filters, on_event, poll_fallback=15):
ws = None
- try:
- if 'websocketUrl' in api._rootDesc:
- url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+ if 'websocketUrl' in api._rootDesc:
+ try:
+ url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], api.api_token)
ws = EventClient(url, filters, on_event)
ws.connect()
- elif poll_fallback:
- _logger.warn("Web sockets not available, falling back to log table polling")
- ws = PollClient(api, filters, on_event, poll_fallback)
- else:
- _logger.error("Web sockets not available")
- return None
- return ws
- except Exception:
- if ws:
- ws.close_connection()
- if poll_fallback:
- return PollClient(api, filters, on_event, poll_fallback)
- else:
- _logger.error("Web sockets not available at %s" % api._rootDesc['websocketUrl'])
- raise
+ return ws
+ except Exception as e:
+ _logger.warn("Got exception %s trying to connect to web sockets" % e)
+ if ws:
+ ws.close()
+ if poll_fallback:
+ _logger.warn("Web sockets not available, falling back to log table polling")
+ p = PollClient(api, filters, on_event, poll_fallback)
+ p.start()
+ return p
+ else:
+ _logger.error("Web sockets not available")
+ return None
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index 1dae978..83b95b8 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -22,7 +22,34 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
run_test_server.authorize_with("admin")
api = arvados.api('v1', cache=False)
- arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev))
+ ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev))
time.sleep(1)
self.h = api.humans().create(body={}).execute()
+ time.sleep(2)
+ self.assertEqual(3, self.state)
+ ws.close()
+
+class PollClientTest(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
+
+ def on_event(self, ev):
+ if self.state == 1:
+ self.assertEqual(200, ev['status'])
+ self.state = 2
+ elif self.state == 2:
+ self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
+ self.state = 3
+ elif self.state == 3:
+ self.fail()
+
+ def runTest(self):
+ self.state = 1
+
+ run_test_server.authorize_with("admin")
+ api = arvados.api('v1', cache=False)
+ ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], lambda ev: self.on_event(ev), poll_fallback=2)
time.sleep(1)
+ self.h = api.humans().create(body={}).execute()
+ time.sleep(5)
+ self.assertEqual(3, self.state)
+ ws.close()
commit 7509b9e08acebd9e28ab2cea7d8b2e383c46859c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 15 16:20:04 2014 -0400
3609: Fix schema so 'order' parameter of 'list' is an array instead of a string.
diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index c5b2bcf..eef8e65 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -230,8 +230,8 @@ class Arvados::V1::SchemaController < ApplicationController
location: "query"
},
order: {
- type: "string",
- description: "Order in which to return matching #{k.to_s.underscore.pluralize}.",
+ type: "array",
+ description: "Fields to use to determine order for returning #{k.to_s.underscore.pluralize} object matches.",
location: "query"
},
select: {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list