[ARVADOS] updated: 7c12fc4d989dd2b4c47a174280a4f9526ecb0798
git at public.curoverse.com
git at public.curoverse.com
Fri Apr 25 11:19:26 EDT 2014
Summary of changes:
services/api/lib/eventbus.rb | 120 +++++--
services/api/test/integration/websocket_test.rb | 475 +++++++++++++++++++++--
2 files changed, 538 insertions(+), 57 deletions(-)
via 7c12fc4d989dd2b4c47a174280a4f9526ecb0798 (commit)
from 0a72fd5c9f764eba4fc295d8beb24ac3d01885c5 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 7c12fc4d989dd2b4c47a174280a4f9526ecb0798
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 25 11:19:22 2014 -0400
Added tests, especially for error handling behavior. Fixed bugs found by tests.
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 0865567..ac023e5 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -15,17 +15,20 @@ end
class Filter
include LoadParam
- def initialize p
- @p = p
+ attr_accessor :filters
+
+ def initialize p, fid
+ @params = p
+ @filter_id = fid
load_filters_param
end
def params
- @p
+ @params
end
- def filters
- @filters
+ def filter_id
+ @filter_id
end
end
@@ -33,42 +36,44 @@ class EventBus
include CurrentApiClient
include RecordFilters
+ # used in RecordFilters
+ def model_class
+ Log
+ end
+
+ # used in RecordFilters
+ def table_name
+ model_class.table_name
+ end
+
def initialize
@channel = EventMachine::Channel.new
@mtx = Mutex.new
@bgthread = false
+ @filter_id_counter = 0
end
- def on_connect ws
- if not current_user
- ws.send ({status: 401, message: "Valid API token required"}.to_json)
- ws.close
- return
- end
-
- ws.user = current_user
- ws.filters = []
- ws.last_log_id = nil
+ def alloc_filter_id
+ (@filter_id_counter += 1)
+ end
- sub = @channel.subscribe do |msg|
+ def push_events ws, msg = nil
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
-
# Start with log rows readable by user, sorted in ascending order
logs = Log.readable_by(ws.user).order("id asc")
if ws.last_log_id
- # Only get log rows that are new
- logs = logs.where("logs.id > ? and logs.id <= ?", ws.last_log_id, msg.to_i)
- else
+ # Only interested in log rows that are new
+ logs = logs.where("logs.id > ?", ws.last_log_id)
+ elsif msg
# No last log id, so only look at the most recently changed row
logs = logs.where("logs.id = ?", msg.to_i)
+ else
+ return
end
- # Record the most recent row
- ws.last_log_id = msg.to_i
-
# Now process filters provided by client
cond_out = []
param_out = []
@@ -86,25 +91,76 @@ class EventBus
# Finally execute query and send matching rows
logs.each do |l|
ws.send(l.as_api_response.to_json)
+ ws.last_log_id = l.id
end
- else
+ elsif msg
# No filters set up, so just record the sequence number
- ws.last_log_id.nil = msg.to_i
+ ws.last_log_id = msg.to_i
end
rescue Exception => e
- puts "#{e}"
+ puts "Error publishing event: #{$!}"
+ puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+ ws.send ({status: 500, message: 'error'}.to_json)
ws.close
end
+ end
+
+ MAX_FILTERS = 16
+
+ def on_connect ws
+ if not current_user
+ ws.send ({status: 401, message: "Valid API token required"}.to_json)
+ ws.close
+ return
+ end
+
+ ws.user = current_user
+ ws.filters = []
+ ws.last_log_id = nil
+
+ sub = @channel.subscribe do |msg|
+ push_events ws, msg
end
ws.on :message do |event|
- p = Oj.load event.data
- if p["method"] == 'subscribe'
- if p["starting_log_id"]
- ws.last_log_id = p["starting_log_id"].to_i
+ begin
+ p = (Oj.load event.data).symbolize_keys
+ if p[:method] == 'subscribe'
+ if p[:last_log_id]
+ ws.last_log_id = p[:last_log_id].to_i
+ end
+
+ if ws.filters.length < MAX_FILTERS
+ filter_id = alloc_filter_id
+ ws.filters.push Filter.new(p, filter_id)
+ ws.send ({status: 200, message: 'subscribe ok', filter_id: filter_id}.to_json)
+ push_events ws
+ else
+ ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ end
+ elsif p[:method] == 'unsubscribe'
+ if filter_id = p[:filter_id]
+ filter_id = filter_id.to_i
+ len = ws.filters.length
+ ws.filters = ws.filters.select { |f| f.filter_id != filter_id }
+ if ws.filters.length < len
+ ws.send ({status: 200, message: 'unsubscribe ok', filter_id: filter_id}.to_json)
+ else
+ ws.send ({status: 404, message: 'filter_id not found', filter_id: filter_id}.to_json)
+ end
+ else
+ ws.send ({status: 400, message: 'must provide filter_id'}.to_json)
+ end
+ else
+ ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
end
- ws.filters.push(Filter.new p)
- ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+ rescue Oj::Error => e
+ ws.send ({status: 400, message: "malformed request"}.to_json)
+ rescue Exception => e
+ puts "Error handling message: #{$!}"
+ puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+ ws.send ({status: 500, message: 'error'}.to_json)
+ ws.close
end
end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 8f28fc8..666175d 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -3,7 +3,7 @@ require 'websocket_runner'
require 'oj'
require 'database_cleaner'
-DatabaseCleaner.strategy = :deletion
+DatabaseCleaner.strategy = :truncation
class WebsocketTest < ActionDispatch::IntegrationTest
self.use_transactional_fixtures = false
@@ -16,8 +16,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
DatabaseCleaner.clean
end
- def ws_helper (token = nil)
+ def ws_helper (token = nil, timeout = true)
+ opened = false
close_status = nil
+ too_long = false
EM.run {
if token
@@ -26,31 +28,33 @@ class WebsocketTest < ActionDispatch::IntegrationTest
ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
end
- ws.on :close do |event|
- close_status = [:close, event.code, event.reason]
- EM.stop_event_loop
+ ws.on :open do |event|
+ opened = true
+ if timeout
+ EM::Timer.new 3 do
+ too_long = true
+ EM.stop_event_loop
+ end
+ end
end
- EM::Timer.new 3 do
+ ws.on :close do |event|
+ close_status = [:close, event.code, event.reason]
EM.stop_event_loop
end
yield ws
}
- assert_not_nil close_status, "Test took too long"
- assert_equal 1000, close_status[1], "Server closed the connection unexpectedly (check server log for errors)"
+ assert opened, "Should have opened web socket"
+ assert (not too_long), "Test took too long"
+ assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
end
test "connect with no token" do
- opened = false
status = nil
ws_helper do |ws|
- ws.on :open do |event|
- opened = true
- end
-
ws.on :message do |event|
d = Oj.load event.data
status = d["status"]
@@ -58,18 +62,15 @@ class WebsocketTest < ActionDispatch::IntegrationTest
end
end
- assert opened, "Should have opened web socket"
assert_equal 401, status
end
test "connect, subscribe and get response" do
- opened = false
status = nil
ws_helper :admin do |ws|
ws.on :open do |event|
- opened = true
ws.send ({method: 'subscribe'}.to_json)
end
@@ -80,21 +81,18 @@ class WebsocketTest < ActionDispatch::IntegrationTest
end
end
- assert opened, "Should have opened web socket"
assert_equal 200, status
end
test "connect, subscribe, get event" do
- opened = false
state = 1
- spec_uuid = nil
+ spec = nil
ev_uuid = nil
authorize_with :admin
ws_helper :admin do |ws|
ws.on :open do |event|
- opened = true
ws.send ({method: 'subscribe'}.to_json)
end
@@ -104,8 +102,6 @@ class WebsocketTest < ActionDispatch::IntegrationTest
when 1
assert_equal 200, d["status"]
spec = Specimen.create
- spec.save
- spec_uuid = spec.uuid
state = 2
when 2
ev_uuid = d["object_uuid"]
@@ -115,9 +111,438 @@ class WebsocketTest < ActionDispatch::IntegrationTest
end
- assert opened, "Should have opened web socket"
- assert_not_nil spec_uuid
- assert_equal spec_uuid, ev_uuid
+ assert_not_nil spec
+ assert_equal spec.uuid, ev_uuid
+ end
+
+ test "connect, subscribe, get two events" do
+ state = 1
+ spec = nil
+ human = nil
+ spec_ev_uuid = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe'}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ spec = Specimen.create
+ human = Human.create
+ state = 2
+ when 2
+ spec_ev_uuid = d["object_uuid"]
+ state = 3
+ when 3
+ human_ev_uuid = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_not_nil spec
+ assert_not_nil human
+ assert_equal spec.uuid, spec_ev_uuid
+ assert_equal human.uuid, human_ev_uuid
+ end
+
+ test "connect, subscribe, filter events" do
+ state = 1
+ human = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ Specimen.create
+ human = Human.create
+ state = 2
+ when 2
+ human_ev_uuid = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_not_nil human
+ assert_equal human.uuid, human_ev_uuid
+ end
+
+
+ test "connect, subscribe, multiple filters" do
+ state = 1
+ spec = nil
+ human = nil
+ spec_ev_uuid = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+ ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ state = 2
+ when 2
+ assert_equal 200, d["status"]
+ spec = Specimen.create
+ human = Human.create
+ state = 3
+ when 3
+ spec_ev_uuid = d["object_uuid"]
+ state = 4
+ when 4
+ human_ev_uuid = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_not_nil spec
+ assert_not_nil human
+ assert_equal spec.uuid, spec_ev_uuid
+ assert_equal human.uuid, human_ev_uuid
+ end
+
+ test "connect, subscribe, ask events starting at seq num" do
+ state = 1
+ human = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ lastid = logs(:log3).id
+ l1 = nil
+ l2 = nil
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ state = 2
+ when 2
+ l1 = d["object_uuid"]
+ state = 3
+ when 3
+ l2 = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_equal l1, logs(:log4).object_uuid
+ assert_equal l2, logs(:log5).object_uuid
+ end
+
+ test "connect, subscribe, get event, unsubscribe" do
+ state = 1
+ spec = nil
+ spec_ev_uuid = nil
+ filter_id = nil
+
+ authorize_with :admin
+
+ ws_helper :admin, false do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe'}.to_json)
+ EM::Timer.new 3 do
+ ws.close
+ end
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ filter_id = d["filter_id"]
+ spec = Specimen.create
+ state = 2
+ when 2
+ spec_ev_uuid = d["object_uuid"]
+ ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json)
+
+ EM::Timer.new 1 do
+ Human.create
+ end
+
+ state = 3
+ when 3
+ assert_equal 200, d["status"]
+ state = 4
+ when 4
+ assert false, "Should not get any more events"
+ end
+ end
+
+ end
+
+ assert_not_nil spec
+ assert_equal spec.uuid, spec_ev_uuid
+ end
+
+
+ test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do
+ state = 1
+ spec = nil
+ spec_ev_uuid = nil
+ human = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe'}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ spec = Specimen.create
+ state = 2
+ when 2
+ spec_ev_uuid = d["object_uuid"]
+ ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json)
+
+ EM::Timer.new 1 do
+ human = Human.create
+ end
+
+ state = 3
+ when 3
+ assert_equal 404, d["status"]
+ state = 4
+ when 4
+ human_ev_uuid = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_not_nil spec
+ assert_not_nil human
+ assert_equal spec.uuid, spec_ev_uuid
+ assert_equal human.uuid, human_ev_uuid
+ end
+
+ test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do
+ state = 1
+ spec = nil
+ spec_ev_uuid = nil
+ human = nil
+ human_ev_uuid = nil
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe'}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ spec = Specimen.create
+ state = 2
+ when 2
+ spec_ev_uuid = d["object_uuid"]
+ ws.send ({method: 'unsubscribe'}.to_json)
+
+ EM::Timer.new 1 do
+ human = Human.create
+ end
+
+ state = 3
+ when 3
+ assert_equal 400, d["status"]
+ state = 4
+ when 4
+ human_ev_uuid = d["object_uuid"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_not_nil spec
+ assert_not_nil human
+ assert_equal spec.uuid, spec_ev_uuid
+ assert_equal human.uuid, human_ev_uuid
+ end
+
+
+ test "connected, not subscribed, no event" do
+ authorize_with :admin
+
+ ws_helper :admin, false do |ws|
+ ws.on :open do |event|
+ EM::Timer.new 1 do
+ Specimen.create
+ end
+
+ EM::Timer.new 3 do
+ ws.close
+ end
+ end
+
+ ws.on :message do |event|
+ assert false, "Should not get any messages, message was #{event.data}"
+ end
+ end
+ end
+
+ test "connected, not authorized to see event" do
+ state = 1
+
+ authorize_with :admin
+
+ ws_helper :active, false do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'subscribe'}.to_json)
+
+ EM::Timer.new 3 do
+ ws.close
+ end
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when 1
+ assert_equal 200, d["status"]
+ Specimen.create
+ state = 2
+ when 2
+ assert false, "Should not get any messages, message was #{event.data}"
+ end
+ end
+
+ end
+
+ end
+
+ test "connect, try bogus method" do
+ status = nil
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({method: 'frobnabble'}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ status = d["status"]
+ ws.close
+ end
+ end
+
+ assert_equal 400, status
+ end
+
+ test "connect, missing method" do
+ status = nil
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send ({fizzbuzz: 'frobnabble'}.to_json)
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ status = d["status"]
+ ws.close
+ end
+ end
+
+ assert_equal 400, status
+ end
+
+ test "connect, send malformed request" do
+ status = nil
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ ws.send '<XML4EVER></XML4EVER>'
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ status = d["status"]
+ ws.close
+ end
+ end
+
+ assert_equal 400, status
+ end
+
+
+ test "connect, try subscribe too many filters" do
+ state = 1
+
+ authorize_with :admin
+
+ ws_helper :admin do |ws|
+ ws.on :open do |event|
+ (1..17).each do |i|
+ ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
+ end
+ end
+
+ ws.on :message do |event|
+ d = Oj.load event.data
+ case state
+ when (1..16)
+ assert_equal 200, d["status"]
+ state += 1
+ when 17
+ assert_equal 403, d["status"]
+ ws.close
+ end
+ end
+
+ end
+
+ assert_equal 17, state
+
end
end
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list