[ARVADOS] updated: b421d5c4754315cdd8b70b6bbea5b5f23fb425de

git at public.curoverse.com git at public.curoverse.com
Tue Apr 29 17:10:41 EDT 2014


Summary of changes:
 services/api/app/middlewares/arvados_api_token.rb |   11 +++++++-
 services/api/app/middlewares/rack_socket.rb       |   25 +++++++++++++++++
 services/api/lib/eventbus.rb                      |   30 ++++++++++++++++----
 3 files changed, 59 insertions(+), 7 deletions(-)

       via  b421d5c4754315cdd8b70b6bbea5b5f23fb425de (commit)
      from  e537bd8dd1ac786164f192374e0d076bdc0327f3 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b421d5c4754315cdd8b70b6bbea5b5f23fb425de
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Apr 29 17:10:37 2014 -0400

    Adding more code documentation.

diff --git a/services/api/app/middlewares/arvados_api_token.rb b/services/api/app/middlewares/arvados_api_token.rb
index 24d013c..57d3ad0 100644
--- a/services/api/app/middlewares/arvados_api_token.rb
+++ b/services/api/app/middlewares/arvados_api_token.rb
@@ -1,10 +1,19 @@
+# Perform api_token checking very early in the request process.  We want to do
+# this in the Rack stack instead of in ApplicationController because
+# websockets needs access to authentication but doesn't use any of the rails
+# active dispatch infrastructure.
 class ArvadosApiToken
+
+  # Create a new ArvadosApiToken handler
+  # +app+  The next layer of the Rack stack.
   def initialize(app = nil, options = nil)
     @app = app if app.respond_to?(:call)
   end
 
   def call env
-    # first, clean up just in case
+    # First, clean up just in case we have a multithreaded server and thread
+    # local variables are still set from a prior request.  Also useful for
+    # tests that call this code to set up the environment.
     Thread.current[:api_client_ip_address] = nil
     Thread.current[:api_client_authorization] = nil
     Thread.current[:api_client_uuid] = nil
diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
index e2ca570..892dbf5 100644
--- a/services/api/app/middlewares/rack_socket.rb
+++ b/services/api/app/middlewares/rack_socket.rb
@@ -2,15 +2,34 @@ require 'rack'
 require 'faye/websocket'
 require 'eventmachine'
 
+# A Rack middleware to handle inbound websocket connection requests and hand
+# them over to the faye websocket library.
 class RackSocket
 
   DEFAULT_ENDPOINT  = '/websocket'
 
+  # Stop EventMachine on signal; this should give it a chance to unwind any
+  # open connections.
   def die_gracefully_on_signal
     Signal.trap("INT") { EM.stop }
     Signal.trap("TERM") { EM.stop }
   end
 
+  # Create a new RackSocket handler
+  # +app+  The next layer of the Rack stack.
+  #
+  # Accepts options:
+  # +:handler+ (Required) A class to handle new connections.  Initialize will
+  # call handler.new to create the actual handler instance object.  When a new
+  # websocket connection is established, #on_connect is called on the handler
+  # instance object to notify it about the connection.
+  #
+  # +:mount+  The path for websocket connect requests, defaults to '/websocket'.
+  #
+  # +:websocket_only+  If true, the server will only handle websocket requests,
+  # and all other requests will result in an error.  If false, unhandled
+  # non-websocket requests will be passed along to 'app' in the usual Rack
+  # way.
   def initialize(app = nil, options = nil)
     @app = app if app.respond_to?(:call)
     @options = [app, options].grep(Hash).first || {}
@@ -38,14 +57,20 @@ class RackSocket
       }
     end
 
+    # Create actual handler instance object from handler class.
     @handler = @options[:handler].new
   end
 
+  # Handle websocket connection request, or pass on to the next middleware
+  # supplied as +app+ to #initialize (unless +:websocket_only+ option is true,
+  # in which case return an error response).
+  # +env+ the Rack environment with information about the request.
   def call env
     request = Rack::Request.new(env)
     if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
       ws = Faye::WebSocket.new(env)
 
+      # Notify handler about new connection
       @handler.on_connect ws
 
       # Return async Rack response
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index b36378b..8480cf4 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -4,6 +4,7 @@ require 'faye/websocket'
 require 'record_filters'
 require 'load_param'
 
+# Patch user, last_log_id and filters fields into the Faye::WebSocket class.
 module Faye
   class WebSocket
     attr_accessor :user
@@ -12,6 +13,8 @@ module Faye
   end
 end
 
+# Store the filters supplied by the user that will be applied to the logs table
+# to determine which events to return to the listener.
 class Filter
   include LoadParam
 
@@ -32,6 +35,8 @@ class Filter
   end
 end
 
+# Manages websocket connections, accepts subscription messages and publishes
+# log table events.
 class EventBus
   include CurrentApiClient
   include RecordFilters
@@ -41,6 +46,7 @@ class EventBus
     Log
   end
 
+  # Initialize EventBus.  Takes no parameters.
   def initialize
     @channel = EventMachine::Channel.new
     @mtx = Mutex.new
@@ -48,11 +54,14 @@ class EventBus
     @filter_id_counter = 0
   end
 
+  # Allocate a new filter id
   def alloc_filter_id
-    (@filter_id_counter += 1)
+    @filter_id_counter += 1
   end
 
-  def push_events ws, msg = nil
+  # Push out any pending events to the connection +ws+
+  # +id+  the id of the most recent row in the log table, may be nil
+  def push_events ws, id = nil
       begin
         # Must have at least one filter set up to receive events
         if ws.filters.length > 0
@@ -62,9 +71,9 @@ class EventBus
           if ws.last_log_id
             # Only interested in log rows that are new
             logs = logs.where("logs.id > ?", ws.last_log_id)
-          elsif msg
+          elsif id
             # No last log id, so only look at the most recently changed row
-            logs = logs.where("logs.id = ?", msg.to_i)
+            logs = logs.where("logs.id = ?", id.to_i)
           else
             return
           end
@@ -88,9 +97,9 @@ class EventBus
             ws.send(l.as_api_response.to_json)
             ws.last_log_id = l.id
           end
-        elsif msg
+        elsif id
           # No filters set up, so just record the sequence number
-          ws.last_log_id = msg.to_i
+          ws.last_log_id = id.to_i
         end
       rescue Exception => e
         puts "Error publishing event: #{$!}"
@@ -100,23 +109,32 @@ class EventBus
       end
   end
 
+  # Constant maximum number of filters, to avoid silly huge database queries.
   MAX_FILTERS = 16
 
+  # Called by RackSocket when a new websocket connection has been established.
   def on_connect ws
+
+    # Disconnect if no valid API token.
+    # current_user is included from CurrentApiClient
     if not current_user
       ws.send ({status: 401, message: "Valid API token required"}.to_json)
       ws.close
       return
     end
 
+    # Initialize our custom fields on the websocket connection object.
     ws.user = current_user
     ws.filters = []
     ws.last_log_id = nil
 
+    # Subscribe to internal postgres notifications through @channel.  This will
+    # call push_events when a notification comes through.
     sub = @channel.subscribe do |msg|
       push_events ws, msg
     end
 
+    # Set up callback for inbound message dispatch.
     ws.on :message do |event|
       begin
         p = (Oj.load event.data).symbolize_keys

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list