[ARVADOS] updated: b421d5c4754315cdd8b70b6bbea5b5f23fb425de
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 29 17:10:41 EDT 2014
Summary of changes:
services/api/app/middlewares/arvados_api_token.rb | 11 +++++++-
services/api/app/middlewares/rack_socket.rb | 25 +++++++++++++++++
services/api/lib/eventbus.rb | 30 ++++++++++++++++----
3 files changed, 59 insertions(+), 7 deletions(-)
via b421d5c4754315cdd8b70b6bbea5b5f23fb425de (commit)
from e537bd8dd1ac786164f192374e0d076bdc0327f3 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit b421d5c4754315cdd8b70b6bbea5b5f23fb425de
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Apr 29 17:10:37 2014 -0400
Adding more code documentation.
diff --git a/services/api/app/middlewares/arvados_api_token.rb b/services/api/app/middlewares/arvados_api_token.rb
index 24d013c..57d3ad0 100644
--- a/services/api/app/middlewares/arvados_api_token.rb
+++ b/services/api/app/middlewares/arvados_api_token.rb
@@ -1,10 +1,19 @@
+# Perform api_token checking very early in the request process. We want to do
+# this in the Rack stack instead of in ApplicationController because
+# websockets needs access to authentication but doesn't use any of the rails
+# active dispatch infrastructure.
class ArvadosApiToken
+
+ # Create a new ArvadosApiToken handler
+ # +app+ The next layer of the Rack stack.
def initialize(app = nil, options = nil)
@app = app if app.respond_to?(:call)
end
def call env
- # first, clean up just in case
+ # First, clean up just in case we have a multithreaded server and thread
+ # local variables are still set from a prior request. Also useful for
+ # tests that call this code to set up the environment.
Thread.current[:api_client_ip_address] = nil
Thread.current[:api_client_authorization] = nil
Thread.current[:api_client_uuid] = nil
diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
index e2ca570..892dbf5 100644
--- a/services/api/app/middlewares/rack_socket.rb
+++ b/services/api/app/middlewares/rack_socket.rb
@@ -2,15 +2,34 @@ require 'rack'
require 'faye/websocket'
require 'eventmachine'
+# A Rack middleware to handle inbound websocket connection requests and hand
+# them over to the faye websocket library.
class RackSocket
DEFAULT_ENDPOINT = '/websocket'
+ # Stop EventMachine on signal; this should give it a chance to unwind any
+ # open connections.
def die_gracefully_on_signal
Signal.trap("INT") { EM.stop }
Signal.trap("TERM") { EM.stop }
end
+ # Create a new RackSocket handler
+ # +app+ The next layer of the Rack stack.
+ #
+ # Accepts options:
+ # +:handler+ (Required) A class to handle new connections. Initialize will
+ # call handler.new to create the actual handler instance object. When a new
+ # websocket connection is established, #on_connect is called on the handler
+ # instance object to notify it about the connection.
+ #
+ # +:mount+ The path for websocket connect requests, defaults to '/websocket'.
+ #
+ # +:websocket_only+ If true, the server will only handle websocket requests,
+ # and all other requests will result in an error. If false, unhandled
+ # non-websocket requests will be passed along to 'app' in the usual Rack
+ # way.
def initialize(app = nil, options = nil)
@app = app if app.respond_to?(:call)
@options = [app, options].grep(Hash).first || {}
@@ -38,14 +57,20 @@ class RackSocket
}
end
+ # Create actual handler instance object from handler class.
@handler = @options[:handler].new
end
+ # Handle a websocket connection request, or pass it on to the next middleware
+ # supplied as +app+ to initialize (unless the +:websocket_only+ option is
+ # true, in which case return an error response.)
+ # +env+ the Rack environment with information about the request.
def call env
request = Rack::Request.new(env)
if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env)
+ # Notify handler about new connection
@handler.on_connect ws
# Return async Rack response
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index b36378b..8480cf4 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -4,6 +4,7 @@ require 'faye/websocket'
require 'record_filters'
require 'load_param'
+# Patch user, last_log_id and filters fields into the Faye::WebSocket class.
module Faye
class WebSocket
attr_accessor :user
@@ -12,6 +13,8 @@ module Faye
end
end
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
class Filter
include LoadParam
@@ -32,6 +35,8 @@ class Filter
end
end
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
class EventBus
include CurrentApiClient
include RecordFilters
@@ -41,6 +46,7 @@ class EventBus
Log
end
+ # Initialize EventBus. Takes no parameters.
def initialize
@channel = EventMachine::Channel.new
@mtx = Mutex.new
@@ -48,11 +54,14 @@ class EventBus
@filter_id_counter = 0
end
+ # Allocate a new filter id
def alloc_filter_id
- (@filter_id_counter += 1)
+ @filter_id_counter += 1
end
- def push_events ws, msg = nil
+ # Push out any pending events to the connection +ws+
+ # +id+ the id of the most recent row in the log table, may be nil
+ def push_events ws, id = nil
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
@@ -62,9 +71,9 @@ class EventBus
if ws.last_log_id
# Only interested in log rows that are new
logs = logs.where("logs.id > ?", ws.last_log_id)
- elsif msg
+ elsif id
# No last log id, so only look at the most recently changed row
- logs = logs.where("logs.id = ?", msg.to_i)
+ logs = logs.where("logs.id = ?", id.to_i)
else
return
end
@@ -88,9 +97,9 @@ class EventBus
ws.send(l.as_api_response.to_json)
ws.last_log_id = l.id
end
- elsif msg
+ elsif id
# No filters set up, so just record the sequence number
- ws.last_log_id = msg.to_i
+ ws.last_log_id = id.to_i
end
rescue Exception => e
puts "Error publishing event: #{$!}"
@@ -100,23 +109,32 @@ class EventBus
end
end
+ # Constant maximum number of filters, to avoid silly huge database queries.
MAX_FILTERS = 16
+ # Called by RackSocket when a new websocket connection has been established.
def on_connect ws
+
+ # Disconnect if no valid API token.
+ # current_user is included from CurrentApiClient
if not current_user
ws.send ({status: 401, message: "Valid API token required"}.to_json)
ws.close
return
end
+ # Initialize our custom fields on the websocket connection object.
ws.user = current_user
ws.filters = []
ws.last_log_id = nil
+ # Subscribe to internal postgres notifications through @channel. This will
+ # call push_events when a notification comes through.
sub = @channel.subscribe do |msg|
push_events ws, msg
end
+ # Set up callback for inbound message dispatch.
ws.on :message do |event|
begin
p = (Oj.load event.data).symbolize_keys
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list