[ARVADOS] updated: 943feeec776bb75c685cbdee9466f8db3cdf5da7
git at public.curoverse.com
git at public.curoverse.com
Wed Apr 30 11:40:18 EDT 2014
Summary of changes:
.../app/assets/javascripts/application.js | 7 +
.../app/assets/javascripts/collections.js | 59 ++++++++++
.../app/assets/javascripts/collections.js.coffee | 3 -
.../app/assets/stylesheets/collections.css.scss | 21 +++-
.../app/controllers/collections_controller.rb | 34 ++++++-
apps/workbench/app/controllers/users_controller.rb | 36 +++---
apps/workbench/app/helpers/application_helper.rb | 18 ++--
.../app/views/collections/_index_tbody.html.erb | 10 +--
.../app/views/collections/_show_files.html.erb | 12 ++
.../app/views/collections/_show_recent.html.erb | 8 +-
.../app/views/collections/_toggle_persist.html.erb | 3 +
apps/workbench/app/views/users/_tables.html.erb | 120 ++++++++++----------
apps/workbench/config/routes.rb | 4 +-
.../pipeline_instances_controller_test.rb | 18 +++-
.../workbench/test/integration/collections_test.rb | 42 +++++++
sdk/ruby/lib/arvados.rb | 36 +++++--
.../api/app/controllers/application_controller.rb | 20 +++-
.../v1/api_client_authorizations_controller.rb | 32 ++++-
.../arvados/v1/keep_disks_controller.rb | 2 +-
.../app/controllers/arvados/v1/nodes_controller.rb | 4 +-
.../controllers/arvados/v1/schema_controller.rb | 9 +-
.../arvados/v1/virtual_machines_controller.rb | 16 ---
services/api/app/controllers/static_controller.rb | 2 +-
.../app/controllers/user_sessions_controller.rb | 2 +-
services/api/app/middlewares/rack_socket.rb | 7 +-
services/api/app/models/arvados_model.rb | 15 ++-
services/api/app/models/node.rb | 2 +-
services/api/config/application.default.yml | 3 -
services/api/config/application.yml.example | 7 +
services/api/config/initializers/eventbus.rb | 15 ++-
.../20140421151939_rename_auth_keys_user_index.rb | 11 ++
.../db/migrate/20140423133559_new_scope_format.rb | 48 ++++++++
services/api/db/schema.rb | 4 +-
services/api/lib/current_api_client.rb | 19 ++--
services/api/lib/eventbus.rb | 122 +++++++++++---------
services/api/lib/load_param.rb | 16 ++-
services/api/lib/record_filters.rb | 15 ++-
.../test/fixtures/api_client_authorizations.yml | 37 ++++++
services/api/test/fixtures/links.yml | 14 +++
.../api_client_authorizations_controller_test.rb | 30 +++++-
.../functional/arvados/v1/nodes_controller_test.rb | 9 ++
.../api_client_authorizations_scopes_test.rb | 103 +++++++++++++++++
.../api/test/integration/login_workflow_test.rb | 25 ++++
services/api/test/integration/websocket_test.rb | 64 +++++++----
services/api/test/websocket_runner.rb | 2 +-
45 files changed, 821 insertions(+), 265 deletions(-)
create mode 100644 apps/workbench/app/assets/javascripts/collections.js
delete mode 100644 apps/workbench/app/assets/javascripts/collections.js.coffee
create mode 100644 apps/workbench/app/views/collections/_toggle_persist.html.erb
create mode 100644 apps/workbench/test/integration/collections_test.rb
create mode 100644 services/api/db/migrate/20140421151939_rename_auth_keys_user_index.rb
create mode 100644 services/api/db/migrate/20140423133559_new_scope_format.rb
create mode 100644 services/api/test/integration/api_client_authorizations_scopes_test.rb
create mode 100644 services/api/test/integration/login_workflow_test.rb
via 943feeec776bb75c685cbdee9466f8db3cdf5da7 (commit)
via 1433693e08e2d4052cc94f6a5902523b08bbc1ac (commit)
via e20c9587ad703ae8e1f81251a6e209834a52d448 (commit)
via a4724fe92e651abb06acf8c5e75184561a55c854 (commit)
via 208e172287aba3be7cb988a2c416e4281a2e5f60 (commit)
via b0c30631bceb2d1837b52d1d3475e52aee4c9c43 (commit)
via 840e7d7f96f763ae139545dca5d6dfa5a54f6cc6 (commit)
via 8adde5132926e8f9cc1b01d79f9307614cc6021e (commit)
via 8eaad00b025167a7505ba11ad6a05b52a43c2399 (commit)
via c18bde8300e115b215e58d6930d0495b2c33b49f (commit)
via 4e3ac7f8bafde72ae397f79bfc36409e682b13e5 (commit)
via 7b9cff04eb463c666b8126ebc6c4dfcc00a536c0 (commit)
via f845c0645d9136a1e4ad993ecf34a156367e73b7 (commit)
via 03e570095885982d23e234bce8e1c068314b63af (commit)
via 5c2758ca38d01a905253f86e63be3a5fe03a3871 (commit)
via 52c4f2b7fe631f3d7ad16105cb2f86cf6c004fc8 (commit)
via e54bdba73b65e31b03fd1d43bfe69d0f43bbd8d8 (commit)
via 0eb59e3acf9f13e89bd010f7f65a4d31554183fc (commit)
via 915de6c854cd559ebd029b24939149e37b18c8cf (commit)
via 71b1b7b045419817d1c9dc62a3a296b746d9117c (commit)
via 7fbcd989af9949b11ddfec0c9ebfaa96a655eef4 (commit)
via c3457ad20bfd00c99facef396f1dbdbcbdbad241 (commit)
via 4fa5a3d3958eae7ecce681e0ea7a4ac8cc43e48d (commit)
via a28be6d77750a6cbded994612d95ee973cb4c01e (commit)
via 58dd340f19c3e0225ad189e60a58872d3aa3f7c2 (commit)
via 1c5176d87df0dbd25db6ff1fb2ab82ae17472145 (commit)
via 5d356c84dcf8f3a2129cce0f4fbfc50f16a8a339 (commit)
via f6f07b1856fc0aa32cdf1f96d5bdeb12e9e314ef (commit)
via 919cf4c2c9f803f0cdb80619bcbf5d23b7c02d25 (commit)
via 6c0b68255c0c66d90c0473561461d9215860f018 (commit)
via 9e97504e5f65c8a3a6f1f34da678ac7e1828e8e1 (commit)
via 62db3393df8d6bc7c48a3c5101468361cb2f3cf3 (commit)
via e062efe37ddf066af1c5c762b126bb72766fee25 (commit)
via 20334fa95bb7d554c09225c02fae3d4e83c6c6c5 (commit)
via 7bc20af4935fee1caa566e86a074022f0b60d166 (commit)
from b421d5c4754315cdd8b70b6bbea5b5f23fb425de (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 943feeec776bb75c685cbdee9466f8db3cdf5da7
Merge: 1433693 840e7d7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 11:40:09 2014 -0400
Merge remote-tracking branch 'origin/master' into origin-2608-websocket-event-bus-alt2
Conflicts:
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/db/schema.rb
diff --cc services/api/app/controllers/application_controller.rb
index 1368eeb,d40c6de..3891bde
--- a/services/api/app/controllers/application_controller.rb
+++ b/services/api/app/controllers/application_controller.rb
@@@ -9,9 -4,11 +9,10 @@@ class ApplicationController < ActionCon
respond_to :json
protect_from_forgery
- around_filter :thread_with_auth_info, :except => [:render_error, :render_not_found]
+ before_filter :respond_with_json_by_default
before_filter :remote_ip
- before_filter :require_auth_scope_all, :except => :render_not_found
+ before_filter :require_auth_scope, :except => :render_not_found
before_filter :catch_redirect_hint
before_filter :find_object_by_uuid, :except => [:index, :create,
@@@ -290,13 -413,74 +294,17 @@@
end
end
- def require_auth_scope_all
- require_login and require_auth_scope(['all'])
+ def require_auth_scope
+ return false unless require_login
+ unless current_api_client_auth_has_scope("#{request.method} #{request.path}")
+ render :json => { errors: ['Forbidden'] }.to_json, status: 403
+ end
end
- def require_auth_scope(ok_scopes)
- unless current_api_client_auth_has_scope(ok_scopes)
- render :json => { errors: ['Forbidden'] }.to_json, status: 403
- def thread_with_auth_info
- Thread.current[:request_starttime] = Time.now
- Thread.current[:api_url_base] = root_url.sub(/\/$/,'') + '/arvados/v1'
- begin
- user = nil
- api_client = nil
- api_client_auth = nil
- supplied_token =
- params[:api_token] ||
- params[:oauth_token] ||
- request.headers["Authorization"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
- if supplied_token
- api_client_auth = ApiClientAuthorization.
- includes(:api_client, :user).
- where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
- first
- if api_client_auth.andand.user
- session[:user_id] = api_client_auth.user.id
- session[:api_client_uuid] = api_client_auth.api_client.andand.uuid
- session[:api_client_authorization_id] = api_client_auth.id
- user = api_client_auth.user
- api_client = api_client_auth.api_client
- else
- # Token seems valid, but points to a non-existent (deleted?) user.
- api_client_auth = nil
- end
- elsif session[:user_id]
- user = User.find(session[:user_id]) rescue nil
- api_client = ApiClient.
- where('uuid=?',session[:api_client_uuid]).
- first rescue nil
- if session[:api_client_authorization_id] then
- api_client_auth = ApiClientAuthorization.
- find session[:api_client_authorization_id]
- end
- end
- Thread.current[:api_client_ip_address] = remote_ip
- Thread.current[:api_client_authorization] = api_client_auth
- Thread.current[:api_client_uuid] = api_client.andand.uuid
- Thread.current[:api_client] = api_client
- Thread.current[:user] = user
- if api_client_auth
- api_client_auth.last_used_at = Time.now
- api_client_auth.last_used_by_ip_address = remote_ip
- api_client_auth.save validate: false
- end
- yield
- ensure
- Thread.current[:api_client_ip_address] = nil
- Thread.current[:api_client_authorization] = nil
- Thread.current[:api_client_uuid] = nil
- Thread.current[:api_client] = nil
- Thread.current[:user] = nil
- end
- end
- # /Authentication
-
+ def respond_with_json_by_default
+ html_index = request.accepts.index(Mime::HTML)
+ if html_index.nil? or request.accepts[0...html_index].include?(Mime::JSON)
+ request.format = :json
end
end
diff --cc services/api/lib/load_param.rb
index 9c85757,0000000..e01063d
mode 100644,000000..100644
--- a/services/api/lib/load_param.rb
+++ b/services/api/lib/load_param.rb
@@@ -1,84 -1,0 +1,88 @@@
+# Mixin module for reading out query parameters from request params.
+#
+# Expects:
+# +params+ Hash
+# Sets:
+# @where, @filters, @limit, @offset, @orders
+module LoadParam
+
+ # Default limit on number of rows to return in a single query.
+ DEFAULT_LIMIT = 100
+
+ # Load params[:where] into @where
+ def load_where_param
+ if params[:where].nil? or params[:where] == ""
+ @where = {}
+ elsif params[:where].is_a? Hash
+ @where = params[:where]
+ elsif params[:where].is_a? String
+ begin
+ @where = Oj.load(params[:where])
+ raise unless @where.is_a? Hash
+ rescue
+ raise ArgumentError.new("Could not parse \"where\" param as an object")
+ end
+ end
+ @where = @where.with_indifferent_access
+ end
+
+ # Load params[:filters] into @filters
+ def load_filters_param
+ @filters ||= []
+ if params[:filters].is_a? Array
+ @filters += params[:filters]
+ elsif params[:filters].is_a? String and !params[:filters].empty?
+ begin
+ f = Oj.load params[:filters]
+ raise unless f.is_a? Array
+ @filters += f
+ rescue
+ raise ArgumentError.new("Could not parse \"filters\" param as an array")
+ end
+ end
+ end
+
++ def default_orders
++ ["#{table_name}.modified_at desc"]
++ end
++
+ # Load params[:limit], params[:offset] and params[:order]
+ # into @limit, @offset, @orders
+ def load_limit_offset_order_params
+ if params[:limit]
+ unless params[:limit].to_s.match(/^\d+$/)
+ raise ArgumentError.new("Invalid value for limit parameter")
+ end
+ @limit = params[:limit].to_i
+ else
+ @limit = DEFAULT_LIMIT
+ end
+
+ if params[:offset]
+ unless params[:offset].to_s.match(/^\d+$/)
+ raise ArgumentError.new("Invalid value for offset parameter")
+ end
+ @offset = params[:offset].to_i
+ else
+ @offset = 0
+ end
+
+ @orders = []
+ if params[:order]
+ params[:order].split(',').each do |order|
+ attr, direction = order.strip.split " "
+ direction ||= 'asc'
+ if attr.match /^[a-z][_a-z0-9]+$/ and
+ model_class.columns.collect(&:name).index(attr) and
+ ['asc','desc'].index direction.downcase
+ @orders << "#{table_name}.#{attr} #{direction.downcase}"
+ end
+ end
+ end
+ if @orders.empty?
- @orders << "#{table_name}.modified_at desc"
++ @orders << default_orders
+ end
+ end
+
+
+end
commit 1433693e08e2d4052cc94f6a5902523b08bbc1ac
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 11:26:58 2014 -0400
Moved message handling out to a separate method for clarity in eventbus.
Fixing name mismatch /websockets => /websocket in configuration
Added guards to various tests to ensure no more messages are processed after
the test is supposed to be done.
diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index c9b18ae..6cdedf6 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -73,7 +73,7 @@ class Arvados::V1::SchemaController < ApplicationController
if Rails.application.config.websocket_address
discovery[:websocketUrl] = Rails.application.config.websocket_address
elsif ENV['ARVADOS_WEBSOCKETS']
- discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websockets"
+ discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websocket"
end
ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
diff --git a/services/api/config/application.yml.example b/services/api/config/application.yml.example
index b2b9238..2705fa1 100644
--- a/services/api/config/application.yml.example
+++ b/services/api/config/application.yml.example
@@ -45,4 +45,4 @@ common:
# the websocket server. When you do this, you need to set the following
# configuration variable so that the primary server can give out the correct
# address of the dedicated websocket server:
- #websocket_address: wss://websocket.local/websockets
+ #websocket_address: wss://websocket.local/websocket
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index e04f1fe..7da8ade 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -7,7 +7,7 @@ Server::Application.configure do
if ENV['ARVADOS_WEBSOCKETS']
config.middleware.insert_after ArvadosApiToken, RackSocket, {
:handler => EventBus,
- :mount => "/websockets",
+ :mount => "/websocket",
:websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
}
end
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 79315aa..0b8cae2 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -99,6 +99,57 @@ class EventBus
end
end
+ # Handle inbound subscribe or unsubscribe message.
+ def handle_message ws, event
+ begin
+ # Parse event data as JSON
+ p = (Oj.load event.data).symbolize_keys
+
+ if p[:method] == 'subscribe'
+ # Handle subscribe event
+
+ if p[:last_log_id]
+ # Set or reset the last_log_id. The event bus only reports events
+ # for rows that come after last_log_id.
+ ws.last_log_id = p[:last_log_id].to_i
+ end
+
+ if ws.filters.length < MAX_FILTERS
+ # Add a filter. This gets the :filters field which is the same
+ # format as used for regular index queries.
+ ws.filters << Filter.new(p)
+ ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+ # Send any pending events
+ push_events ws
+ else
+ ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+ end
+
+ elsif p[:method] == 'unsubscribe'
+ # Handle unsubscribe event
+
+ len = ws.filters.length
+ ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+ if ws.filters.length < len
+ ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+ else
+ ws.send ({status: 404, message: 'filter not found'}.to_json)
+ end
+
+ else
+ ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+ end
+ rescue Oj::Error => e
+ ws.send ({status: 400, message: "malformed request"}.to_json)
+ rescue Exception => e
+ puts "Error handling message: #{$!}"
+ puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+ ws.send ({status: 500, message: 'error'}.to_json)
+ ws.close
+ end
+ end
+
# Constant maximum number of filters, to avoid silly huge database queries.
MAX_FILTERS = 16
@@ -126,53 +177,7 @@ class EventBus
# Set up callback for inbound message dispatch.
ws.on :message do |event|
- begin
- # Parse event data as JSON
- p = (Oj.load event.data).symbolize_keys
-
- if p[:method] == 'subscribe'
- # Handle subscribe event
-
- if p[:last_log_id]
- # Set or reset the last_log_id. The event bus only reports events
- # for rows that come after last_log_id.
- ws.last_log_id = p[:last_log_id].to_i
- end
-
- if ws.filters.length < MAX_FILTERS
- # Add a filter. This gets the :filters field which is the same
- # format as used for regular index queries.
- ws.filters << Filter.new(p)
- ws.send ({status: 200, message: 'subscribe ok'}.to_json)
-
- # Send any pending events
- push_events ws
- else
- ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
- end
-
- elsif p[:method] == 'unsubscribe'
- # Handle unsubscribe event
-
- len = ws.filters.length
- ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
- if ws.filters.length < len
- ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
- else
- ws.send ({status: 404, message: 'filter not found'}.to_json)
- end
-
- else
- ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
- end
- rescue Oj::Error => e
- ws.send ({status: 400, message: "malformed request"}.to_json)
- rescue Exception => e
- puts "Error handling message: #{$!}"
- puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
- ws.send ({status: 500, message: 'error'}.to_json)
- ws.close
- end
+ handle_message ws, event
end
# Set up socket close callback
@@ -209,6 +214,7 @@ class EventBus
conn.async_exec "UNLISTEN *"
end
end
+ @bgthread = false
end
end
end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 4761800..9ce53a6 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -142,7 +142,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 3
when 3
human_ev_uuid = d["object_uuid"]
+ state = 4
ws.close
+ when 4
+ assert false, "Should not get any more events"
end
end
@@ -176,7 +179,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 2
when 2
human_ev_uuid = d["object_uuid"]
+ state = 3
ws.close
+ when 3
+ assert false, "Should not get any more events"
end
end
@@ -219,7 +225,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 4
when 4
human_ev_uuid = d["object_uuid"]
+ state = 5
ws.close
+ when 5
+ assert false, "Should not get any more events"
end
end
@@ -260,7 +269,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
when 3
l2 = d["object_uuid"]
assert_not_nil l2, "Unexpected message: #{d}"
+ state = 4
ws.close
+ when 4
+ assert false, "Should not get any more events"
end
end
@@ -403,7 +415,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 4
when 4
human_ev_uuid = d["object_uuid"]
+ state = 5
ws.close
+ when 5
+ assert false, "Should not get any more events"
end
end
commit e20c9587ad703ae8e1f81251a6e209834a52d448
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 10:56:22 2014 -0400
Short-circuits #readable_by when the user is admin.
diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index c87a909..23d0222 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -75,6 +75,11 @@ class ArvadosModel < ActiveRecord::Base
# end
def self.readable_by user
+ if user.is_admin
+ # Admins can read anything, so return immediately.
+ return self
+ end
+
uuid_list = [user.uuid, *user.groups_i_can(:read)]
sanitized_uuid_list = uuid_list.
collect { |uuid| sanitize(uuid) }.join(', ')
@@ -100,8 +105,6 @@ class ArvadosModel < ActiveRecord::Base
# Link is any permission link ('write' and 'manage' implicitly include 'read')
# The existence of such a link is tested in the where clause as permissions.head_uuid IS NOT NULL.
# or
- # User is admin
- # or
# This row is owned by this user, or owned by a group readable by this user
# or
# This is the users table
@@ -115,8 +118,8 @@ class ArvadosModel < ActiveRecord::Base
# This object described by this row is owned by this user, or owned by a group readable by this user
joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid #{or_object_uuid}) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'").
- where("permissions.head_uuid IS NOT NULL OR ?=? OR #{table_name}.owner_uuid in (?) #{or_row_is_me} #{or_references_me} #{or_object_owner}",
- user.is_admin, true, uuid_list).uniq
+ where("permissions.head_uuid IS NOT NULL OR #{table_name}.owner_uuid in (?) #{or_row_is_me} #{or_references_me} #{or_object_owner}",
+ uuid_list).uniq
end
def logged_attributes
commit a4724fe92e651abb06acf8c5e75184561a55c854
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 10:56:04 2014 -0400
Unsubscribe message now takes filter definition instead of filter_id, reducing
the state both the client and server have to maintain in order to support
unsubscribing. Added code comments and updated tests.
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 8480cf4..79315aa 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -20,19 +20,14 @@ class Filter
attr_accessor :filters
- def initialize p, fid
+ def initialize p
@params = p
- @filter_id = fid
load_filters_param
end
def params
@params
end
-
- def filter_id
- @filter_id
- end
end
# Manages websocket connections, accepts subscription messages and publishes
@@ -51,12 +46,6 @@ class EventBus
@channel = EventMachine::Channel.new
@mtx = Mutex.new
@bgthread = false
- @filter_id_counter = 0
- end
-
- # Allocate a new filter id
- def alloc_filter_id
- @filter_id_counter += 1
end
# Push out any pending events to the connection +ws+
@@ -69,7 +58,8 @@ class EventBus
logs = Log.readable_by(ws.user).order("id asc")
if ws.last_log_id
- # Only interested in log rows that are new
+ # Client is only interested in log rows that are newer than the
+ # last log row seen by the client.
logs = logs.where("logs.id > ?", ws.last_log_id)
elsif id
# No last log id, so only look at the most recently changed row
@@ -92,7 +82,7 @@ class EventBus
logs = logs.where(cond_out.join(' OR '), *param_out)
end
- # Finally execute query and send matching rows
+ # Finally execute query and actually send the matching log rows
logs.each do |l|
ws.send(l.as_api_response.to_json)
ws.last_log_id = l.id
@@ -137,33 +127,41 @@ class EventBus
# Set up callback for inbound message dispatch.
ws.on :message do |event|
begin
+ # Parse event data as JSON
p = (Oj.load event.data).symbolize_keys
+
if p[:method] == 'subscribe'
+ # Handle subscribe event
+
if p[:last_log_id]
+ # Set or reset the last_log_id. The event bus only reports events
+ # for rows that come after last_log_id.
ws.last_log_id = p[:last_log_id].to_i
end
if ws.filters.length < MAX_FILTERS
- filter_id = alloc_filter_id
- ws.filters.push Filter.new(p, filter_id)
- ws.send ({status: 200, message: 'subscribe ok', filter_id: filter_id}.to_json)
+ # Add a filter. This gets the :filters field which is the same
+ # format as used for regular index queries.
+ ws.filters << Filter.new(p)
+ ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+ # Send any pending events
push_events ws
else
ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
end
+
elsif p[:method] == 'unsubscribe'
- if filter_id = p[:filter_id]
- filter_id = filter_id.to_i
- len = ws.filters.length
- ws.filters = ws.filters.select { |f| f.filter_id != filter_id }
- if ws.filters.length < len
- ws.send ({status: 200, message: 'unsubscribe ok', filter_id: filter_id}.to_json)
- else
- ws.send ({status: 404, message: 'filter_id not found', filter_id: filter_id}.to_json)
- end
+ # Handle unsubscribe event
+
+ len = ws.filters.length
+ ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+ if ws.filters.length < len
+ ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
else
- ws.send ({status: 400, message: 'must provide filter_id'}.to_json)
+ ws.send ({status: 404, message: 'filter not found'}.to_json)
end
+
else
ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
end
@@ -177,11 +175,13 @@ class EventBus
end
end
+ # Set up socket close callback
ws.on :close do |event|
@channel.unsubscribe sub
ws = nil
end
+ # Start up thread to monitor the Postgres database, if none exists already.
@mtx.synchronize do
unless @bgthread
@bgthread = true
@@ -192,6 +192,12 @@ class EventBus
begin
conn.async_exec "LISTEN logs"
while true
+ # wait_for_notify will block until there is a change
+ # notification from Postgres about the logs table, then push
+ # the notification into the EventMachine channel. Each
+ # websocket connection subscribes to the other end of the
+ # channel and calls #push_events to actually dispatch the
+ # events to the client.
conn.wait_for_notify do |channel, pid, payload|
@channel.push payload
end
@@ -206,5 +212,9 @@ class EventBus
end
end
end
+
+ # Since EventMachine is an asynchronous event based dispatcher, #on_connect
+ # does not block but instead returns immediately after having set up the
+ # websocket and notification channel callbacks.
end
end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 3bba0ef..4761800 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -211,6 +211,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
when 2
assert_equal 200, d["status"]
spec = Specimen.create
+ Trait.create # not part of filters, should not be received
human = Human.create
state = 3
when 3
@@ -281,6 +282,9 @@ class WebsocketTest < ActionDispatch::IntegrationTest
ws.on :open do |event|
ws.send ({method: 'subscribe'}.to_json)
EM::Timer.new 3 do
+ # Set a time limit on the test because after unsubscribing the server
+ # still has to process the next event (and then hopefully correctly
+ # decides not to send it because we unsubscribed.)
ws.close
end
end
@@ -290,15 +294,14 @@ class WebsocketTest < ActionDispatch::IntegrationTest
case state
when 1
assert_equal 200, d["status"]
- filter_id = d["filter_id"]
spec = Specimen.create
state = 2
when 2
spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json)
+ ws.send ({method: 'unsubscribe'}.to_json)
EM::Timer.new 1 do
- Human.create
+ Specimen.create
end
state = 3
@@ -316,19 +319,22 @@ class WebsocketTest < ActionDispatch::IntegrationTest
assert_equal spec.uuid, spec_ev_uuid
end
-
- test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do
+ test "connect, subscribe, get event, unsubscribe with filter" do
state = 1
spec = nil
spec_ev_uuid = nil
- human = nil
- human_ev_uuid = nil
authorize_with :admin
- ws_helper :admin do |ws|
+ ws_helper :admin, false do |ws|
ws.on :open do |event|
- ws.send ({method: 'subscribe'}.to_json)
+ ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+ EM::Timer.new 3 do
+ # Set a time limit on the test because after unsubscribing the server
+ # still has to process the next event (and then hopefully correctly
+ # decides not to send it because we unsubscribed.)
+ ws.close
+ end
end
ws.on :message do |event|
@@ -336,35 +342,33 @@ class WebsocketTest < ActionDispatch::IntegrationTest
case state
when 1
assert_equal 200, d["status"]
- spec = Specimen.create
+ spec = Human.create
state = 2
when 2
spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json)
+ ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
EM::Timer.new 1 do
- human = Human.create
+ Human.create
end
state = 3
when 3
- assert_equal 404, d["status"]
+ assert_equal 200, d["status"]
state = 4
when 4
- human_ev_uuid = d["object_uuid"]
- ws.close
+ assert false, "Should not get any more events"
end
end
end
assert_not_nil spec
- assert_not_nil human
assert_equal spec.uuid, spec_ev_uuid
- assert_equal human.uuid, human_ev_uuid
end
- test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do
+
+ test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
state = 1
spec = nil
spec_ev_uuid = nil
@@ -387,7 +391,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 2
when 2
spec_ev_uuid = d["object_uuid"]
- ws.send ({method: 'unsubscribe'}.to_json)
+ ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
EM::Timer.new 1 do
human = Human.create
@@ -395,7 +399,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
state = 3
when 3
- assert_equal 400, d["status"]
+ assert_equal 404, d["status"]
state = 4
when 4
human_ev_uuid = d["object_uuid"]
@@ -412,6 +416,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
end
+
test "connected, not subscribed, no event" do
authorize_with :admin
@@ -532,10 +537,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
ws.on :message do |event|
d = Oj.load event.data
case state
- when (1..16)
+ when (1..EventBus::MAX_FILTERS)
assert_equal 200, d["status"]
state += 1
- when 17
+ when (EventBus::MAX_FILTERS+1)
assert_equal 403, d["status"]
ws.close
end
diff --git a/services/api/test/websocket_runner.rb b/services/api/test/websocket_runner.rb
index c35938e..df72e24 100644
--- a/services/api/test/websocket_runner.rb
+++ b/services/api/test/websocket_runner.rb
@@ -6,7 +6,7 @@ SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
class WebsocketTestRunner < MiniTest::Unit
def _system(*cmd)
Bundler.with_clean_env do
- if not system({'ARVADOS_WEBSOCKETS' => '1', 'RAILS_ENV' => 'test'}, *cmd)
+ if not system({'ARVADOS_WEBSOCKETS' => 'ws-only', 'RAILS_ENV' => 'test'}, *cmd)
raise RuntimeError, "#{cmd[0]} returned exit code #{$?.exitstatus}"
end
end
commit 208e172287aba3be7cb988a2c416e4281a2e5f60
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 10:54:37 2014 -0400
Added code comments.
diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
index 892dbf5..795df4a 100644
--- a/services/api/app/middlewares/rack_socket.rb
+++ b/services/api/app/middlewares/rack_socket.rb
@@ -19,12 +19,13 @@ class RackSocket
# +app+ The next layer of the Rack stack.
#
# Accepts options:
- # +:handler+ (Required) A class to handle new connections. Initialize will
+ # +:handler+ (Required) A class to handle new connections. #initialize will
# call handler.new to create the actual handler instance object. When a new
# websocket connection is established, #on_connect on the handler instance
- # object to notify it about the connection.
+ # object will be called with the new connection.
#
- # +:mount+ The path for websocket connect requests, defaults to '/websocket'.
+ # +:mount+ The HTTP request path that will be recognized for websocket
+ # connect requests, defaults to '/websocket'.
#
# +:websocket_only+ If true, the server will only handle websocket requests,
# and all other requests will result in an error. If false, unhandled
diff --git a/services/api/lib/load_param.rb b/services/api/lib/load_param.rb
index dba0582..9c85757 100644
--- a/services/api/lib/load_param.rb
+++ b/services/api/lib/load_param.rb
@@ -1,12 +1,15 @@
+# Mixin module for reading out query parameters from request params.
+#
# Expects:
# +params+ Hash
# Sets:
-# @where, @filters
-
+# @where, @filters, @limit, @offset, @orders
module LoadParam
+ # Default limit on number of rows to return in a single query.
DEFAULT_LIMIT = 100
+ # Load params[:where] into @where
def load_where_param
if params[:where].nil? or params[:where] == ""
@where = {}
@@ -23,6 +26,7 @@ module LoadParam
@where = @where.with_indifferent_access
end
+ # Load params[:filters] into @filters
def load_filters_param
@filters ||= []
if params[:filters].is_a? Array
@@ -38,6 +42,8 @@ module LoadParam
end
end
+ # Load params[:limit], params[:offset] and params[:order]
+ # into @limit, @offset, @orders
def load_limit_offset_order_params
if params[:limit]
unless params[:limit].to_s.match(/^\d+$/)
diff --git a/services/api/lib/record_filters.rb b/services/api/lib/record_filters.rb
index 3f0a845..d7e556b 100644
--- a/services/api/lib/record_filters.rb
+++ b/services/api/lib/record_filters.rb
@@ -1,11 +1,20 @@
+# Mixin module providing a method to convert filters into a list of SQL
+# fragments suitable to be fed to ActiveRecord #where.
+#
# Expects:
-# @where
-# @filters
-# +model_class+
+# model_class
# Operates on:
# @objects
module RecordFilters
+ # Input:
+ # +filters+ Arvados filters as list of lists.
+ # +ar_table_name+ name of SQL table
+ #
+ # Output:
+ # Hash with two keys:
+ # :cond_out array of SQL fragments for each filter expression
+ # :param_out array of values for parameter substitution in cond_out
def record_filters filters, ar_table_name
cond_out = []
param_out = []
commit b0c30631bceb2d1837b52d1d3475e52aee4c9c43
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Apr 30 10:53:35 2014 -0400
Now supports both websocket integrated (ARVADOS_WEBSOCKETS defined) and
websocket-only (ARVADOS_WEBSOCKETS=ws-only) server modes. Added comment to
application.yml.example about setting websocket_address when running in
websocket-only mode.
diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index b8f0594..c9b18ae 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -21,7 +21,6 @@ class Arvados::V1::SchemaController < ApplicationController
documentationLink: "http://doc.arvados.org/api/index.html",
protocol: "rest",
baseUrl: root_url + "/arvados/v1/",
- websocketUrl: Rails.application.config.websocket_address,
basePath: "/arvados/v1/",
rootUrl: root_url,
servicePath: "arvados/v1/",
@@ -71,6 +70,12 @@ class Arvados::V1::SchemaController < ApplicationController
resources: {}
}
+ if Rails.application.config.websocket_address
+ discovery[:websocketUrl] = Rails.application.config.websocket_address
+ elsif ENV['ARVADOS_WEBSOCKETS']
+ discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websockets"
+ end
+
ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
begin
ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index c05b4af..37bb1c3 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -90,9 +90,6 @@ common:
# Visitors to the API server will be redirected to the workbench
workbench_address: https://workbench.local:3001/
- # Websocket endpoint
- websocket_address: wss://localhost:3002/websocket
-
# The e-mail address of the user who should be marked as an admin user on
# their first login.
# In the default configuration, authentication happens through the Arvados SSO
diff --git a/services/api/config/application.yml.example b/services/api/config/application.yml.example
index 9162fc4..b2b9238 100644
--- a/services/api/config/application.yml.example
+++ b/services/api/config/application.yml.example
@@ -39,3 +39,10 @@ test:
common:
#git_repositories_dir: /var/cache/git
#git_internal_dir: /var/cache/arvados/internal.git
+
+ # You can run the websocket server separately from the regular HTTP service
+ # by setting "ARVADOS_WEBSOCKETS=ws-only" in the environment before running
+ # the websocket server. When you do this, you need to set the following
+ # configuration variable so that the primary server can give out the correct
+ # address of the dedicated websocket server:
+ #websocket_address: wss://websocket.local/websockets
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index 2902403..e04f1fe 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -1,7 +1,18 @@
require 'eventbus'
Server::Application.configure do
- if ENV['ARVADOS_WEBSOCKETS'] == '1'
- config.middleware.insert_after ArvadosApiToken, RackSocket, {:handler => EventBus, :websocket_only => true }
+ # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value. If
+ # ARVADOS_WEBSOCKETS=ws-only, the server will accept only websocket
+ # connections and return an error response for all other requests.
+ if ENV['ARVADOS_WEBSOCKETS']
+ config.middleware.insert_after ArvadosApiToken, RackSocket, {
+ :handler => EventBus,
+ :mount => "/websockets",
+ :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+ }
end
+
+ # Define the websocket_address configuration option; it can be overridden in config files.
+ # See application.yml.example for details.
+ config.websocket_address = nil
end
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list