[ARVADOS] updated: 7c12fc4d989dd2b4c47a174280a4f9526ecb0798

git at public.curoverse.com git at public.curoverse.com
Fri Apr 25 11:19:26 EDT 2014


Summary of changes:
 services/api/lib/eventbus.rb                    |  120 +++++--
 services/api/test/integration/websocket_test.rb |  475 +++++++++++++++++++++--
 2 files changed, 538 insertions(+), 57 deletions(-)

       via  7c12fc4d989dd2b4c47a174280a4f9526ecb0798 (commit)
      from  0a72fd5c9f764eba4fc295d8beb24ac3d01885c5 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7c12fc4d989dd2b4c47a174280a4f9526ecb0798
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 25 11:19:22 2014 -0400

    Added tests, especially for error handling behavior.  Fixed bugs found by tests.

diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 0865567..ac023e5 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -15,17 +15,20 @@ end
 class Filter
   include LoadParam
 
-  def initialize p
-    @p = p
+  attr_accessor :filters
+
+  def initialize p, fid
+    @params = p
+    @filter_id = fid
     load_filters_param
   end
 
   def params
-    @p
+    @params
   end
 
-  def filters
-    @filters
+  def filter_id
+    @filter_id
   end
 end
 
@@ -33,42 +36,44 @@ class EventBus
   include CurrentApiClient
   include RecordFilters
 
+  # used in RecordFilters
+  def model_class
+    Log
+  end
+
+  # used in RecordFilters
+  def table_name
+    model_class.table_name
+  end
+
   def initialize
     @channel = EventMachine::Channel.new
     @mtx = Mutex.new
     @bgthread = false
+    @filter_id_counter = 0
   end
 
-  def on_connect ws
-    if not current_user
-      ws.send ({status: 401, message: "Valid API token required"}.to_json)
-      ws.close
-      return
-    end
-
-    ws.user = current_user
-    ws.filters = []
-    ws.last_log_id = nil
+  def alloc_filter_id
+    (@filter_id_counter += 1)
+  end
 
-    sub = @channel.subscribe do |msg|
+  def push_events ws, msg = nil
       begin
         # Must have at least one filter set up to receive events
         if ws.filters.length > 0
-
           # Start with log rows readable by user, sorted in ascending order
           logs = Log.readable_by(ws.user).order("id asc")
 
           if ws.last_log_id
-            # Only get log rows that are new
-            logs = logs.where("logs.id > ? and logs.id <= ?", ws.last_log_id, msg.to_i)
-          else
+            # Only interested in log rows that are new
+            logs = logs.where("logs.id > ?", ws.last_log_id)
+          elsif msg
             # No last log id, so only look at the most recently changed row
             logs = logs.where("logs.id = ?", msg.to_i)
+          else
+            return
           end
 
-          # Record the most recent row
-          ws.last_log_id = msg.to_i
-
           # Now process filters provided by client
           cond_out = []
           param_out = []
@@ -86,25 +91,76 @@ class EventBus
           # Finally execute query and send matching rows
           logs.each do |l|
             ws.send(l.as_api_response.to_json)
+            ws.last_log_id = l.id
           end
-        else
+        elsif msg
           # No filters set up, so just record the sequence number
-          ws.last_log_id.nil = msg.to_i
+          ws.last_log_id = msg.to_i
         end
       rescue Exception => e
-        puts "#{e}"
+        puts "Error publishing event: #{$!}"
+        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+        ws.send ({status: 500, message: 'error'}.to_json)
         ws.close
       end
+  end
+
+  MAX_FILTERS = 16
+
+  def on_connect ws
+    if not current_user
+      ws.send ({status: 401, message: "Valid API token required"}.to_json)
+      ws.close
+      return
+    end
+
+    ws.user = current_user
+    ws.filters = []
+    ws.last_log_id = nil
+
+    sub = @channel.subscribe do |msg|
+      push_events ws, msg
     end
 
     ws.on :message do |event|
-      p = Oj.load event.data
-      if p["method"] == 'subscribe'
-        if p["starting_log_id"]
-          ws.last_log_id = p["starting_log_id"].to_i
+      begin
+        p = (Oj.load event.data).symbolize_keys
+        if p[:method] == 'subscribe'
+          if p[:last_log_id]
+            ws.last_log_id = p[:last_log_id].to_i
+          end
+
+          if ws.filters.length < MAX_FILTERS
+            filter_id = alloc_filter_id
+            ws.filters.push Filter.new(p, filter_id)
+            ws.send ({status: 200, message: 'subscribe ok', filter_id: filter_id}.to_json)
+            push_events ws
+          else
+            ws.send ({status: 403, message: "maximum of #{MAX_FILTERS} filters allowed per connection"}.to_json)
+          end
+        elsif p[:method] == 'unsubscribe'
+          if filter_id = p[:filter_id]
+            filter_id = filter_id.to_i
+            len = ws.filters.length
+            ws.filters = ws.filters.select { |f| f.filter_id != filter_id }
+            if ws.filters.length < len
+              ws.send ({status: 200, message: 'unsubscribe ok', filter_id: filter_id}.to_json)
+            else
+              ws.send ({status: 404, message: 'filter_id not found', filter_id: filter_id}.to_json)
+            end
+          else
+            ws.send ({status: 400, message: 'must provide filter_id'}.to_json)
+          end
+        else
+          ws.send ({status: 400, message: "missing or unrecognized method"}.to_json)
         end
-        ws.filters.push(Filter.new p)
-        ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+      rescue Oj::Error => e
+        ws.send ({status: 400, message: "malformed request"}.to_json)
+      rescue Exception => e
+        puts "Error handling message: #{$!}"
+        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
+        ws.send ({status: 500, message: 'error'}.to_json)
+        ws.close
       end
     end
 
diff --git a/services/api/test/integration/websocket_test.rb b/services/api/test/integration/websocket_test.rb
index 8f28fc8..666175d 100644
--- a/services/api/test/integration/websocket_test.rb
+++ b/services/api/test/integration/websocket_test.rb
@@ -3,7 +3,7 @@ require 'websocket_runner'
 require 'oj'
 require 'database_cleaner'
 
-DatabaseCleaner.strategy = :deletion
+DatabaseCleaner.strategy = :truncation
 
 class WebsocketTest < ActionDispatch::IntegrationTest
   self.use_transactional_fixtures = false
@@ -16,8 +16,10 @@ class WebsocketTest < ActionDispatch::IntegrationTest
     DatabaseCleaner.clean
   end
 
-  def ws_helper (token = nil)
+  def ws_helper (token = nil, timeout = true)
+    opened = false
     close_status = nil
+    too_long = false
 
     EM.run {
       if token
@@ -26,31 +28,33 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
       end
 
-      ws.on :close do |event|
-        close_status = [:close, event.code, event.reason]
-        EM.stop_event_loop
+      ws.on :open do |event|
+        opened = true
+        if timeout
+          EM::Timer.new 3 do
+            too_long = true
+            EM.stop_event_loop
+          end
+        end
       end
 
-      EM::Timer.new 3 do
+      ws.on :close do |event|
+        close_status = [:close, event.code, event.reason]
         EM.stop_event_loop
       end
 
       yield ws
     }
 
-    assert_not_nil close_status, "Test took too long"
-    assert_equal 1000, close_status[1], "Server closed the connection unexpectedly (check server log for errors)"
+    assert opened, "Should have opened web socket"
+    assert (not too_long), "Test took too long"
+    assert_equal 1000, close_status[1], "Connection closed unexpectedly (check log for errors)"
   end
 
   test "connect with no token" do
-    opened = false
     status = nil
 
     ws_helper do |ws|
-      ws.on :open do |event|
-        opened = true
-      end
-
       ws.on :message do |event|
         d = Oj.load event.data
         status = d["status"]
@@ -58,18 +62,15 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
     end
 
-    assert opened, "Should have opened web socket"
     assert_equal 401, status
   end
 
 
   test "connect, subscribe and get response" do
-    opened = false
     status = nil
 
     ws_helper :admin do |ws|
       ws.on :open do |event|
-        opened = true
         ws.send ({method: 'subscribe'}.to_json)
       end
 
@@ -80,21 +81,18 @@ class WebsocketTest < ActionDispatch::IntegrationTest
       end
     end
 
-    assert opened, "Should have opened web socket"
     assert_equal 200, status
   end
 
   test "connect, subscribe, get event" do
-    opened = false
     state = 1
-    spec_uuid = nil
+    spec = nil
     ev_uuid = nil
 
     authorize_with :admin
 
     ws_helper :admin do |ws|
       ws.on :open do |event|
-        opened = true
         ws.send ({method: 'subscribe'}.to_json)
       end
 
@@ -104,8 +102,6 @@ class WebsocketTest < ActionDispatch::IntegrationTest
         when 1
           assert_equal 200, d["status"]
           spec = Specimen.create
-          spec.save
-          spec_uuid = spec.uuid
           state = 2
         when 2
           ev_uuid = d["object_uuid"]
@@ -115,9 +111,438 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     end
 
-    assert opened, "Should have opened web socket"
-    assert_not_nil spec_uuid
-    assert_equal spec_uuid, ev_uuid
+    assert_not_nil spec
+    assert_equal spec.uuid, ev_uuid
+  end
+
+  test "connect, subscribe, get two events" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          state = 3
+        when 3
+          human_ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, filter events" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          human = Human.create
+          state = 2
+        when 2
+          human_ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil human
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+  test "connect, subscribe, multiple filters" do
+    state = 1
+    spec = nil
+    human = nil
+    spec_ev_uuid = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#human']]}.to_json)
+        ws.send ({method: 'subscribe', filters: [['object_uuid', 'is_a', 'arvados#specimen']]}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          human = Human.create
+          state = 3
+        when 3
+          spec_ev_uuid = d["object_uuid"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, ask events starting at seq num" do
+    state = 1
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    lastid = logs(:log3).id
+    l1 = nil
+    l2 = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe', last_log_id: lastid}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          state = 2
+        when 2
+          l1 = d["object_uuid"]
+          state = 3
+        when 3
+          l2 = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_equal l1, logs(:log4).object_uuid
+    assert_equal l2, logs(:log5).object_uuid
+  end
+
+  test "connect, subscribe, get event, unsubscribe" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    filter_id = nil
+
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          filter_id = d["filter_id"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filter_id: filter_id}.to_json)
+
+          EM::Timer.new 1 do
+            Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 200, d["status"]
+          state = 4
+        when 4
+          assert false, "Should not get any more events"
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_equal spec.uuid, spec_ev_uuid
+  end
+
+
+  test "connect, subscribe, get event, try to unsubscribe with bogus filter_id" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe', filter_id: 100000}.to_json)
+
+          EM::Timer.new 1 do
+            human = Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 404, d["status"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+  test "connect, subscribe, get event, try to unsubscribe with missing filter_id" do
+    state = 1
+    spec = nil
+    spec_ev_uuid = nil
+    human = nil
+    human_ev_uuid = nil
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          spec = Specimen.create
+          state = 2
+        when 2
+          spec_ev_uuid = d["object_uuid"]
+          ws.send ({method: 'unsubscribe'}.to_json)
+
+          EM::Timer.new 1 do
+            human = Human.create
+          end
+
+          state = 3
+        when 3
+          assert_equal 400, d["status"]
+          state = 4
+        when 4
+          human_ev_uuid = d["object_uuid"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_not_nil spec
+    assert_not_nil human
+    assert_equal spec.uuid, spec_ev_uuid
+    assert_equal human.uuid, human_ev_uuid
+  end
+
+
+  test "connected, not subscribed, no event" do
+    authorize_with :admin
+
+    ws_helper :admin, false do |ws|
+      ws.on :open do |event|
+        EM::Timer.new 1 do
+          Specimen.create
+        end
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        assert false, "Should not get any messages, message was #{event.data}"
+      end
+    end
+  end
+
+  test "connected, not authorized to see event" do
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :active, false do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'subscribe'}.to_json)
+
+        EM::Timer.new 3 do
+          ws.close
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when 1
+          assert_equal 200, d["status"]
+          Specimen.create
+          state = 2
+        when 2
+          assert false, "Should not get any messages, message was #{event.data}"
+        end
+      end
+
+    end
+
+  end
+
+  test "connect, try bogus method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({method: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, missing method" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send ({fizzbuzz: 'frobnabble'}.to_json)
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+  test "connect, send malformed request" do
+    status = nil
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        ws.send '<XML4EVER></XML4EVER>'
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        status = d["status"]
+        ws.close
+      end
+    end
+
+    assert_equal 400, status
+  end
+
+
+  test "connect, try subscribe too many filters" do
+    state = 1
+
+    authorize_with :admin
+
+    ws_helper :admin do |ws|
+      ws.on :open do |event|
+        (1..17).each do |i|
+          ws.send ({method: 'subscribe', filters: [['object_uuid', '=', i]]}.to_json)
+        end
+      end
+
+      ws.on :message do |event|
+        d = Oj.load event.data
+        case state
+        when (1..16)
+          assert_equal 200, d["status"]
+          state += 1
+        when 17
+          assert_equal 403, d["status"]
+          ws.close
+        end
+      end
+
+    end
+
+    assert_equal 17, state
+
   end
 
 end

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list