[ARVADOS] created: aa920f1659aa830861c196617fe664f35b5c12ef

git at public.curoverse.com git at public.curoverse.com
Mon Apr 21 17:12:55 EDT 2014


        at  aa920f1659aa830861c196617fe664f35b5c12ef (commit)


commit aa920f1659aa830861c196617fe664f35b5c12ef
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Apr 21 16:53:42 2014 -0400

    Added code to properly initialize EventMachine to deal with Passenger forking.
    Added postgres NOTIFY to ArvadosModel#log_change
    Added postgres LISTEN to background thread, which posts a message to the websocket.
    Notification works!

diff --git a/services/api/app/middlewares/rack_socket.rb b/services/api/app/middlewares/rack_socket.rb
index 7fcba83..779c675 100644
--- a/services/api/app/middlewares/rack_socket.rb
+++ b/services/api/app/middlewares/rack_socket.rb
@@ -1,38 +1,101 @@
+require 'rack'
+require 'faye/websocket'
+require 'oj'
+require 'eventmachine'
 
-  require 'faye/websocket'
+class RackSocket
 
-  class RackSocket
+  DEFAULT_ENDPOINT  = '/websocket'
 
-    DEFAULT_ENDPOINT  = '/websocket'
+  def die_gracefully_on_signal
+    Signal.trap("INT") { EM.stop }
+    Signal.trap("TERM") { EM.stop }
+  end
+
+  def initialize(app = nil, options = nil)
+    @app = app if app.respond_to?(:call)
+    @options = [app, options].grep(Hash).first || {}
+    @endpoint = @options[:mount] || DEFAULT_ENDPOINT
 
-    def initialize(app = nil, options = nil)
-      @app = app if app.respond_to?(:call)
-      @options = [app, options].grep(Hash).first || {}
-      @endpoint = @options[:mount] || DEFAULT_ENDPOINT
+    # from https://gist.github.com/eatenbyagrue/1338545#file-eventmachine-rb
+    if defined?(PhusionPassenger)
+      PhusionPassenger.on_event(:starting_worker_process) do |forked|
+        # for passenger, we need to avoid orphaned threads
+        if forked && EM.reactor_running?
+          EM.stop
+        end
+        Thread.new {
+          EM.run
+        }
+        die_gracefully_on_signal
+      end
+    else
+      # faciliates debugging
+      Thread.abort_on_exception = true
+      # just spawn a thread and start it up
+      Thread.new {
+        EM.run
+      }
     end
 
-    def call env
-      request = Rack::Request.new(env)
-      if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
-        ws = Faye::WebSocket.new(env)
+    @channel = EventMachine::Channel.new
+    @bgthread = nil
+  end
 
-        ws.on :message do |event|
-          puts "got #{event.data}"
-          ws.send(event.data)
-        end
+  def call env
+    request = Rack::Request.new(env)
+    if request.path_info == @endpoint and Faye::WebSocket.websocket?(env)
+      ws = Faye::WebSocket.new(env)
 
-        ws.on :close do |event|
-          p [:close, event.code, event.reason]
-          ws = nil
-        end
+      sub = @channel.subscribe do |msg|
+        puts "sending #{msg}"
+        ws.send({:message => "log"}.to_json)
+      end
 
-        # Return async Rack response
-        ws.rack_response
-      else
-        @app.call env
+      ws.on :message do |event|
+        puts "got #{event.data}"
+        ws.send(event.data)
+      end
+
+      ws.on :close do |event|
+        p [:close, event.code, event.reason]
+        @channel.unsubscribe sub
+        ws = nil
+      end
+
+      unless @bgthread
+        @bgthread = true
+        Thread.new do
+          # from http://stackoverflow.com/questions/16405520/postgres-listen-notify-rails
+          ActiveRecord::Base.connection_pool.with_connection do |connection|
+            conn = connection.instance_variable_get(:@connection)
+            begin
+              conn.async_exec "LISTEN logs"
+              while true
+                conn.wait_for_notify do |channel, pid, payload|
+                  puts "Received a NOTIFY on channel #{channel}"
+                  puts "from PG backend #{pid}"
+                  puts "saying #{payload}"
+                  @channel.push true
+                end
+              end
+            ensure
+              # Don't want the connection to still be listening once we return
+              # it to the pool - could result in weird behavior for the next
+              # thread to check it out.
+              conn.async_exec "UNLISTEN *"
+            end
+          end
+        end
       end
-    end
 
+      # Return async Rack response
+      ws.rack_response
+    else
+      @app.call env
+    end
   end
 
+end
+
 
diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index 25d7317..f0f846c 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -295,6 +295,7 @@ class ArvadosModel < ActiveRecord::Base
     log = Log.new(event_type: event_type).fill_object(self)
     yield log
     log.save!
+    connection.execute "NOTIFY logs"
     log_start_state
   end
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list