[ARVADOS] updated: 1c6515a4445e9fd79961c9c8caf21528f16d9399
git at public.curoverse.com
git at public.curoverse.com
Sat Jul 18 09:21:47 EDT 2015
Summary of changes:
.../app/controllers/application_controller.rb | 1 +
.../test/integration/application_layout_test.rb | 8 ++++
sdk/python/arvados/commands/ws.py | 7 ++--
sdk/python/arvados/events.py | 17 ++++----
sdk/python/tests/run_test_server.py | 7 ++--
sdk/python/tests/test_websockets.py | 47 ++++++++++++++++------
6 files changed, 61 insertions(+), 26 deletions(-)
via 1c6515a4445e9fd79961c9c8caf21528f16d9399 (commit)
via 210883bcb2428ad0262a4b064c56612de0a22ef6 (commit)
via 702ce6c3da9a1b09a2ed546fd3da775d21bd703b (commit)
via 6ba447b83ef96f0f52db5eebd04fd22e5a0e1c74 (commit)
via 12ca84522ca29c237c477ab1974299d04715b09d (commit)
via 25e646a708d1d91aebcf8db80b8ae1fafa044034 (commit)
via 041af47977925c319ad3b6a809089eb64ffdd738 (commit)
from 22f68d307a3229fbf842fba0f61ea0ae0330832b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1c6515a4445e9fd79961c9c8caf21528f16d9399
Author: radhika <radhika at curoverse.com>
Date: Sat Jul 18 09:21:05 2015 -0400
6473: include last_log_id when start_time argument is provided.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 26d528a..be48dc0 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -14,7 +14,7 @@ def main(arguments=None):
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
- parser.add_argument('-s', '--start_time', type=str, default="", help="Arvados query filter to apply to log events created after this time. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+ parser.add_argument('-s', '--start_time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
group = parser.add_mutually_exclusive_group()
group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
@@ -62,7 +62,8 @@ def main(arguments=None):
filters += [ ['object_uuid', '=', args.pipeline] ]
if args.start_time:
- filters += [ ['created_at', '>', args.start_time] ]
+ args.last_log_id = 1
+ filters += [ ['created_at', '>=', args.start_time] ]
def on_message(ev):
global filters
@@ -89,7 +90,7 @@ def main(arguments=None):
print json.dumps(ev)
try:
- ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval)
+ ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=args.last_log_id)
if ws:
if args.pipeline:
c = api.pipeline_instances().get(uuid=args.pipeline).execute()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 3036a25..b187200 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -14,7 +14,7 @@ from ws4py.client.threadedclient import WebSocketClient
_logger = logging.getLogger('arvados.events')
class EventClient(WebSocketClient):
- def __init__(self, url, filters, on_event):
+ def __init__(self, url, filters, on_event, last_log_id):
ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
ssl_options['cert_reqs'] = ssl.CERT_NONE
@@ -28,9 +28,10 @@ class EventClient(WebSocketClient):
super(EventClient, self).__init__(url, ssl_options=ssl_options)
self.filters = filters
self.on_event = on_event
+ self.last_log_id = last_log_id
def opened(self):
- self.subscribe(self.filters)
+ self.subscribe(self.filters, self.last_log_id)
def received_message(self, m):
self.on_event(json.loads(str(m)))
@@ -109,13 +110,13 @@ class PollClient(threading.Thread):
del self.filters[self.filters.index(filters)]
-def _subscribe_websocket(api, filters, on_event):
+def _subscribe_websocket(api, filters, on_event, last_log_id=None):
endpoint = api._rootDesc.get('websocketUrl', None)
if not endpoint:
raise errors.FeatureNotEnabledError(
"Server does not advertise a websocket endpoint")
uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
- client = EventClient(uri_with_token, filters, on_event)
+ client = EventClient(uri_with_token, filters, on_event, last_log_id)
ok = False
try:
client.connect()
@@ -125,7 +126,7 @@ def _subscribe_websocket(api, filters, on_event):
if not ok:
client.close_connection()
-def subscribe(api, filters, on_event, poll_fallback=15):
+def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
"""
:api:
a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
@@ -135,13 +136,15 @@ def subscribe(api, filters, on_event, poll_fallback=15):
The callback when a message is received.
:poll_fallback:
If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.
+ :last_log_id:
+ Log rows that are newer than the log id
"""
if not poll_fallback:
- return _subscribe_websocket(api, filters, on_event)
+ return _subscribe_websocket(api, filters, on_event, last_log_id)
try:
- return _subscribe_websocket(api, filters, on_event)
+ return _subscribe_websocket(api, filters, on_event, last_log_id)
except Exception as e:
_logger.warn("Falling back to polling after websocket error: %s" % e)
p = PollClient(api, filters, on_event, poll_fallback)
diff --git a/sdk/python/tests/test_websockets.py b/sdk/python/tests/test_websockets.py
index e82d569..c62397f 100644
--- a/sdk/python/tests/test_websockets.py
+++ b/sdk/python/tests/test_websockets.py
@@ -6,6 +6,7 @@ import arvados.events
import mock
import threading
from datetime import datetime, timedelta
+import time
class WebsocketTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
@@ -18,20 +19,40 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
self.ws.close()
super(WebsocketTest, self).tearDown()
- def _test_subscribe(self, poll_fallback, expect_type, additional_filters=None):
+ def _test_subscribe(self, poll_fallback, expect_type, last_log_id=None, additional_filters=None, expected=1):
run_test_server.authorize_with('active')
- events = Queue.Queue(3)
+ events = Queue.Queue(100)
filters = [['object_uuid', 'is_a', 'arvados#human']]
if additional_filters:
filters = filters + additional_filters
+
+ # Create an extra object before subscribing and verify that as well
+ ancestor = arvados.api('v1').humans().create(body={}).execute()
+ time.sleep(5)
+
self.ws = arvados.events.subscribe(
arvados.api('v1'), filters,
- events.put, poll_fallback=poll_fallback)
+ events.put, poll_fallback=poll_fallback, last_log_id=last_log_id)
self.assertIsInstance(self.ws, expect_type)
self.assertEqual(200, events.get(True, 10)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
- self.assertTrue(events.empty(), "got more events than expected")
+
+ if last_log_id == None or expected == 0:
+ self.assertEqual(human['uuid'], events.get(True, 10)['object_uuid'])
+ self.assertTrue(events.empty(), "got more events than expected")
+ else:
+ log_events = []
+ for i in range(0, 10):
+ try:
+ event = events.get(True, 10)
+ self.assertTrue(event['object_uuid'] is not None)
+ log_events.append(event['object_uuid'])
+ except:
+ break;
+
+ self.assertTrue(len(log_events)>1)
+ self.assertTrue(human['uuid'] in log_events)
+ self.assertTrue(ancestor['uuid'] in log_events)
def test_subscribe_websocket(self):
self._test_subscribe(
@@ -40,28 +61,28 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
def test_subscribe_websocket_with_start_time_today(self):
now = datetime.today()
self._test_subscribe(
- poll_fallback=False, expect_type=arvados.events.EventClient,
- additional_filters=[['created_at', '>', now.strftime('%Y-%m-%d')]])
+ poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1,
+ additional_filters=[['created_at', '>=', now.strftime('%Y-%m-%d')]])
def test_subscribe_websocket_with_start_time_last_hour(self):
lastHour = datetime.today() - timedelta(hours = 1)
self._test_subscribe(
- poll_fallback=False, expect_type=arvados.events.EventClient,
- additional_filters=[['created_at', '>', lastHour.strftime('%Y-%m-%d %H:%M:%S')]])
+ poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1,
+ additional_filters=[['created_at', '>=', lastHour.strftime('%Y-%m-%d %H:%M:%S')]])
def test_subscribe_websocket_with_start_time_next_hour(self):
nextHour = datetime.today() + timedelta(hours = 1)
with self.assertRaises(Queue.Empty):
self._test_subscribe(
- poll_fallback=False, expect_type=arvados.events.EventClient,
- additional_filters=[['created_at', '>', nextHour.strftime('%Y-%m-%d %H:%M:%S')]])
+ poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1,
+ additional_filters=[['created_at', '>=', nextHour.strftime('%Y-%m-%d %H:%M:%S')]], expected=0)
def test_subscribe_websocket_with_start_time_tomorrow(self):
tomorrow = datetime.today() + timedelta(hours = 24)
with self.assertRaises(Queue.Empty):
self._test_subscribe(
- poll_fallback=False, expect_type=arvados.events.EventClient,
- additional_filters=[['created_at', '>', tomorrow.strftime('%Y-%m-%d')]])
+ poll_fallback=False, expect_type=arvados.events.EventClient, last_log_id=1,
+ additional_filters=[['created_at', '>=', tomorrow.strftime('%Y-%m-%d')]], expected=0)
@mock.patch('arvados.events.EventClient.__init__')
def test_subscribe_poll(self, event_client_constr):
commit 210883bcb2428ad0262a4b064c56612de0a22ef6
Merge: 22f68d3 702ce6c
Author: radhika <radhika at curoverse.com>
Date: Fri Jul 17 15:53:33 2015 -0400
Merge branch 'master' into 6473-fetch-events-starting-at
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list