[ARVADOS] updated: 27b18bf9b168319660bdde4632ac4c3f359666d6
Git user
git at public.curoverse.com
Mon Jun 20 14:29:27 EDT 2016
Summary of changes:
services/api/config/application.default.yml | 14 +++++++
services/api/config/initializers/eventbus.rb | 13 +++++++
services/api/lib/eventbus.rb | 52 ++++++++++++-------------
services/api/test/integration/websocket_test.rb | 8 ++--
4 files changed, 55 insertions(+), 32 deletions(-)
via 27b18bf9b168319660bdde4632ac4c3f359666d6 (commit)
from f5bd38d4b168540ddb0e33db9e35a51ec5d60c17 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 27b18bf9b168319660bdde4632ac4c3f359666d6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 20 14:29:21 2016 -0400
9427: Add limits for connections, subscriptions, queued notifications, and
database connections to configuration. Check notify queue size on push instead
of on pop and close connection immediately.
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index f1c4dd0..0e005dc 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -70,6 +70,20 @@ common:
# websockets, otherwise none at all.
websocket_address: false
+ # Maximum number of websocket connections allowed
+ websocket_max_connections: 500
+
+ # Maximum number of events a single connection can be backlogged
+ websocket_max_notify_backlog: 1000
+
+ # Maximum number of subscriptions a single websocket connection can have
+ # active.
+ websocket_max_filters: 10
+
+ # When using multithreaded websocket server, set the size of the database
+ # connection pool.
+ websocket_db_pool: 50
+
# Git repositories must be readable by api server, or you won't be
# able to submit crunch jobs. To pass the test suites, put a clone
# of the arvados tree in {git_repositories_dir}/arvados.git or
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index ea1c210..7c452c3 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -13,6 +13,19 @@ Server::Application.configure do
:websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
}
Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
+
+ Rails.application.config.after_initialize do
+ ActiveRecord::Base.connection_pool.disconnect!
+
+ ActiveSupport.on_load(:active_record) do
+ config = ActiveRecord::Base.configurations[Rails.env] ||
+ Rails.application.config.database_configuration[Rails.env]
+ config['pool'] = Rails.application.config.websocket_db_pool
+ ActiveRecord::Base.establish_connection(config)
+ Rails.logger.info "Database connection pool size #{Rails.application.config.websocket_db_pool}"
+ end
+ end
+
else
Rails.logger.info "Websockets disabled"
end
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 0c5df82..aaeebdc 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -25,17 +25,17 @@ end
module WebSocket
class Driver
- class Hybi < Driver
- alias_method :_frame, :frame
+ class Server
+ alias_method :_write, :write
- def frame(data, type = nil, code = nil)
+ def write(data)
# Most of the sending activity will be from the thread set up in
# on_connect. However, there is also some automatic activity in the
- # form of ping/pong messages, so ensure that the frame method (which
- # actually packs and sends one complete message to the underlying
- # socket) can only be called by one thread at a time.
- @socket.frame_mtx.synchronize do
- _frame(data, type, code)
+ # form of ping/pong messages, so ensure that the write method used to
+ # send one complete message to the underlying socket can only be
+ # called by one thread at a time.
+ self.frame_mtx.synchronize do
+ _write(data)
end
end
end
@@ -186,7 +186,7 @@ class EventBus
ws.sent_ids = Set.new
end
- if ws.filters.length < MAX_FILTERS
+ if ws.filters.length < Rails.configuration.websocket_max_filters
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << filter
@@ -195,7 +195,7 @@ class EventBus
# Send any pending events
push_events ws, nil
else
- ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
end
elsif p[:method] == 'unsubscribe'
@@ -220,14 +220,9 @@ class EventBus
end
end
- # Constant maximum number of filters, to avoid silly huge database queries.
- MAX_FILTERS = 16
- MAX_NOTIFY_BACKLOG = 1000
- MAX_CONNECTIONS = 500
-
def overloaded?
@mtx.synchronize do
- @connection_count >= MAX_CONNECTIONS
+ @connection_count >= Rails.configuration.websocket_max_connections
end
end
@@ -256,10 +251,13 @@ class EventBus
# Subscribe to internal postgres notifications through @channel and
# forward them to the thread associated with the connection.
sub = @channel.subscribe do |msg|
- if msg != :term
- ws.queue << [:notify, msg]
- else
+ if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+ ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
ws.close
+ @channel.unsubscribe sub
+ ws.queue.clear
+ else
+ ws.queue << [:notify, msg]
end
end
@@ -284,12 +282,9 @@ class EventBus
# connections may not complete in a timely manner.
Thread.new do
# Loop and react to socket events.
- loop do
- eventType, msg = ws.queue.pop
- if ws.queue.length > MAX_NOTIFY_BACKLOG
- ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
- ws.close
- else
+ begin
+ loop do
+ eventType, msg = ws.queue.pop
if eventType == :message
handle_message ws, msg
elsif eventType == :notify
@@ -298,9 +293,10 @@ class EventBus
break
end
end
- end
- @mtx.synchronize do
- @connection_count -= 1
+ ensure
+ @mtx.synchronize do
+ @connection_count -= 1
+ end
end
end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index d1b8c34..25e7592 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -31,7 +31,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
ws.on :open do |event|
opened = true
if timeout
- EM::Timer.new 4 do
+ EM::Timer.new 8 do
too_long = true if close_status.nil?
EM.stop_event_loop
end
@@ -597,10 +597,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
ws.on :message do |event|
d = Oj.strict_load event.data
case state
- when (1..EventBus::MAX_FILTERS)
+ when (1..Rails.configuration.websocket_max_filters)
assert_equal 200, d["status"]
state += 1
- when (EventBus::MAX_FILTERS+1)
+ when (Rails.configuration.websocket_max_filters+1)
assert_equal 403, d["status"]
ws.close
end
@@ -608,7 +608,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
end
- assert_equal 17, state
+ assert_equal Rails.configuration.websocket_max_filters+1, state
end
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list