[ARVADOS] updated: 943feeec776bb75c685cbdee9466f8db3cdf5da7

git at public.curoverse.com git at public.curoverse.com
Wed Apr 30 11:40:18 EDT 2014


Summary of changes:
 .../app/assets/javascripts/application.js          |    7 +
 .../app/assets/javascripts/collections.js          |   59 ++++++++++
 .../app/assets/javascripts/collections.js.coffee   |    3 -
 .../app/assets/stylesheets/collections.css.scss    |   21 +++-
 .../app/controllers/collections_controller.rb      |   34 ++++++-
 apps/workbench/app/controllers/users_controller.rb |   36 +++---
 apps/workbench/app/helpers/application_helper.rb   |   18 ++--
 .../app/views/collections/_index_tbody.html.erb    |   10 +--
 .../app/views/collections/_show_files.html.erb     |   12 ++
 .../app/views/collections/_show_recent.html.erb    |    8 +-
 .../app/views/collections/_toggle_persist.html.erb |    3 +
 apps/workbench/app/views/users/_tables.html.erb    |  120 ++++++++++----------
 apps/workbench/config/routes.rb                    |    4 +-
 .../pipeline_instances_controller_test.rb          |   18 +++-
 .../workbench/test/integration/collections_test.rb |   42 +++++++
 sdk/ruby/lib/arvados.rb                            |   36 +++++--
 .../api/app/controllers/application_controller.rb  |   20 +++-
 .../v1/api_client_authorizations_controller.rb     |   32 ++++-
 .../arvados/v1/keep_disks_controller.rb            |    2 +-
 .../app/controllers/arvados/v1/nodes_controller.rb |    4 +-
 .../controllers/arvados/v1/schema_controller.rb    |    9 +-
 .../arvados/v1/virtual_machines_controller.rb      |   16 ---
 services/api/app/controllers/static_controller.rb  |    2 +-
 .../app/controllers/user_sessions_controller.rb    |    2 +-
 services/api/app/middlewares/rack_socket.rb        |    7 +-
 services/api/app/models/arvados_model.rb           |   15 ++-
 services/api/app/models/node.rb                    |    2 +-
 services/api/config/application.default.yml        |    3 -
 services/api/config/application.yml.example        |    7 +
 services/api/config/initializers/eventbus.rb       |   15 ++-
 .../20140421151939_rename_auth_keys_user_index.rb  |   11 ++
 .../db/migrate/20140423133559_new_scope_format.rb  |   48 ++++++++
 services/api/db/schema.rb                          |    4 +-
 services/api/lib/current_api_client.rb             |   19 ++--
 services/api/lib/eventbus.rb                       |  122 +++++++++++---------
 services/api/lib/load_param.rb                     |   16 ++-
 services/api/lib/record_filters.rb                 |   15 ++-
 .../test/fixtures/api_client_authorizations.yml    |   37 ++++++
 services/api/test/fixtures/links.yml               |   14 +++
 .../api_client_authorizations_controller_test.rb   |   30 +++++-
 .../functional/arvados/v1/nodes_controller_test.rb |    9 ++
 .../api_client_authorizations_scopes_test.rb       |  103 +++++++++++++++++
 .../api/test/integration/login_workflow_test.rb    |   25 ++++
 services/api/test/integration/websocket_test.rb    |   64 +++++++----
 services/api/test/websocket_runner.rb              |    2 +-
 45 files changed, 821 insertions(+), 265 deletions(-)
 create mode 100644 apps/workbench/app/assets/javascripts/collections.js
 delete mode 100644 apps/workbench/app/assets/javascripts/collections.js.coffee
 create mode 100644 apps/workbench/app/views/collections/_toggle_persist.html.erb
 create mode 100644 apps/workbench/test/integration/collections_test.rb
 create mode 100644 services/api/db/migrate/20140421151939_rename_auth_keys_user_index.rb
 create mode 100644 services/api/db/migrate/20140423133559_new_scope_format.rb
 create mode 100644 services/api/test/integration/api_client_authorizations_scopes_test.rb
 create mode 100644 services/api/test/integration/login_workflow_test.rb

       via  943feeec776bb75c685cbdee9466f8db3cdf5da7 (commit)
       via  1433693e08e2d4052cc94f6a5902523b08bbc1ac (commit)
       via  e20c9587ad703ae8e1f81251a6e209834a52d448 (commit)
       via  a4724fe92e651abb06acf8c5e75184561a55c854 (commit)
       via  208e172287aba3be7cb988a2c416e4281a2e5f60 (commit)
       via  b0c30631bceb2d1837b52d1d3475e52aee4c9c43 (commit)
       via  840e7d7f96f763ae139545dca5d6dfa5a54f6cc6 (commit)
       via  8adde5132926e8f9cc1b01d79f9307614cc6021e (commit)
       via  8eaad00b025167a7505ba11ad6a05b52a43c2399 (commit)
       via  c18bde8300e115b215e58d6930d0495b2c33b49f (commit)
       via  4e3ac7f8bafde72ae397f79bfc36409e682b13e5 (commit)
       via  7b9cff04eb463c666b8126ebc6c4dfcc00a536c0 (commit)
       via  f845c0645d9136a1e4ad993ecf34a156367e73b7 (commit)
       via  03e570095885982d23e234bce8e1c068314b63af (commit)
       via  5c2758ca38d01a905253f86e63be3a5fe03a3871 (commit)
       via  52c4f2b7fe631f3d7ad16105cb2f86cf6c004fc8 (commit)
       via  e54bdba73b65e31b03fd1d43bfe69d0f43bbd8d8 (commit)
       via  0eb59e3acf9f13e89bd010f7f65a4d31554183fc (commit)
       via  915de6c854cd559ebd029b24939149e37b18c8cf (commit)
       via  71b1b7b045419817d1c9dc62a3a296b746d9117c (commit)
       via  7fbcd989af9949b11ddfec0c9ebfaa96a655eef4 (commit)
       via  c3457ad20bfd00c99facef396f1dbdbcbdbad241 (commit)
       via  4fa5a3d3958eae7ecce681e0ea7a4ac8cc43e48d (commit)
       via  a28be6d77750a6cbded994612d95ee973cb4c01e (commit)
       via  58dd340f19c3e0225ad189e60a58872d3aa3f7c2 (commit)
       via  1c5176d87df0dbd25db6ff1fb2ab82ae17472145 (commit)
       via  5d356c84dcf8f3a2129cce0f4fbfc50f16a8a339 (commit)
       via  f6f07b1856fc0aa32cdf1f96d5bdeb12e9e314ef (commit)
       via  919cf4c2c9f803f0cdb80619bcbf5d23b7c02d25 (commit)
       via  6c0b68255c0c66d90c0473561461d9215860f018 (commit)
       via  9e97504e5f65c8a3a6f1f34da678ac7e1828e8e1 (commit)
       via  62db3393df8d6bc7c48a3c5101468361cb2f3cf3 (commit)
       via  e062efe37ddf066af1c5c762b126bb72766fee25 (commit)
       via  20334fa95bb7d554c09225c02fae3d4e83c6c6c5 (commit)
       via  7bc20af4935fee1caa566e86a074022f0b60d166 (commit)
      from  b421d5c4754315cdd8b70b6bbea5b5f23fb425de (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 943feeec776bb75c685cbdee9466f8db3cdf5da7
Merge: 1433693 840e7d7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 11:40:09 2014 -0400

    Merge remote-tracking branch 'origin/master' into origin-2608-websocket-event-bus-alt2
    
    Conflicts:
    	services/api/app/controllers/application_controller.rb
    	services/api/app/controllers/arvados/v1/schema_controller.rb
    	services/api/db/schema.rb

diff --cc services/api/app/controllers/application_controller.rb
index 1368eeb,d40c6de..3891bde
--- a/services/api/app/controllers/application_controller.rb
+++ b/services/api/app/controllers/application_controller.rb
@@@ -9,9 -4,11 +9,10 @@@ class ApplicationController < ActionCon
  
    respond_to :json
    protect_from_forgery
 -  around_filter :thread_with_auth_info, :except => [:render_error, :render_not_found]
  
+   before_filter :respond_with_json_by_default
    before_filter :remote_ip
-   before_filter :require_auth_scope_all, :except => :render_not_found
+   before_filter :require_auth_scope, :except => :render_not_found
    before_filter :catch_redirect_hint
  
    before_filter :find_object_by_uuid, :except => [:index, :create,
@@@ -290,13 -413,74 +294,17 @@@
      end
    end
  
-   def require_auth_scope_all
-     require_login and require_auth_scope(['all'])
+   def require_auth_scope
+     return false unless require_login
+     unless current_api_client_auth_has_scope("#{request.method} #{request.path}")
+       render :json => { errors: ['Forbidden'] }.to_json, status: 403
+     end
    end
  
-   def require_auth_scope(ok_scopes)
-     unless current_api_client_auth_has_scope(ok_scopes)
-       render :json => { errors: ['Forbidden'] }.to_json, status: 403
 -  def thread_with_auth_info
 -    Thread.current[:request_starttime] = Time.now
 -    Thread.current[:api_url_base] = root_url.sub(/\/$/,'') + '/arvados/v1'
 -    begin
 -      user = nil
 -      api_client = nil
 -      api_client_auth = nil
 -      supplied_token =
 -        params[:api_token] ||
 -        params[:oauth_token] ||
 -        request.headers["Authorization"].andand.match(/OAuth2 ([a-z0-9]+)/).andand[1]
 -      if supplied_token
 -        api_client_auth = ApiClientAuthorization.
 -          includes(:api_client, :user).
 -          where('api_token=? and (expires_at is null or expires_at > CURRENT_TIMESTAMP)', supplied_token).
 -          first
 -        if api_client_auth.andand.user
 -          session[:user_id] = api_client_auth.user.id
 -          session[:api_client_uuid] = api_client_auth.api_client.andand.uuid
 -          session[:api_client_authorization_id] = api_client_auth.id
 -          user = api_client_auth.user
 -          api_client = api_client_auth.api_client
 -        else
 -          # Token seems valid, but points to a non-existent (deleted?) user.
 -          api_client_auth = nil
 -        end
 -      elsif session[:user_id]
 -        user = User.find(session[:user_id]) rescue nil
 -        api_client = ApiClient.
 -          where('uuid=?',session[:api_client_uuid]).
 -          first rescue nil
 -        if session[:api_client_authorization_id] then
 -          api_client_auth = ApiClientAuthorization.
 -            find session[:api_client_authorization_id]
 -        end
 -      end
 -      Thread.current[:api_client_ip_address] = remote_ip
 -      Thread.current[:api_client_authorization] = api_client_auth
 -      Thread.current[:api_client_uuid] = api_client.andand.uuid
 -      Thread.current[:api_client] = api_client
 -      Thread.current[:user] = user
 -      if api_client_auth
 -        api_client_auth.last_used_at = Time.now
 -        api_client_auth.last_used_by_ip_address = remote_ip
 -        api_client_auth.save validate: false
 -      end
 -      yield
 -    ensure
 -      Thread.current[:api_client_ip_address] = nil
 -      Thread.current[:api_client_authorization] = nil
 -      Thread.current[:api_client_uuid] = nil
 -      Thread.current[:api_client] = nil
 -      Thread.current[:user] = nil
 -    end
 -  end
 -  # /Authentication
 -
+   def respond_with_json_by_default
+     html_index = request.accepts.index(Mime::HTML)
+     if html_index.nil? or request.accepts[0...html_index].include?(Mime::JSON)
+       request.format = :json
      end
    end
  
diff --cc services/api/lib/load_param.rb
index 9c85757,0000000..e01063d
mode 100644,000000..100644
--- a/services/api/lib/load_param.rb
+++ b/services/api/lib/load_param.rb
@@@ -1,84 -1,0 +1,88 @@@
 +# Mixin module for reading out query parameters from request params.
 +#
 +# Expects:
 +#   +params+ Hash
 +# Sets:
 +#   @where, @filters, @limit, @offset, @orders
 +module LoadParam
 +
 +  # Default limit on number of rows to return in a single query.
 +  DEFAULT_LIMIT = 100
 +
 +  # Load params[:where] into @where
 +  def load_where_param
 +    if params[:where].nil? or params[:where] == ""
 +      @where = {}
 +    elsif params[:where].is_a? Hash
 +      @where = params[:where]
 +    elsif params[:where].is_a? String
 +      begin
 +        @where = Oj.load(params[:where])
 +        raise unless @where.is_a? Hash
 +      rescue
 +        raise ArgumentError.new("Could not parse \"where\" param as an object")
 +      end
 +    end
 +    @where = @where.with_indifferent_access
 +  end
 +
 +  # Load params[:filters] into @filters
 +  def load_filters_param
 +    @filters ||= []
 +    if params[:filters].is_a? Array
 +      @filters += params[:filters]
 +    elsif params[:filters].is_a? String and !params[:filters].empty?
 +      begin
 +        f = Oj.load params[:filters]
 +        raise unless f.is_a? Array
 +        @filters += f
 +      rescue
 +        raise ArgumentError.new("Could not parse \"filters\" param as an array")
 +      end
 +    end
 +  end
 +
++  def default_orders
++    ["#{table_name}.modified_at desc"]
++  end
++
 +  # Load params[:limit], params[:offset] and params[:order]
 +  # into @limit, @offset, @orders
 +  def load_limit_offset_order_params
 +    if params[:limit]
 +      unless params[:limit].to_s.match(/^\d+$/)
 +        raise ArgumentError.new("Invalid value for limit parameter")
 +      end
 +      @limit = params[:limit].to_i
 +    else
 +      @limit = DEFAULT_LIMIT
 +    end
 +
 +    if params[:offset]
 +      unless params[:offset].to_s.match(/^\d+$/)
 +        raise ArgumentError.new("Invalid value for offset parameter")
 +      end
 +      @offset = params[:offset].to_i
 +    else
 +      @offset = 0
 +    end
 +
 +    @orders = []
 +    if params[:order]
 +      params[:order].split(',').each do |order|
 +        attr, direction = order.strip.split " "
 +        direction ||= 'asc'
 +        if attr.match /^[a-z][_a-z0-9]+$/ and
 +            model_class.columns.collect(&:name).index(attr) and
 +            ['asc','desc'].index direction.downcase
 +          @orders << "#{table_name}.#{attr} #{direction.downcase}"
 +        end
 +      end
 +    end
 +    if @orders.empty?
-       @orders << "#{table_name}.modified_at desc"
++      @orders << default_orders
 +    end
 +  end
 +
 +
 +end

commit 1433693e08e2d4052cc94f6a5902523b08bbc1ac
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 11:26:58 2014 -0400

    Moved message handling out to a separate method for clarity in eventbus.
    Fixing name mismatch /websockets => /websocket in configuration
    Added guards to various tests to ensure no more messages are processed after
    the test is supposed to be done.

diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index c9b18ae..6cdedf6 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -73,7 +73,7 @@ class Arvados::V1::SchemaController < ApplicationController
       if Rails.application.config.websocket_address
         discovery[:websocketUrl] = Rails.application.config.websocket_address
       elsif ENV['ARVADOS_WEBSOCKETS']
-        discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websockets"
+        discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websocket"
       end
 
       ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
diff --git a/services/api/config/application.yml.example b/services/api/config/application.yml.example
index b2b9238..2705fa1 100644
--- a/services/api/config/application.yml.example
+++ b/services/api/config/application.yml.example
@@ -45,4 +45,4 @@ common:
   # the websocket server.  When you do this, you need to set the following
   # configuration variable so that the primary server can give out the correct
   # address of the dedicated websocket server:
-  #websocket_address: wss://websocket.local/websockets
+  #websocket_address: wss://websocket.local/websocket
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index e04f1fe..7da8ade 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -7,7 +7,7 @@ Server::Application.configure do
   if ENV['ARVADOS_WEBSOCKETS']
     config.middleware.insert_after ArvadosApiToken, RackSocket, {
       :handler => EventBus,
-      :mount => "/websockets",
+      :mount => "/websocket",
       :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
     }
   end
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 79315aa..0b8cae2 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -99,6 +99,57 @@ class EventBus
       end
   end
 
+  # Handle inbound subscribe or unsubscribe message.
+  def handle_message ws, event
+    begin
+      # Parse event data as JSON
+      p = (Oj.load event.data).symbolize_keys
+
+      if p[:method] == 'subscribe'
+        # Handle subscribe event
+
+        if p[:last_log_id]
+          # Set or reset the last_log_id.  The event bus only reports events
+          # for rows that come after last_log_id.
+          ws.last_log_id = p[:last_log_id].to_i
+        end
+
+        if ws.filters.length < MAX_FILTERS
+          # Add a filter.  This gets the :filters field which is the same
+          # format as used for regular index queries.
+          ws.filters << Filter.new(p)
+          ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+          # Send any pending events
+          push_events ws
+        else
+          ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+        end
+
+      elsif p[:method] == 'unsubscribe'
+        # Handle unsubscribe event
+
+        len = ws.filters.length
+        ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+        if ws.filters.length < len
+          ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
+        else
+          ws.send ({status: 404, message: 'filter not found'}.to_json)
+        end
+
+      else
+        ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
+      end
+    rescue Oj::Error => e
+      ws.send ({status: 400, message: "malformed request"}.to_json)
+    rescue Exception => e
+      puts "Error handling message: #{$!}"
+      puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+      ws.send ({status: 500, message: 'error'}.to_json)
+      ws.close
+    end
+  end
+
   # Constant maximum number of filters, to avoid silly huge database queries.
   MAX_FILTERS = 16
 
@@ -126,53 +177,7 @@ class EventBus
 
     # Set up callback for inbound message dispatch.
     ws.on :message do |event|
-      begin
-        # Parse event data as JSON
-        p = (Oj.load event.data).symbolize_keys
-
-        if p[:method] == 'subscribe'
-          # Handle subscribe event
-
-          if p[:last_log_id]
-            # Set or reset the last_log_id.  The event bus only reports events
-            # for rows that come after last_log_id.
-            ws.last_log_id = p[:last_log_id].to_i
-          end
-
-          if ws.filters.length < MAX_FILTERS
-            # Add a filter.  This gets the :filters field which is the same
-            # format as used for regular index queries.
-            ws.filters << Filter.new(p)
-            ws.send ({status: 200, message: 'subscribe ok'}.to_json)
-
-            # Send any pending events
-            push_events ws
-          else
-            ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
-          end
-
-        elsif p[:method] == 'unsubscribe'
-          # Handle unsubscribe event
-
-          len = ws.filters.length
-          ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
-          if ws.filters.length < len
-            ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
-          else
-            ws.send ({status: 404, message: 'filter not found'}.to_json)
-          end
-
-        else
-          ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
-        end
-      rescue Oj::Error => e
-        ws.send ({status: 400, message: "malformed request"}.to_json)
-      rescue Exception => e
-        puts "Error handling message: #{$!}"
-        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
-        ws.send ({status: 500, message: 'error'}.to_json)
-        ws.close
-      end
+      handle_message ws, event
     end
 
     # Set up socket close callback
@@ -209,6 +214,7 @@ class EventBus
               conn.async_exec "UNLISTEN *"
             end
           end
+          @bgthread = false
         end
       end
     end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 4761800..9ce53a6 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -142,7 +142,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 3
         when 3
           human_ev_uuid = d["object_uuid"]
+          state = 4
           ws.close
+        when 4
+          assert false, "Should not get any more events"
         end
       end
 
@@ -176,7 +179,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 2
         when 2
           human_ev_uuid = d["object_uuid"]
+          state = 3
           ws.close
+        when 3
+          assert false, "Should not get any more events"
         end
       end
 
@@ -219,7 +225,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 4
         when 4
           human_ev_uuid = d["object_uuid"]
+          state = 5
           ws.close
+        when 5
+          assert false, "Should not get any more events"
         end
       end
 
@@ -260,7 +269,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         when 3
           l2 = d["object_uuid"]
           assert_not_nil l2, "Unexpected message: #{d}"
+          state = 4
           ws.close
+        when 4
+          assert false, "Should not get any more events"
         end
       end
 
@@ -403,7 +415,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 4
         when 4
           human_ev_uuid = d["object_uuid"]
+          state = 5
           ws.close
+        when 5
+          assert false, "Should not get any more events"
         end
       end
 

commit e20c9587ad703ae8e1f81251a6e209834a52d448
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 10:56:22 2014 -0400

    Short-circuts #readable_by when the user is admin.

diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index c87a909..23d0222 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -75,6 +75,11 @@ class ArvadosModel < ActiveRecord::Base
   # end
 
   def self.readable_by user
+    if user.is_admin
+      # Admins can read anything, so return immediately.
+      return self
+    end
+
     uuid_list = [user.uuid, *user.groups_i_can(:read)]
     sanitized_uuid_list = uuid_list.
       collect { |uuid| sanitize(uuid) }.join(', ')
@@ -100,8 +105,6 @@ class ArvadosModel < ActiveRecord::Base
     # Link is any permission link ('write' and 'manage' implicitly include 'read')
     # The existence of such a link is tested in the where clause as permissions.head_uuid IS NOT NULL.
     # or
-    # User is admin
-    # or
     # This row is owned by this user, or owned by a group readable by this user
     # or
     # This is the users table
@@ -115,8 +118,8 @@ class ArvadosModel < ActiveRecord::Base
     # This object described by this row is owned by this user, or owned by a group readable by this user
 
     joins("LEFT JOIN links permissions ON permissions.head_uuid in (#{table_name}.owner_uuid, #{table_name}.uuid #{or_object_uuid}) AND permissions.tail_uuid in (#{sanitized_uuid_list}) AND permissions.link_class='permission'").
-      where("permissions.head_uuid IS NOT NULL OR ?=? OR #{table_name}.owner_uuid in (?) #{or_row_is_me} #{or_references_me} #{or_object_owner}",
-            user.is_admin, true, uuid_list).uniq
+      where("permissions.head_uuid IS NOT NULL OR #{table_name}.owner_uuid in (?) #{or_row_is_me} #{or_references_me} #{or_object_owner}",
+            uuid_list).uniq
   end
 
   def logged_attributes

commit a4724fe92e651abb06acf8c5e75184561a55c854
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 10:56:04 2014 -0400

    Unsubscribe message now takes filter definition instead of filter_id, reducing
    the state both the client and server have to maintain in order to support
    unsubscribing.  Added code comments and updated tests.

diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 8480cf4..79315aa 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -20,19 +20,14 @@ class Filter
 
   attr_accessor :filters
 
-  def initialize p, fid
+  def initialize p
     @params = p
-    @filter_id = fid
     load_filters_param
   end
 
   def params
     @params
   end
-
-  def filter_id
-    @filter_id
-  end
 end
 
 # Manages websocket connections, accepts subscription messages and publishes
@@ -51,12 +46,6 @@ class EventBus
     @channel = EventMachine::Channel.new
     @mtx = Mutex.new
     @bgthread = false
-    @filter_id_counter = 0
-  end
-
-  # Allocate a new filter id
-  def alloc_filter_id
-    @filter_id_counter += 1
   end
 
   # Push out any pending events to the connection +ws+
@@ -69,7 +58,8 @@ class EventBus
           logs = Log.readable_by(ws.user).order("id asc")
 
           if ws.last_log_id
-            # Only interested in log rows that are new
+            # Client is only interested in log rows that are newer than the
+            # last log row seen by the client.
             logs = logs.where("logs.id > ?", ws.last_log_id)
           elsif id
             # No last log id, so only look at the most recently changed row
@@ -92,7 +82,7 @@ class EventBus
             logs = logs.where(cond_out.join(' OR '), *param_out)
           end
 
-          # Finally execute query and send matching rows
+          # Finally execute query and actually send the matching log rows
           logs.each do |l|
             ws.send(l.as_api_response.to_json)
             ws.last_log_id = l.id
@@ -137,33 +127,41 @@ class EventBus
     # Set up callback for inbound message dispatch.
     ws.on :message do |event|
       begin
+        # Parse event data as JSON
         p = (Oj.load event.data).symbolize_keys
+
         if p[:method] == 'subscribe'
+          # Handle subscribe event
+
           if p[:last_log_id]
+            # Set or reset the last_log_id.  The event bus only reports events
+            # for rows that come after last_log_id.
             ws.last_log_id = p[:last_log_id].to_i
           end
 
           if ws.filters.length < MAX_FILTERS
-            filter_id = alloc_filter_id
-            ws.filters.push Filter.new(p, filter_id)
-            ws.send ({status: 200, message: 'subscribe ok', filter_id: filter_id}.to_json)
+            # Add a filter.  This gets the :filters field which is the same
+            # format as used for regular index queries.
+            ws.filters << Filter.new(p)
+            ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+
+            # Send any pending events
             push_events ws
           else
             ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
           end
+
         elsif p[:method] == 'unsubscribe'
-          if filter_id = p[:filter_id]
-            filter_id = filter_id.to_i
-            len = ws.filters.length
-            ws.filters = ws.filters.select { |f| f.filter_id != filter_id }
-            if ws.filters.length < len
-              ws.send ({status: 200, message: 'unsubscribe ok', filter_id: filter_id}.to_json)
-            else
-              ws.send ({status: 404, message: 'filter_id not found', filter_id: filter_id}.to_json)
-            end
+          # Handle unsubscribe event
+
+          len = ws.filters.length
+          ws.filters.select! { |f| not ((f.filters == p[:filters]) or (f.filters.empty? and p[:filters].nil?)) }
+          if ws.filters.length < len
+            ws.send ({status: 200, message: 'unsubscribe ok'}.to_json)
           else
-            ws.send ({status: 400, message: 'must provide filter_id'}.to_json)
+            ws.send ({status: 404, message: 'filter not found'}.to_json)
           end
+
         else
           ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
         end
@@ -177,11 +175,13 @@ class EventBus
       end
     end
 
+    # Set up socket close callback
     ws.on :close do |event|
       @channel.unsubscribe sub
       ws = nil
     end
 
+    # Start up thread to monitor the Postgres database, if none exists already.
     @mtx.synchronize do
       unless @bgthread
         @bgthread = true
@@ -192,6 +192,12 @@ class EventBus
             begin
               conn.async_exec "LISTEN logs"
               while true
+                # wait_for_notify will block until there is a change
+                # notification from Postgres about the logs table, then push
+                # the notification into the EventMachine channel.  Each
+                # websocket connection subscribes to the other end of the
+                # channel and calls #push_events to actually dispatch the
+                # events to the client.
                 conn.wait_for_notify do |channel, pid, payload|
                   @channel.push payload
                 end
@@ -206,5 +212,9 @@ class EventBus
         end
       end
     end
+
+    # Since EventMachine is an asynchronous event based dispatcher, #on_connect
+    # does not block but instead returns immediately after having set up the
+    # websocket and notification channel callbacks.
   end
 end
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 3bba0ef..4761800 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -211,6 +211,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         when 2
           assert_equal 200, d["status"]
           spec = Specimen.create
+          Trait.create # not part of filters, should not be received
           human = Human.create
           state = 3
         when 3
@@ -281,6 +282,9 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :open do |event|
         ws.send ({method: 'subscribe'}.to_json)
         EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
           ws.close
         end
       end
@@ -290,15 +294,14 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         case state
         when 1
           assert_equal 200, d["status"]
-          filter_id = d["filter_id"]
           spec = Specimen.create
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json)
+          ws.send ({method: 'unsubscribe'}.to_json)
 
           EM::Timer.new 1 do
-            Human.create
+            Specimen.create
           end
 
           state = 3
@@ -316,19 +319,22 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     assert_equal spec.uuid, spec_ev_uuid
   end
 
-
-  test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do
+  test "connect, subscribe, get event, unsubscribe with filter" do
     state = 1
     spec = nil
     spec_ev_uuid = nil
-    human = nil
-    human_ev_uuid = nil
 
     authorize_with :admin
 
-    ws_helper :admin do |ws|
+    ws_helper :admin, false do |ws|
       ws.on :open do |event|
-        ws.send ({method: 'subscribe'}.to_json)
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        EM::Timer.new 3 do
+          # Set a time limit on the test because after unsubscribing the server
+          # still has to process the next event (and then hopefully correctly
+          # decides not to send it because we unsubscribed.)
+          ws.close
+        end
       end
 
       ws.on :message do |event|
@@ -336,35 +342,33 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         case state
         when 1
           assert_equal 200, d["status"]
-          spec = Specimen.create
+          spec = Human.create
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json)
+          ws.send ({method: 'unsubscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
 
           EM::Timer.new 1 do
-            human = Human.create
+            Human.create
           end
 
           state = 3
         when 3
-          assert_equal 404, d["status"]
+          assert_equal 200, d["status"]
           state = 4
         when 4
-          human_ev_uuid = d["object_uuid"]
-          ws.close
+          assert false, "Should not get any more events"
         end
       end
 
     end
 
     assert_not_nil spec
-    assert_not_nil human
     assert_equal spec.uuid, spec_ev_uuid
-    assert_equal human.uuid, human_ev_uuid
   end
 
-  test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do
+
+  test "connect, subscribe, get event, try to unsubscribe with bogus filter" do
     state = 1
     spec = nil
     spec_ev_uuid = nil
@@ -387,7 +391,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
           state = 2
         when 2
           spec_ev_uuid = d["object_uuid"]
-          ws.send ({method: 'unsubscribe'}.to_json)
+          ws.send ({method: 'unsubscribe', filters: [['foo', 'bar', 'baz']]}.to_json)
 
           EM::Timer.new 1 do
             human = Human.create
@@ -395,7 +399,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
           state = 3
         when 3
-          assert_equal 400, d["status"]
+          assert_equal 404, d["status"]
           state = 4
         when 4
           human_ev_uuid = d["object_uuid"]
@@ -412,6 +416,7 @@ class WebsocketTest < ActionDispatch::IntegrationTest
   end
 
 
+
   test "connected, not subscribed, no event" do
     authorize_with :admin
 
@@ -532,10 +537,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       ws.on :message do |event|
         d = Oj.load event.data
         case state
-        when (1..16)
+        when (1..EventBus::MAX_FILTERS)
           assert_equal 200, d["status"]
           state += 1
-        when 17
+        when (EventBus::MAX_FILTERS+1)
           assert_equal 403, d["status"]
           ws.close
         end
diff --git a/services/api/test/websocket_runner.rb b/services/api/test/websocket_runner.rb
index c35938e..df72e24 100644
--- a/services/api/test/websocket_runner.rb
+++ b/services/api/test/websocket_runner.rb
@@ -6,7 +6,7 @@ SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
 class WebsocketTestRunner < MiniTest::Unit
   def _system(*cmd)
     Bundler.with_clean_env do
-      if not system({'ARVADOS_WEBSOCKETS' => '1', 'RAILS_ENV' => 'test'}, *cmd)
+      if not system({'ARVADOS_WEBSOCKETS' => 'ws-only', 'RAILS_ENV' => 'test'}, *cmd)
         raise RuntimeError, "#{cmd[0]} returned exit code #{$?.exitstatus}"
       end
     end

commit 208e172287aba3be7cb988a2c416e4281a2e5f60
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 10:54:37 2014 -0400

    Added code comments.

diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
index 892dbf5..795df4a 100644
--- a/services/api/app/middlewares/rack_socket.rb
+++ b/services/api/app/middlewares/rack_socket.rb
@@ -19,12 +19,13 @@ class RackSocket
   # +app+  The next layer of the Rack stack.
   #
   # Accepts options:
-  # +:handler+ (Required) A class to handle new connections.  Initialize will
+  # +:handler+ (Required) A class to handle new connections.  #initialize will
   # call handler.new to create the actual handler instance object.  When a new
   # websocket connection is established, #on_connect on the handler instance
-  # object to notify it about the connection.
+  # object will be called with the new connection.
   #
-  # +:mount+  The path for websocket connect requests, defaults to '/websocket'.
+  # +:mount+ The HTTP request path that will be recognized for websocket
+  # connect requests, defaults to '/websocket'.
   #
   # +:websocket_only+  If true, the server will only handle websocket requests,
   # and all other requests will result in an error.  If false, unhandled
diff --git a/services/api/lib/load_param.rb b/services/api/lib/load_param.rb
index dba0582..9c85757 100644
--- a/services/api/lib/load_param.rb
+++ b/services/api/lib/load_param.rb
@@ -1,12 +1,15 @@
+# Mixin module for reading out query parameters from request params.
+#
 # Expects:
 #   +params+ Hash
 # Sets:
-#   @where, @filters
-
+#   @where, @filters, @limit, @offset, @orders
 module LoadParam
 
+  # Default limit on number of rows to return in a single query.
   DEFAULT_LIMIT = 100
 
+  # Load params[:where] into @where
   def load_where_param
     if params[:where].nil? or params[:where] == ""
       @where = {}
@@ -23,6 +26,7 @@ module LoadParam
     @where = @where.with_indifferent_access
   end
 
+  # Load params[:filters] into @filters
   def load_filters_param
     @filters ||= []
     if params[:filters].is_a? Array
@@ -38,6 +42,8 @@ module LoadParam
     end
   end
 
+  # Load params[:limit], params[:offset] and params[:order]
+  # into @limit, @offset, @orders
   def load_limit_offset_order_params
     if params[:limit]
       unless params[:limit].to_s.match(/^\d+$/)
diff --git a/services/api/lib/record_filters.rb b/services/api/lib/record_filters.rb
index 3f0a845..d7e556b 100644
--- a/services/api/lib/record_filters.rb
+++ b/services/api/lib/record_filters.rb
@@ -1,11 +1,20 @@
+# Mixin module providing a method to convert filters into a list of SQL
+# fragments suitable to be fed to ActiveRecord #where.
+#
 # Expects:
-#   @where
-#   @filters
-#   +model_class+
+#   model_class
 # Operates on:
 #   @objects
 module RecordFilters
 
+  # Input:
+  # +filters+  Arvados filters as list of lists.
+  # +ar_table_name+  name of SQL table
+  #
+  # Output:
+  # Hash with two keys:
+  # :cond_out  array of SQL fragments for each filter expression
+  # :param_out  array of values for parameter substitution in cond_out
   def record_filters filters, ar_table_name
     cond_out = []
     param_out = []

commit b0c30631bceb2d1837b52d1d3475e52aee4c9c43
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Apr 30 10:53:35 2014 -0400

    Now supports both websocket integrated (ARVADOS_WEBSOCKETS defined) and
    websocket-only (ARVADOS_WEBSOCKETS=ws-only) server modes.  Added comment to
    application.yml.example about setting websocket_address when running in
    websocket-only mode.

diff --git a/services/api/app/controllers/arvados/v1/schema_controller.rb b/services/api/app/controllers/arvados/v1/schema_controller.rb
index b8f0594..c9b18ae 100644
--- a/services/api/app/controllers/arvados/v1/schema_controller.rb
+++ b/services/api/app/controllers/arvados/v1/schema_controller.rb
@@ -21,7 +21,6 @@ class Arvados::V1::SchemaController < ApplicationController
         documentationLink: "http://doc.arvados.org/api/index.html",
         protocol: "rest",
         baseUrl: root_url + "/arvados/v1/",
-        websocketUrl: Rails.application.config.websocket_address,
         basePath: "/arvados/v1/",
         rootUrl: root_url,
         servicePath: "arvados/v1/",
@@ -71,6 +70,12 @@ class Arvados::V1::SchemaController < ApplicationController
         resources: {}
       }
 
+      if Rails.application.config.websocket_address
+        discovery[:websocketUrl] = Rails.application.config.websocket_address
+      elsif ENV['ARVADOS_WEBSOCKETS']
+        discovery[:websocketUrl] = (root_url.sub /^http/, 'ws') + "/websockets"
+      end
+
       ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |k|
         begin
           ctl_class = "Arvados::V1::#{k.to_s.pluralize}Controller".constantize
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index c05b4af..37bb1c3 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -90,9 +90,6 @@ common:
   # Visitors to the API server will be redirected to the workbench
   workbench_address: https://workbench.local:3001/
 
-  # Websocket endpoint
-  websocket_address: wss://localhost:3002/websocket
-
   # The e-mail address of the user you would like to become marked as an admin
   # user on their first login.
   # In the default configuration, authentication happens through the Arvados SSO
diff --git a/services/api/config/application.yml.example b/services/api/config/application.yml.example
index 9162fc4..b2b9238 100644
--- a/services/api/config/application.yml.example
+++ b/services/api/config/application.yml.example
@@ -39,3 +39,10 @@ test:
 common:
   #git_repositories_dir: /var/cache/git
   #git_internal_dir: /var/cache/arvados/internal.git
+
+  # You can run the websocket server separately from the regular HTTP service
+  # by setting "ARVADOS_WEBSOCKETS=ws-only" in the environment before running
+  # the websocket server.  When you do this, you need to set the following
+  # configuration variable so that the primary server can give out the correct
+  # address of the dedicated websocket server:
+  #websocket_address: wss://websocket.local/websockets
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index 2902403..e04f1fe 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -1,7 +1,18 @@
 require 'eventbus'
 
 Server::Application.configure do
-  if ENV['ARVADOS_WEBSOCKETS'] == '1'
-    config.middleware.insert_after ArvadosApiToken, RackSocket, {:handler => EventBus, :websocket_only => true }
+  # Enables websockets if ARVADOS_WEBSOCKETS is defined with any value.  If
+  # ARVADOS_WEBSOCKETS=ws-only, server will only accept websocket connections
+  # and return an error response for all other requests.
+  if ENV['ARVADOS_WEBSOCKETS']
+    config.middleware.insert_after ArvadosApiToken, RackSocket, {
+      :handler => EventBus,
+      :mount => "/websockets",
+      :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
+    }
   end
+
+  # Define websocket_address configuration option, can be overridden in config files.
+  # See application.yml.example for details.
+  config.websocket_address = nil
 end

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list