[ARVADOS] updated: 27b18bf9b168319660bdde4632ac4c3f359666d6

Git user git at public.curoverse.com
Mon Jun 20 14:29:27 EDT 2016


Summary of changes:
 services/api/config/application.default.yml     | 14 +++++++
 services/api/config/initializers/eventbus.rb    | 13 +++++++
 services/api/lib/eventbus.rb                    | 52 ++++++++++++-------------
 services/api/test/integration/websocket_test.rb |  8 ++--
 4 files changed, 55 insertions(+), 32 deletions(-)

       via  27b18bf9b168319660bdde4632ac4c3f359666d6 (commit)
      from  f5bd38d4b168540ddb0e33db9e35a51ec5d60c17 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 27b18bf9b168319660bdde4632ac4c3f359666d6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 20 14:29:21 2016 -0400

    9427: Add limits for connections, subscriptions, queued notifications, and
    database connections to configuration.  Check notify queue size on push instead
    of on pop and close connection immediately.

diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index f1c4dd0..0e005dc 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -70,6 +70,20 @@ common:
   # websockets, otherwise none at all.
   websocket_address: false
 
+  # Maximum number of websocket connections allowed
+  websocket_max_connections: 500
+
+  # Maximum number of events that may be backlogged on a single connection
+  websocket_max_notify_backlog: 1000
+
+  # Maximum number of subscriptions a single websocket connection can have
+  # active.
+  websocket_max_filters: 10
+
+  # When using a multithreaded websocket server, this sets the size of the
+  # database connection pool.
+  websocket_db_pool: 50
+
   # Git repositories must be readable by api server, or you won't be
   # able to submit crunch jobs. To pass the test suites, put a clone
   # of the arvados tree in {git_repositories_dir}/arvados.git or
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index ea1c210..7c452c3 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -13,6 +13,19 @@ Server::Application.configure do
       :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
     }
     Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
+
+    Rails.application.config.after_initialize do
+      ActiveRecord::Base.connection_pool.disconnect!
+
+      ActiveSupport.on_load(:active_record) do
+        config = ActiveRecord::Base.configurations[Rails.env] ||
+                 Rails.application.config.database_configuration[Rails.env]
+        config['pool'] = Rails.application.config.websocket_db_pool
+        ActiveRecord::Base.establish_connection(config)
+        Rails.logger.info "Database connection pool size #{Rails.application.config.websocket_db_pool}"
+      end
+    end
+
   else
     Rails.logger.info "Websockets disabled"
   end
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 0c5df82..aaeebdc 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -25,17 +25,17 @@ end
 module WebSocket
   class Driver
 
-    class Hybi < Driver
-      alias_method :_frame, :frame
+    class Server
+      alias_method :_write, :write
 
-      def frame(data, type = nil, code = nil)
+      def write(data)
         # Most of the sending activity will be from the thread set up in
         # on_connect.  However, there is also some automatic activity in the
-        # form of ping/pong messages, so ensure that the frame method (which
-        # actually packs and sends one complete message to the underlying
-        # socket) can only be called by one thread at a time.
-        @socket.frame_mtx.synchronize do
-          _frame(data, type, code)
+        # form of ping/pong messages, so ensure that the write method used to
+        # send one complete message to the underlying socket can only be
+        # called by one thread at a time.
+        self.frame_mtx.synchronize do
+          _write(data)
         end
       end
     end
@@ -186,7 +186,7 @@ class EventBus
           ws.sent_ids = Set.new
         end
 
-        if ws.filters.length < MAX_FILTERS
+        if ws.filters.length < Rails.configuration.websocket_max_filters
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
           ws.filters << filter
@@ -195,7 +195,7 @@ class EventBus
           # Send any pending events
           push_events ws, nil
         else
-          ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+          ws.send ({status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"}.to_json)
         end
 
       elsif p[:method] == 'unsubscribe'
@@ -220,14 +220,9 @@ class EventBus
     end
   end
 
-  # Constant maximum number of filters, to avoid silly huge database queries.
-  MAX_FILTERS = 16
-  MAX_NOTIFY_BACKLOG = 1000
-  MAX_CONNECTIONS = 500
-
   def overloaded?
     @mtx.synchronize do
-      @connection_count >= MAX_CONNECTIONS
+      @connection_count >= Rails.configuration.websocket_max_connections
     end
   end
 
@@ -256,10 +251,13 @@ class EventBus
     # Subscribe to internal postgres notifications through @channel and
     # forward them to the thread associated with the connection.
     sub = @channel.subscribe do |msg|
-      if msg != :term
-        ws.queue << [:notify, msg]
-      else
+      if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
+        ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
         ws.close
+        @channel.unsubscribe sub
+        ws.queue.clear
+      else
+        ws.queue << [:notify, msg]
       end
     end
 
@@ -284,12 +282,9 @@ class EventBus
     # connections may not complete in a timely manner.
     Thread.new do
       # Loop and react to socket events.
-      loop do
-        eventType, msg = ws.queue.pop
-        if ws.queue.length > MAX_NOTIFY_BACKLOG
-          ws.send ({status: 500, message: 'Notify backlog too long'}.to_json)
-          ws.close
-        else
+      begin
+        loop do
+          eventType, msg = ws.queue.pop
           if eventType == :message
             handle_message ws, msg
           elsif eventType == :notify
@@ -298,9 +293,10 @@ class EventBus
             break
           end
         end
-      end
-      @mtx.synchronize do
-        @connection_count -= 1
+      ensure
+        @mtx.synchronize do
+          @connection_count -= 1
+        end
       end
     end
 
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index d1b8c34..25e7592 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -31,7 +31,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :open do |event|
         opened = true
         if timeout
-          EM::Timer.new 4 do
+          EM::Timer.new 8 do
             too_long = true if close_status.nil?
             EM.stop_event_loop
           end
@@ -597,10 +597,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :message do |event|
         d = Oj.strict_load event.data
         case state
-        when (1..EventBus::MAX_FILTERS)
+        when (1..Rails.configuration.websocket_max_filters)
           assert_equal 200, d["status"]
           state += 1
-        when (EventBus::MAX_FILTERS+1)
+        when (Rails.configuration.websocket_max_filters+1)
           assert_equal 403, d["status"]
           ws.close
         end
@@ -608,7 +608,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     end
 
-    assert_equal 17, state
+    assert_equal Rails.configuration.websocket_max_filters+1, state
 
   end
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list