[ARVADOS] updated: f6605cb2e6c189419bb3bfff1189d62f57c017eb
git at public.curoverse.com
git at public.curoverse.com
Thu Oct 16 14:59:24 EDT 2014
Summary of changes:
sdk/python/arvados/api.py | 3 ---
sdk/python/arvados/commands/ws.py | 3 +--
sdk/python/arvados/events.py | 14 ++++++++---
sdk/python/tests/test_websockets.py | 49 +++++++++++++++----------------------
4 files changed, 32 insertions(+), 37 deletions(-)
via f6605cb2e6c189419bb3bfff1189d62f57c017eb (commit)
from 23796191a7efdae462ee7509b4641be4d63d584d (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f6605cb2e6c189419bb3bfff1189d62f57c017eb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Oct 16 14:59:18 2014 -0400
3906: Back out setting api_version, api_host, and api_insecure on api client
and instead document that caller to arvados.events.subscribe() should pass in a
freshly created api client. Add event variable to indicate when subscription
is a success instead of sleeping. Removed unused "import time". Fixed "web
sockets" -> "websockets".
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index a8a74da..cb716f1 100644
--- a/sdk/python/arvados/api.py
+++ b/sdk/python/arvados/api.py
@@ -154,10 +154,7 @@ def api(version=None, cache=True, host=None, token=None, insecure=False, **kwarg
kwargs['http'] = credentials.authorize(kwargs['http'])
svc = apiclient.discovery.build('arvados', version, **kwargs)
- svc.api_version = version
- svc.api_host = host
svc.api_token = token
- svc.api_insecure = insecure
kwargs['http'].cache = None
if cache:
conncache[connprofile] = svc
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index d399eb5..674daad 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -5,7 +5,6 @@ import logging
import argparse
import arvados
import json
-import time
from arvados.events import subscribe
import signal
@@ -78,7 +77,7 @@ def main(arguments=None):
print json.dumps(ev)
try:
- ws = subscribe(api, filters, on_message, poll_fallback=args.poll_interval)
+ ws = subscribe(arvados.api('v1', cache=False), filters, on_message, poll_fallback=args.poll_interval)
if ws:
if args.pipeline:
c = api.pipeline_instances().get(uuid=args.pipeline).execute()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 4570cc0..b18d3b0 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -67,7 +67,7 @@ class PollClient(threading.Thread):
self.on_event({'status': 200})
while not self.stop.isSet():
- max_id = 0
+ max_id = self.id
for f in self.filters:
items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()['items']
for i in items:
@@ -76,6 +76,8 @@ class PollClient(threading.Thread):
self.on_event(i)
self.id = max_id
self.stop.wait(self.poll_time)
+ except Exception as e:
+ _logger.exception(e)
def close(self):
self.stop.set()
@@ -88,7 +90,14 @@ class PollClient(threading.Thread):
def unsubscribe(self, filters):
del self.filters[self.filters.index(filters)]
+
def subscribe(api, filters, on_event, poll_fallback=15):
+ '''
+ api: Must be a newly created from arvados.api(cache=False), not shared with the caller, as it may be used by a background thread.
+ filters: Initial subscription filters.
+ on_event: The callback when a message is received
+ poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.
+ '''
ws = None
if 'websocketUrl' in api._rootDesc:
try:
@@ -97,12 +106,11 @@ def subscribe(api, filters, on_event, poll_fallback=15):
ws.connect()
return ws
except Exception as e:
- _logger.warn("Got exception %s trying to connect to web sockets at %s" % (e, api._rootDesc['websocketUrl']))
+ _logger.warn("Got exception %s trying to connect to websockets at %s" % (e, api._rootDesc['websocketUrl']))
if ws:
ws.close_connection()
if poll_fallback:
_logger.warn("Websockets not available, falling back to log table polling")
- api = arvados.api(version=api.api_version, cache=False, host=api.api_host, token=api.api_token, insecure=api.api_insecure)
p = PollClient(api, filters, on_event, poll_fallback)
p.start()
return p
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index fe065e1..f6074ad 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -5,13 +5,12 @@ import arvados.events
import time
import threading
-class WebsocketTest(run_test_server.TestCaseWithServers):
- MAIN_SERVER = {'websockets': True}
-
+class EventTestBase(object):
def on_event(self, ev):
if self.state == 1:
self.assertEqual(200, ev['status'])
self.state = 2
+ self.subscribed.set()
elif self.state == 2:
self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
self.state = 3
@@ -21,40 +20,32 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
def runTest(self):
self.state = 1
+ self.subscribed = threading.Event()
self.done = threading.Event()
run_test_server.authorize_with("admin")
api = arvados.api('v1', cache=False)
- ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event)
- time.sleep(1)
+ self.ws = arvados.events.subscribe(arvados.api('v1', cache=False), [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
+ if not isinstance(self.ws, self.WS_TYPE):
+ self.fail()
+ self.subscribed.wait(10)
self.h = api.humans().create(body={}).execute()
self.done.wait(10)
self.assertEqual(3, self.state)
- ws.close()
-class PollClientTest(run_test_server.TestCaseWithServers):
- MAIN_SERVER = {}
+class WebsocketTest(run_test_server.TestCaseWithServers, EventTestBase):
+ MAIN_SERVER = {'websockets': True}
+ WS_TYPE = arvados.events.EventClient
- def on_event(self, ev):
- if self.state == 1:
- self.assertEqual(200, ev['status'])
- self.state = 2
- elif self.state == 2:
- self.assertEqual(self.h[u'uuid'], ev[u'object_uuid'])
- self.state = 3
- self.done.set()
- elif self.state == 3:
- self.fail()
+ def tearDown(self):
+ self.ws.close()
+ super(run_test_server.TestCaseWithServers, self).tearDown()
- def runTest(self):
- self.state = 1
- self.done = threading.Event()
- run_test_server.authorize_with("admin")
- api = arvados.api('v1', cache=False)
- ws = arvados.events.subscribe(api, [['object_uuid', 'is_a', 'arvados#human']], self.on_event, poll_fallback=2)
- time.sleep(1)
- self.h = api.humans().create(body={}).execute()
- self.done.wait(10)
- self.assertEqual(3, self.state)
- ws.close()
+class PollClientTest(run_test_server.TestCaseWithServers, EventTestBase):
+ MAIN_SERVER = {}
+ WS_TYPE = arvados.events.PollClient
+
+ def tearDown(self):
+ self.ws.close()
+ super(run_test_server.TestCaseWithServers, self).tearDown()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list