[ARVADOS] created: 65721c02e81f29cb8e40aab4b3d35bebc4f23cc6

Git user git at public.curoverse.com
Fri Jun 10 10:46:38 EDT 2016


        at  65721c02e81f29cb8e40aab4b3d35bebc4f23cc6 (commit)


commit 65721c02e81f29cb8e40aab4b3d35bebc4f23cc6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 10 10:45:19 2016 -0400

    9388: Record every log id sent and don't send duplicates.

diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 4988d59..58188ef 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -7,6 +7,7 @@ require 'oj'
 require 'faye/websocket'
 require 'record_filters'
 require 'load_param'
+require 'set'
 
 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
 module Faye
@@ -14,6 +15,7 @@ module Faye
     attr_accessor :user
     attr_accessor :last_log_id
     attr_accessor :filters
+    attr_accessor :sent_ids
   end
 end
 
@@ -114,7 +116,9 @@ class EventBus
 
         lastid = nil
         logs.limit(limit).each do |l|
-          ws.send(l.as_api_response.to_json)
+          if ws.sent_ids.add?(l.id) != nil
+            ws.send(l.as_api_response.to_json)
+          end
           lastid = l.id
           count += 1
         end
@@ -225,6 +229,7 @@ class EventBus
     ws.user = current_user
     ws.filters = []
     ws.last_log_id = nil
+    ws.sent_ids = Set.new
 
     # Subscribe to internal postgres notifications through @channel.  This will
     # call push_events when a notification comes through.

commit b0e9a800435e6231317e5123c9b6a1b5b6397a5e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 17:55:37 2016 -0400

    9388: Process each notify individually instead of attempting to batch them up.
    
    Prior to this commit, websockets used to try to send log events out in batches,
    by getting all logs with an id greater than the last log id that was sent.
    Unfortunately, under concurrent database writes, logs from uncommitted
    transactions may not appear in the query even if logs with larger ids do
    appear.  This results in the uncommitted log never being sent out because
    subsequent batch sends would not consider logs prior to the last log id that
    was sent (which, in this case, is higher than the log that was missed.)
    
    This commit eliminates the batching behavior.  Because NOTIFY includes the log
    id of a specific record that was committed, consider only the log record with
    that id and process events in the order that the NOTIFY events arrive.  This
    means events may be delivered out of numeric order (although they now more
    closely reflect the "actual" order, i.e. the order that the events were
    actually committed to the database).
    
    "Catch ups" where the client has specified a last_log_id and needs to have past
    logs replayed continue to be sent in batches.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 347075d..988ec96 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -15,6 +15,7 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+    parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.")
 
     group = parser.add_mutually_exclusive_group()
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
@@ -67,6 +68,9 @@ def main(arguments=None):
     else:
         last_log_id = None
 
+    if args.id:
+        last_log_id = args.id-1
+
     def on_message(ev):
         global filters
         global ws
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 9bf95f5..4988d59 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -67,11 +67,6 @@ class EventBus
   # push_events call if there are more log rows to send.
   def push_events ws, notify_id
     begin
-      if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
-        # This notify is for a row we've handled already.
-        return
-      end
-
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
         # Start with log rows readable by user, sorted in ascending order
@@ -82,13 +77,12 @@ class EventBus
         param_out = []
 
         if !ws.last_log_id.nil?
-          # Client is only interested in log rows that are newer than the
-          # last log row seen by the client.
+          # We are catching up from some starting point.
           cond_id = "logs.id > ?"
           param_out << ws.last_log_id
         elsif !notify_id.nil?
-          # No last log id, so look at rows starting with notify id
-          cond_id = "logs.id >= ?"
+          # Get new row being notified.
+          cond_id = "logs.id = ?"
           param_out << notify_id
         else
           # No log id to start from, nothing to do, return
@@ -118,9 +112,10 @@ class EventBus
         count = 0
         limit = 10
 
+        lastid = nil
         logs.limit(limit).each do |l|
           ws.send(l.as_api_response.to_json)
-          ws.last_log_id = l.id
+          lastid = l.id
           count += 1
         end
 
@@ -128,13 +123,13 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
+          ws.last_log_id = lastid
           EventMachine::next_tick do
-            push_events ws, nil
+            push_events ws, notify_id
           end
-        elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
-          # Number of rows returned was less than cap, but the notify id is
-          # higher than the last id visible to the client, so update last_log_id
-          ws.last_log_id = notify_id
+        elsif !ws.last_log_id.nil?
+          # Done catching up
+          ws.last_log_id = nil
         end
       elsif !notify_id.nil?
         # No filters set up, so just record the sequence number

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list