[ARVADOS] updated: bd99df2e6c5f7bb0b75c7fa2f1a9c1c2defeca8a

git at public.curoverse.com git at public.curoverse.com
Wed Oct 1 17:12:58 EDT 2014


Summary of changes:
 .../app/controllers/arvados/v1/jobs_controller.rb  |   4 +-
 services/api/config/application.default.yml        |  25 ++-
 services/api/script/crunch-dispatch.rb             | 225 ++++++++++++---------
 3 files changed, 141 insertions(+), 113 deletions(-)

       via  bd99df2e6c5f7bb0b75c7fa2f1a9c1c2defeca8a (commit)
       via  e910d13fc5fc63a86f20be3b758c08db3d429bc6 (commit)
       via  3ff644b461b69c38418384e532e3741a07333daf (commit)
       via  9b16ff9eb231584fcfda5eed029b1c1b08a0b819 (commit)
       via  a4679fb93b4b835509cc77f9bf2fdce02d40520e (commit)
       via  4f41341675c108e9ffa4e5ba15b8bca521152a67 (commit)
       via  74bcc470f33cd47ca4106a7565871c07c40f9c00 (commit)
      from  54a951255316417a42a3bd8c77aaa0b58d180440 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit bd99df2e6c5f7bb0b75c7fa2f1a9c1c2defeca8a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 1 17:12:52 2014 -0400

    3769: Add rate_limit with log throttling logic.  Multiple configuration
    parameters to throttle excessive logging by bytes, lines, logs table events,
    and total logged output.

diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index 7185810..648d1ad 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -80,16 +80,6 @@ common:
   # Maximum number of log events that may be generated by a single job.
   crunch_limit_log_events_per_job: 65536
 
-  # Maximum number of total bytes that may be logged by a single job.
-  crunch_limit_log_event_bytes_per_job: 67108864
-
-  # The sample period for throttling logs, in seconds (see below)
-  crunch_limit_log_event_throttle_period: 60
-
-  # Maximum number of bytes that job can log over
-  # crunch_limit_log_event_throttle_period before being silenced
-  crunch_limit_log_event_throttle_rate: 65536
-
   # These two settings control how frequently log events are flushed
   # to the database.  If a job generates two or more events within
   # crunch_log_seconds_between_events, the log data is not flushed
@@ -97,6 +87,21 @@ common:
   crunch_log_bytes_per_event: 4096
   crunch_log_seconds_between_events: 1
 
+  # Maximum number of total bytes that may be logged by a single job.  Logs
+  # that are throttled (see below) are not counted against this total.
+  crunch_limit_log_bytes_per_job: 67108864
+
+  # The sample period for throttling logs, in seconds (see below)
+  crunch_log_throttle_period: 30
+
+  # Maximum number of bytes that a job can log over
+  # crunch_log_throttle_period before being silenced
+  crunch_log_throttle_bytes: 60000
+
+  # Maximum number of lines that a job can log over
+  # crunch_log_throttle_period before being silenced
+  crunch_log_throttle_lines: 1000
+
   # Path to /etc/dnsmasq.d, or false = do not update dnsmasq data.
   dnsmasq_conf_dir: false
 
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
index ff9d5f6..c9ce92c 100755
--- a/services/api/script/crunch-dispatch.rb
+++ b/services/api/script/crunch-dispatch.rb
@@ -334,17 +334,17 @@ class Dispatcher
         stderr: e,
         wait_thr: t,
         job: job,
-        buf: {:stdout => '', :stderr => ''},
+        buf: {:stderr => '', :stdout => ''},
         started: false,
         sent_int: 0,
         job_auth: job_auth,
         stderr_buf_to_flush: '',
-        stderr_flushed_at: 0,
+        stderr_flushed_at: Time.new(0),
         bytes_logged: 0,
         events_logged: 0,
-        log_truncated: false,
-        log_throttle_timestamp: 0,
+        log_throttle_timestamp: Time.new(0),
         log_throttle_bytes_so_far: 0,
+        log_throttle_lines_so_far: 0,
         log_throttle_bytes_skipped: 0,
       }
       i.close
@@ -352,10 +352,72 @@ class Dispatcher
     end
   end
 
+  # Test for hard cap on total output and for log throttling.  Returns whether
+  # the log line should go to output or not.  Modifies "line" in place to
+  # replace it with an error if a logging limit is tripped.
+  def rate_limit running_job, line
+    if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
+      # Don't log anything if the hard cap has already been exceeded
+      return false
+    end
+
+    now = Time.now
+    throttle_period = Rails.configuration.crunch_log_throttle_period
+
+    #puts "Handle line at #{now - running_job[:log_throttle_timestamp]}, buf bytes #{line.size}, so far #{running_job[:log_throttle_bytes_so_far]}, throttled #{running_job[:log_throttle_bytes_skipped]}"
+
+    if running_job[:log_throttle_bytes_skipped] > 0
+      # We've skipped some log in the current time period already, so continue to
+      # skip the log
+      running_job[:log_throttle_bytes_skipped] += line.size
+      return false
+    end
+
+    # Count lines and bytes logged in this period, and total bytes logged for the job
+    running_job[:log_throttle_lines_so_far] += 1
+    running_job[:log_throttle_bytes_so_far] += line.size
+    running_job[:bytes_logged] += line.size
+
+    if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes or
+        running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines
+      # We've exceeded the per-period throttle, so start skipping
+      running_job[:log_throttle_bytes_skipped] += line.size
+
+      # Replace log line with a message about skipping the log
+      remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
+      if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes
+        line.replace "Exceeded crunch_log_throttle_bytes rate of #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n"
+      else
+        line.replace "Exceeded crunch_log_throttle_lines rate of #{Rails.configuration.crunch_log_throttle_lines} lines per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n"
+      end
+    end
+
+    if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
+      # Replace log line with a message about truncating the log
+      line.replace "Exceed hard job log cap crunch_limit_log_bytes_per_job of #{Rails.configuration.crunch_limit_log_bytes_per_job}. Subsequent logs will be truncated."
+    end
+
+    true
+  end
+
   def read_pipes
     @running.each do |job_uuid, j|
       job = j[:job]
 
+      now = Time.now
+      if (now - j[:log_throttle_timestamp]) > Rails.configuration.crunch_log_throttle_period
+        # It has been more than throttle_period seconds since the last checkpoint so reset the
+        # throttle
+        if j[:log_throttle_bytes_skipped] > 0
+          j[:stderr_buf_to_flush] << "Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+        end
+
+        j[:log_throttle_timestamp] = now
+        j[:log_throttle_bytes_so_far] = 0
+        j[:log_throttle_lines_so_far] = 0
+        j[:log_throttle_bytes_skipped] = 0
+      end
+
       j[:buf].each do |stream, streambuf|
         # Read some data from the child stream
         buf = false
@@ -365,7 +427,7 @@ class Dispatcher
         end
 
         if buf
-          # Add to a the buffer
+          # Add to the stream buffer
           streambuf << buf
 
           # Check for at least one complete line
@@ -373,30 +435,25 @@ class Dispatcher
             lines = streambuf.lines("\n").to_a
 
             # check if the last line is partial or not
-            j[:buf][stream] = if streambuf[-1] == "\n"
-                                # nope
-                                ''
+            streambuf.replace(if streambuf[-1] == "\n"
+                                ''        # ends on a newline
                               else
-                                # Put the partial line back into the buffer
-                                lines.pop
-                              end
+                                lines.pop # Put the partial line back into the buffer
+                              end)
 
             # Now spool the lines to the log output buffer
             lines.each do |line|
-              $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
-              $stderr.puts line
-              pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-              if not j[:log_truncated]
+              # rate_limit returns true or false as to whether to actually log
+              # the line or not.  It also modifies "line" in place to replace
+              # it with an error if a logging limit is tripped.
+              if rate_limit j, line
+                $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+                $stderr.puts line
+                pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
                 j[:stderr_buf_to_flush] << pub_msg
               end
-            end
-
-            # Now actually send the log output to the logs table
-            if not j[:log_truncated]
-              if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
-                  (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
-                write_log j
-              end
+              # Send log output to the logs table
+              write_log j
             end
           end
         end
@@ -445,6 +502,7 @@ class Dispatcher
 
     # Ensure every last drop of stdout and stderr is consumed
     read_pipes
+    j_done[:stderr_flushed_at] = Time.new(0) # reset flush timestamp to make sure log gets written
     write_log j_done # write any remaining logs
 
     j_done[:buf].each do |stream, streambuf|
@@ -540,15 +598,6 @@ class Dispatcher
 
   protected
 
-  def too_many_bytes_logged_for_job(j)
-    return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
-            Rails.configuration.crunch_limit_log_event_bytes_per_job)
-  end
-
-  def too_many_events_logged_for_job(j)
-    return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
-  end
-
   def did_recently(thing, min_interval)
     @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@@ -561,68 +610,34 @@ class Dispatcher
 
   # send message to log table. we want these records to be transient
   def write_log running_job
-    return if running_job[:log_truncated]
     return if running_job[:stderr_buf_to_flush] == ''
-    begin
-      now = Time.now
-      throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period
+    return if running_job[:events_logged] > Rails.configuration.crunch_limit_log_events_per_job
 
-      if (now - running_job[:log_throttle_timestamp]) > throttle_period
-        # It has been more than throttle_period seconds since the last checkpoint so reset the
-        # throttle
-        if running_job[:log_throttle_bytes_skipped] > 0
-          running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log"
+    # Send out to log event if buffer size exceeds the bytes per event or if
+    # it has been at least crunch_log_seconds_between_events seconds since
+    # the last flush.
+    if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
+        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+      begin
+        # Just reached crunch_limit_log_events_per_job so replace log with notification.
+        if running_job[:events_logged] == Rails.configuration.crunch_limit_log_events_per_job
+          running_job[:stderr_buf_to_flush] =
+            "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
         end
-
-        running_job[:log_throttle_timestamp] = now
-        running_job[:log_throttle_bytes_so_far] = 0
-        running_job[:log_throttle_bytes_skipped] = 0
-      end
-
-      if running_job[:log_throttle_bytes_skipped] > 0
-        # We've skipped some log in this time period already, so continue to
-        # skip the log
-        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
-        return
+        log = Log.new(object_uuid: running_job[:job].uuid,
+                      event_type: 'stderr',
+                      owner_uuid: running_job[:job].owner_uuid,
+                      properties: {"text" => running_job[:stderr_buf_to_flush]})
+        log.save!
+        running_job[:events_logged] += 1
+      rescue => exception
+        $stderr.puts "Failed to write logs"
+        $stderr.puts exception.backtrace
       end
-
-      # Record bytes logged so far in this period
-      running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size
-
-      if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate
-        # We've exceeded the throttle rate, so start skipping
-        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
-
-        # Replace the message with a message about skipping the log and log that instead
-        remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
-        running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n"
-      end
-
-      # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
-      # or crunch_limit_log_events_per_job.
-      if (too_many_bytes_logged_for_job(running_job))
-        running_job[:log_truncated] = true
-        running_job[:stderr_buf_to_flush] =
-          "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
-      elsif (too_many_events_logged_for_job(running_job))
-        running_job[:log_truncated] = true
-        running_job[:stderr_buf_to_flush] =
-          "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
-      end
-      log = Log.new(object_uuid: running_job[:job].uuid,
-                    event_type: 'stderr',
-                    owner_uuid: running_job[:job].owner_uuid,
-                    properties: {"text" => running_job[:stderr_buf_to_flush]})
-      log.save!
-      running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
-      running_job[:events_logged] += 1
-    rescue
-      running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr]
+      running_job[:stderr_buf_to_flush] = ''
+      running_job[:stderr_flushed_at] = Time.now
     end
-    running_job[:stderr_buf_to_flush] = ''
-    running_job[:stderr_flushed_at] = Time.now.to_i
   end
-
 end
 
 # This is how crunch-job child procs know where the "refresh" trigger file is

commit e910d13fc5fc63a86f20be3b758c08db3d429bc6
Merge: 3ff644b a4679fb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 1 15:23:23 2014 -0400

    Merge branch '3052-crunch-log-stdout' into 3769-throttle-logs


commit 3ff644b461b69c38418384e532e3741a07333daf
Merge: 54a9512 9b16ff9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Oct 1 14:33:23 2014 -0400

    Merge branch 'master' into 3769-throttle-logs


-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list