[ARVADOS] updated: de083a9fec0ca08afda5a9369c6cd32dbdcd0965

Git user git at public.curoverse.com
Tue Apr 11 10:45:36 EDT 2017


Summary of changes:
 services/api/config/initializers/eventbus.rb | 33 ++++++++++++++++++----------
 1 file changed, 22 insertions(+), 11 deletions(-)

  discards  1fad1bfb72fc2e97cf0299413bb70219f1aeffdc (commit)
  discards  e9bf021dad9b0047a480ad1da0ec6c54fc42aa71 (commit)
       via  de083a9fec0ca08afda5a9369c6cd32dbdcd0965 (commit)
       via  f5d09a4904e609b5df20edd0194a9f1ade40c28a (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (1fad1bfb72fc2e97cf0299413bb70219f1aeffdc)
            \
             N -- N -- N (de083a9fec0ca08afda5a9369c6cd32dbdcd0965)

When this happens we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the
N branch, starting from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit de083a9fec0ca08afda5a9369c6cd32dbdcd0965
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Apr 10 16:56:37 2017 -0400

    7709: Clear deliveries before each test. Fixes flaky test.

diff --git a/services/api/test/functional/arvados/v1/users_controller_test.rb b/services/api/test/functional/arvados/v1/users_controller_test.rb
index 98643a9..f98e482 100644
--- a/services/api/test/functional/arvados/v1/users_controller_test.rb
+++ b/services/api/test/functional/arvados/v1/users_controller_test.rb
@@ -8,6 +8,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
   setup do
     @initial_link_count = Link.count
     @vm_uuid = virtual_machines(:testvm).uuid
+    ActionMailer::Base.deliveries = []
   end
 
   test "activate a user after signing UA" do

commit f5d09a4904e609b5df20edd0194a9f1ade40c28a
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Apr 10 14:57:56 2017 -0400

    7709: Remove Ruby websocket server.

diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index d10e60c..4c6c2d3 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -314,10 +314,7 @@ def run(leave_running_atexit=False):
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
     env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
-    if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
-        env.pop('ARVADOS_WEBSOCKETS', None)
-    else:
-        env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env.pop('ARVADOS_WEBSOCKETS', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index 443c650..11269d2 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -38,6 +38,7 @@ class Arvados::V1::SchemaController < ApplicationController
         blobSignatureTtl: Rails.application.config.blob_signature_ttl,
         maxRequestSize: Rails.application.config.max_request_size,
         dockerImageFormats: Rails.application.config.docker_image_formats,
+        websocketUrl: Rails.application.config.websocket_address,
         parameters: {
           alt: {
             type: "string",
@@ -83,12 +84,6 @@ class Arvados::V1::SchemaController < ApplicationController
         resources: {}
       }
 
-      if Rails.application.config.websocket_address
-        discovery[:websocketUrl] = Rails.application.config.websocket_address
-      elsif ENV['ARVADOS_WEBSOCKETS']
-        discovery[:websocketUrl] = root_url.sub(/^http/, 'ws') + "websocket"
-      end
-
       ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
         begin
           ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index a5c9de0..85955be 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -46,28 +46,16 @@ common:
   # to log in.
   workbench_address: false
 
-  # The ARVADOS_WEBSOCKETS environment variable determines whether to
-  # serve http, websockets, or both.
+  # Client-facing URI for websocket service. Nginx should be
+  # configured to proxy this URI to arvados-ws; see
+  # http://doc.arvados.org/install/install-ws.html
   #
-  # If ARVADOS_WEBSOCKETS="true", http and websockets are both served
-  # from the same process.
+  # If websocket_address is false (which is the default), no websocket
+  # server will be advertised to clients. This configuration is not
+  # supported.
   #
-  # If ARVADOS_WEBSOCKETS="ws-only", only websockets is served.
-  #
-  # If ARVADOS_WEBSOCKETS="false" or not set at all, only http is
-  # served. In this case, you should have a separate process serving
-  # websockets, and the address of that service should be given here
-  # as websocket_address.
-  #
-  # If websocket_address is false (which is the default), the
-  # discovery document will tell clients to use the current server as
-  # the websocket service, or (if the current server does not have
-  # websockets enabled) not to use websockets at all.
-  #
-  # Example: Clients will connect to the specified endpoint.
-  #websocket_address: wss://127.0.0.1:3333/websocket
-  # Default: Clients will connect to this server if it's running
-  # websockets, otherwise none at all.
+  # Example:
+  #websocket_address: wss://ws.zzzzz.arvadosapi.com/websocket
   websocket_address: false
 
   # Maximum number of websocket connections allowed
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index ea1c210..ad0120f 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -1,19 +1,27 @@
-require 'eventbus'
+if ENV['ARVADOS_WEBSOCKETS']
+  Server::Application.configure do
+    Rails.logger.error "Built-in websocket server is disabled. See note (2017-03-23, e8cc0d7) at https://dev.arvados.org/projects/arvados/wiki/Upgrading_to_master"
 
-# See application.yml for details about configuring the websocket service.
+    class EventBusRemoved
+      def overloaded?
+        false
+      end
+      def on_connect ws
+        ws.on :open do |e|
+          EM::Timer.new 1 do
+            ws.send(SafeJSON.dump({status: 501, message: "Server misconfigured? see http://doc.arvados.org/install/install-ws.html"}))
+          end
+          EM::Timer.new 3 do
+            ws.close
+          end
+        end
+      end
+    end
 
-Server::Application.configure do
-  # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value.  If
-  # ARVADOS_WEBSOCKETS=ws-only, server will only accept websocket connections
-  # and return an error response for all other requests.
-  if ENV['ARVADOS_WEBSOCKETS']
-    config.middleware.insert_after ArvadosApiToken, RackSocket, {
-      :handler => EventBus,
-      :mount => "/websocket",
-      :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
-    }
-    Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
-  else
-    Rails.logger.info "Websockets disabled"
+    config.middleware.insert_after(ArvadosApiToken, RackSocket, {
+                                     handler: EventBusRemoved,
+                                     mount: "/websocket",
+                                     websocket_only: (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+                                   })
   end
 end
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
deleted file mode 100644
index 11b178d..0000000
--- a/services/api/lib/eventbus.rb
+++ /dev/null
@@ -1,358 +0,0 @@
-# If any threads raise an unhandled exception, make them all die.
-# We trust a supervisor like runit to restart the server in this case.
-Thread.abort_on_exception = true
-
-require 'eventmachine'
-require 'faye/websocket'
-require 'load_param'
-require 'oj'
-require 'record_filters'
-require 'safe_json'
-require 'set'
-require 'thread'
-
-# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
-module Faye
-  class WebSocket
-    attr_accessor :user
-    attr_accessor :last_log_id
-    attr_accessor :filters
-    attr_accessor :sent_ids
-    attr_accessor :queue
-    attr_accessor :frame_mtx
-  end
-end
-
-module WebSocket
-  class Driver
-
-    class Server
-      alias_method :_write, :write
-
-      def write(data)
-        # Most of the sending activity will be from the thread set up in
-        # on_connect.  However, there is also some automatic activity in the
-        # form of ping/pong messages, so ensure that the write method used to
-        # send one complete message to the underlying socket can only be
-        # called by one thread at a time.
-        self.frame_mtx.synchronize do
-          _write(data)
-        end
-      end
-    end
-  end
-end
-
-# Store the filters supplied by the user that will be applied to the logs table
-# to determine which events to return to the listener.
-class Filter
-  include LoadParam
-
-  attr_accessor :filters
-
-  def initialize p
-    @params = p
-    load_filters_param
-  end
-
-  def params
-    @params
-  end
-end
-
-# Manages websocket connections, accepts subscription messages and publishes
-# log table events.
-class EventBus
-  include CurrentApiClient
-  include RecordFilters
-
-  # used in RecordFilters
-  def model_class
-    Log
-  end
-
-  # Initialize EventBus.  Takes no parameters.
-  def initialize
-    @channel = EventMachine::Channel.new
-    @mtx = Mutex.new
-    @bgthread = false
-    @connection_count = 0
-  end
-
-  def send_message(ws, obj)
-    ws.send(SafeJSON.dump(obj))
-  end
-
-  # Push out any pending events to the connection +ws+
-  # +notify_id+  the id of the most recent row in the log table, may be nil
-  #
-  # This accepts a websocket and a notify_id (this is the row id from Postgres
-  # LISTEN/NOTIFY, it may be nil if called from somewhere else)
-  #
-  # It queries the database for log rows that are either
-  #  a) greater than ws.last_log_id, which is the last log id which was a candidate to be sent out
-  #  b) if ws.last_log_id is nil, then it queries the row notify_id
-  #
-  # Regular Arvados permissions are applied using readable_by() and filters using record_filters().
-  def push_events ws, notify_id
-    begin
-      # Must have at least one filter set up to receive events
-      if ws.filters.length > 0
-        # Start with log rows readable by user
-        logs = Log.readable_by(ws.user)
-
-        cond_id = nil
-        cond_out = []
-        param_out = []
-
-        if not ws.last_log_id.nil?
-          # We are catching up from some starting point.
-          cond_id = "logs.id > ?"
-          param_out << ws.last_log_id
-        elsif not notify_id.nil?
-          # Get next row being notified.
-          cond_id = "logs.id = ?"
-          param_out << notify_id
-        else
-          # No log id to start from, nothing to do, return
-          return
-        end
-
-        # Now build filters provided by client
-        ws.filters.each do |filter|
-          ft = record_filters filter.filters, Log
-          if ft[:cond_out].any?
-            # Join the clauses within a single subscription filter with AND
-            # so it is consistent with regular queries
-            cond_out << "(#{ft[:cond_out].join ') AND ('})"
-            param_out += ft[:param_out]
-          end
-        end
-
-        # Add filters to query
-        if cond_out.any?
-          # Join subscriptions with OR
-          logs = logs.where(cond_id + " AND ((#{cond_out.join ') OR ('}))", *param_out)
-        else
-          logs = logs.where(cond_id, *param_out)
-        end
-
-        # Execute query and actually send the matching log rows. Load
-        # the full log records only when we're ready to send them,
-        # though: otherwise, (1) postgres has to build the whole
-        # result set and return it to us before we can send the first
-        # event, and (2) we store lots of records in memory while
-        # waiting to spool them out to the client. Both of these are
-        # troublesome when log records are large (e.g., a collection
-        # update contains both old and new manifest_text).
-        #
-        # Note: find_each implies order('id asc'), which is what we
-        # want.
-        logs.select('logs.id').find_each do |l|
-          if not ws.sent_ids.include?(l.id)
-            # only send if not a duplicate
-            send_message(ws, Log.find(l.id).as_api_response)
-          end
-          if not ws.last_log_id.nil?
-            # record ids only when sending "catchup" messages, not notifies
-            ws.sent_ids << l.id
-          end
-        end
-        ws.last_log_id = nil
-      end
-    rescue ArgumentError => e
-      # There was some kind of user error.
-      Rails.logger.warn "Error publishing event: #{$!}"
-      send_message(ws, {status: 500, message: $!})
-      ws.close
-    rescue => e
-      Rails.logger.warn "Error publishing event: #{$!}"
-      Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      send_message(ws, {status: 500, message: $!})
-      ws.close
-      # These exceptions typically indicate serious server trouble:
-      # out of memory issues, database connection problems, etc.  Go ahead and
-      # crash; we expect that a supervisor service like runit will restart us.
-      raise
-    end
-  end
-
-  # Handle inbound subscribe or unsubscribe message.
-  def handle_message ws, event
-    begin
-      begin
-        # Parse event data as JSON
-        p = SafeJSON.load(event.data).symbolize_keys
-        filter = Filter.new(p)
-      rescue Oj::Error => e
-        send_message(ws, {status: 400, message: "malformed request"})
-        return
-      end
-
-      if p[:method] == 'subscribe'
-        # Handle subscribe event
-
-        if p[:last_log_id]
-          # Set or reset the last_log_id.  The event bus only reports events
-          # for rows that come after last_log_id.
-          ws.last_log_id = p[:last_log_id].to_i
-          # Reset sent_ids for consistency
-          # (always re-deliver all matching messages following last_log_id)
-          ws.sent_ids = Set.new
-        end
-
-        if ws.filters.length < Rails.configuration.websocket_max_filters
-          # Add a filter.  This gets the :filters field which is the same
-          # format as used for regular index queries.
-          ws.filters << filter
-          send_message(ws, {status: 200, message: 'subscribe ok', filter: p})
-
-          # Send any pending events
-          push_events ws, nil
-        else
-          send_message(ws, {status: 403, message: "maximum of #{Rails.configuration.websocket_max_filters} filters allowed per connection"})
-        end
-
-      elsif p[:method] == 'unsubscribe'
-        # Handle unsubscribe event
-
-        len = ws.filters.length
-        ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
-        if ws.filters.length < len
-          send_message(ws, {status: 200, message: 'unsubscribe ok'})
-        else
-          send_message(ws, {status: 404, message: 'filter not found'})
-        end
-
-      else
-        send_message(ws, {status: 400, message: "missing or unrecognized method"})
-      end
-    rescue => e
-      Rails.logger.warn "Error handling message: #{$!}"
-      Rails.logger.warn "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-      send_message(ws, {status: 500, message: 'error'})
-      ws.close
-    end
-  end
-
-  def overloaded?
-    @mtx.synchronize do
-      @connection_count >= Rails.configuration.websocket_max_connections
-    end
-  end
-
-  # Called by RackSocket when a new websocket connection has been established.
-  def on_connect ws
-    # Disconnect if no valid API token.
-    # current_user is included from CurrentApiClient
-    if not current_user
-      send_message(ws, {status: 401, message: "Valid API token required"})
-      # Wait for the handshake to complete before closing the
-      # socket. Otherwise, nginx responds with HTTP 502 Bad gateway,
-      # and the client never sees our real error message.
-      ws.on :open do |event|
-        ws.close
-      end
-      return
-    end
-
-    # Initialize our custom fields on the websocket connection object.
-    ws.user = current_user
-    ws.filters = []
-    ws.last_log_id = nil
-    ws.sent_ids = Set.new
-    ws.queue = Queue.new
-    ws.frame_mtx = Mutex.new
-
-    @mtx.synchronize do
-      @connection_count += 1
-    end
-
-    # Subscribe to internal postgres notifications through @channel and
-    # forward them to the thread associated with the connection.
-    sub = @channel.subscribe do |msg|
-      if ws.queue.length > Rails.configuration.websocket_max_notify_backlog
-        send_message(ws, {status: 500, message: 'Notify backlog too long'})
-        ws.close
-        @channel.unsubscribe sub
-        ws.queue.clear
-      else
-        ws.queue << [:notify, msg]
-      end
-    end
-
-    # Set up callback for inbound message dispatch.
-    ws.on :message do |event|
-      ws.queue << [:message, event]
-    end
-
-    # Set up socket close callback
-    ws.on :close do |event|
-      @channel.unsubscribe sub
-      ws.queue.clear
-      ws.queue << [:close, nil]
-    end
-
-    # Spin off a new thread to handle sending events to the client.  We need a
-    # separate thread per connection so that a slow client doesn't interfere
-    # with other clients.
-    #
-    # We don't want the loop in the request thread because on a TERM signal,
-    # Puma waits for outstanding requests to complete, and long-lived websocket
-    # connections may not complete in a timely manner.
-    Thread.new do
-      # Loop and react to socket events.
-      begin
-        loop do
-          eventType, msg = ws.queue.pop
-          if eventType == :message
-            handle_message ws, msg
-          elsif eventType == :notify
-            push_events ws, msg
-          elsif eventType == :close
-            break
-          end
-        end
-      ensure
-        @mtx.synchronize do
-          @connection_count -= 1
-        end
-        ActiveRecord::Base.connection.close
-      end
-    end
-
-    # Start up thread to monitor the Postgres database, if none exists already.
-    @mtx.synchronize do
-      unless @bgthread
-        @bgthread = true
-        Thread.new do
-          # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
-          ActiveRecord::Base.connection_pool.with_connection do |connection|
-            conn = connection.instance_variable_get(:@connection)
-            begin
-              conn.async_exec "LISTEN logs"
-              while true
-                # wait_for_notify will block until there is a change
-                # notification from Postgres about the logs table, then push
-                # the notification into the EventMachine channel.  Each
-                # websocket connection subscribes to the other end of the
-                # channel and calls #push_events to actually dispatch the
-                # events to the client.
-                conn.wait_for_notify do |channel, pid, payload|
-                  @channel.push payload.to_i
-                end
-              end
-            ensure
-              # Don't want the connection to still be listening once we return
-              # it to the pool - could result in weird behavior for the next
-              # thread to check it out.
-              conn.async_exec "UNLISTEN *"
-            end
-          end
-          @bgthread = false
-        end
-      end
-    end
-
-  end
-end

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list