[ARVADOS] created: 2e76e6ae877fb528d666071075afe2bf4ab21035
git at public.curoverse.com
git at public.curoverse.com
Mon Oct 20 17:32:20 EDT 2014
at 2e76e6ae877fb528d666071075afe2bf4ab21035 (commit)
commit 2e76e6ae877fb528d666071075afe2bf4ab21035
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Oct 20 17:30:30 2014 -0400
4266: Fix log processing performance.
* Remove "max log table entries per job" limit (and its horrible
memory leak).
* When logs are coming fast, but under all throttle thresholds, group
them into fewer rows in the logs table.
* Optimize performance by taking a short-circuit code path when any
threshold is exceeded. Now capable of ~200 MiB/s (of 100-character
lines) on lappy386.
* Bring stream buffer back up to a reasonable size, and add a comment to
protect it from future style tweaks.
* Simplify time calculations by storing time-to-reopen-throttle
instead of time-throttle-last-opened.
* Log "# bytes skipped" to stderr (sysadmin logs) too, not just the
logs table.
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index b85df8c..0df93a0 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -77,9 +77,6 @@ common:
# crunch-job must be able to stat() it.
crunch_refresh_trigger: /tmp/crunch_refresh_trigger
- # Maximum number of log events that may be generated by a single job.
- crunch_limit_log_events_per_job: 65536
-
# These two settings control how frequently log events are flushed to the
# database. Log lines are buffered until either crunch_log_bytes_per_event
# has been reached or crunch_log_seconds_between_events has elapsed since
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
index bb20aef..c7ddaf2 100755
--- a/services/api/script/crunch-dispatch.rb
+++ b/services/api/script/crunch-dispatch.rb
@@ -342,7 +342,8 @@ class Dispatcher
stderr_flushed_at: Time.new(0),
bytes_logged: 0,
events_logged: 0,
- log_throttle_timestamp: Time.new(0),
+ log_throttle_is_open: true,
+ log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
@@ -356,46 +357,46 @@ class Dispatcher
# the log line should go to output or not. Modifies "line" in place to
# replace it with an error if a logging limit is tripped.
def rate_limit running_job, line
- if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
- # Don't log anything if the hard cap has already been exceeded
- return false
- end
-
- now = Time.now
- throttle_period = Rails.configuration.crunch_log_throttle_period
-
- if running_job[:log_throttle_bytes_skipped] > 0
- # We've skipped some log in the current time period already, so continue to
- # skip the log
- running_job[:log_throttle_bytes_skipped] += line.size
- return false
- end
-
- # Count lines and bytes logged in this period, and total bytes logged for the job
- running_job[:log_throttle_lines_so_far] += 1
- running_job[:log_throttle_bytes_so_far] += line.size
- running_job[:bytes_logged] += line.size
-
- if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes or
- running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines
- # We've exceeded the per-period throttle, so start skipping
- running_job[:log_throttle_bytes_skipped] += line.size
-
- # Replace log line with a message about skipping the log
- remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
- if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes
- line.replace "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{throttle_period} seconds (crunch_log_throttle_bytes), logging will be silenced for the next #{remaining_time.round} seconds\n"
- else
- line.replace "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds\n"
+ message = false
+ linesize = line.size
+ if running_job[:log_throttle_is_open]
+ running_job[:log_throttle_lines_so_far] += 1
+ running_job[:log_throttle_bytes_so_far] += linesize
+ running_job[:bytes_logged] += linesize
+
+ if (running_job[:bytes_logged] >
+ Rails.configuration.crunch_limit_log_bytes_per_job)
+ message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+ running_job[:log_throttle_reset_time] = Time.now + 100.years
+ running_job[:log_throttle_is_open] = false
+
+ elsif (running_job[:log_throttle_bytes_so_far] >
+ Rails.configuration.crunch_log_throttle_bytes)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
+
+ elsif (running_job[:log_throttle_lines_so_far] >
+ Rails.configuration.crunch_log_throttle_lines)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
end
end
- if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
- # Replace log line with a message about truncating the log
- line.replace "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+ if not running_job[:log_throttle_is_open]
+ # Don't log anything if any limit has been exceeded. Just count lossage.
+ running_job[:log_throttle_bytes_skipped] += linesize
end
- true
+ if message
+ # Yes, write to logs, but use our "rate exceeded" message
+ # instead of the log message that exceeded the limit.
+ line.replace message
+ true
+ else
+ running_job[:log_throttle_is_open]
+ end
end
def read_pipes
@@ -403,59 +404,79 @@ class Dispatcher
job = j[:job]
now = Time.now
- if (now - j[:log_throttle_timestamp]) > Rails.configuration.crunch_log_throttle_period
- # It has been more than throttle_period seconds since the last checkpoint so reset the
- # throttle
+ if now > j[:log_throttle_reset_time]
+ # It has been more than throttle_period seconds since the last
+ # checkpoint so reset the throttle
if j[:log_throttle_bytes_skipped] > 0
- j[:stderr_buf_to_flush] << "Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+ message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+ $stderr.puts message
+ j[:stderr_buf_to_flush] << "#{Time.now.ctime.to_s} #{message}\n"
end
- j[:log_throttle_timestamp] = now
+ j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
j[:log_throttle_bytes_so_far] = 0
j[:log_throttle_lines_so_far] = 0
j[:log_throttle_bytes_skipped] = 0
+ j[:log_throttle_is_open] = true
end
j[:buf].each do |stream, streambuf|
# Read some data from the child stream
- buf = false
+ buf = ''
begin
- buf = j[stream].read_nonblock(2**16)
+ # It's important to use a big enough buffer here. When we're
+ # being flooded with logs, we must read and discard many
+ # bytes at once. Otherwise, we can easily peg a CPU with
+ # time-checking and other loop overhead. (Quick tests show a
+ # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.)
+ #
+ # So don't reduce this buffer size!
+ buf = j[stream].read_nonblock(2**20)
rescue Errno::EAGAIN, EOFError
end
- if buf
- # Add to the stream buffer
- streambuf << buf
-
- # Check for at least one complete line
- if streambuf.index "\n"
- lines = streambuf.lines("\n").to_a
-
- # check if the last line is partial or not
- streambuf.replace(if streambuf[-1] == "\n"
- '' # ends on a newline
- else
- lines.pop # Put the partial line back into the buffer
- end)
-
- # Now spool the lines to the log output buffer
- lines.each do |line|
- # rate_limit returns true or false as to whether to actually log
- # the line or not. It also modifies "line" in place to replace
- # it with an error if a logging limit is tripped.
- if rate_limit j, line
- $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
- $stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- j[:stderr_buf_to_flush] << pub_msg
- end
- # Send log output to the logs table
- write_log j
+ # Short circuit the counting code if we're just going to throw
+ # away the data anyway.
+ if not j[:log_throttle_is_open]
+ j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
+ streambuf.replace ''
+ elsif buf == ''
+ next
+ end
+
+ # Add to the stream buffer
+ streambuf << buf
+
+ # Check for at least one complete line
+ if streambuf.index "\n"
+ lines = streambuf.lines("\n").to_a
+
+ # check if the last line is partial or not
+ streambuf.replace(if streambuf[-1] == "\n"
+ '' # ends on a newline
+ else
+ lines.pop # Put the partial line back into the buffer
+ end)
+
+ # Now spool the lines to the log output buffer
+ lines.each do |line|
+ # rate_limit returns true or false as to whether to actually log
+ # the line or not. It also modifies "line" in place to replace
+ # it with an error if a logging limit is tripped.
+ if rate_limit j, line
+ $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+ $stderr.puts line
+ pub_msg = "#{Time.now.ctime.to_s} #{line.strip}\n"
+ j[:stderr_buf_to_flush] << pub_msg
end
end
end
end
+ # Flush buffered logs to the logs table, if appropriate. We have
+ # to do this even if we didn't collect any new logs this time:
+ # otherwise, buffered data older than seconds_between_events
+ # won't get flushed until new data arrives.
+ write_log j
end
end
@@ -498,10 +519,12 @@ class Dispatcher
$stderr.puts "dispatch: child #{pid_done} exit"
$stderr.puts "dispatch: job #{job_done.uuid} end"
- # Ensure every last drop of stdout and stderr is consumed
+ # Ensure every last drop of stdout and stderr is consumed.
read_pipes
- j_done[:stderr_flushed_at] = Time.new(0) # reset flush timestamp to make sure log gets written
- write_log j_done # write any remaining logs
+ # Reset flush timestamp to make sure log gets written.
+ j_done[:stderr_flushed_at] = Time.new(0)
+ # Write any remaining logs.
+ write_log j_done
j_done[:buf].each do |stream, streambuf|
if streambuf != ''
@@ -611,7 +634,6 @@ class Dispatcher
# send message to log table. we want these records to be transient
def write_log running_job
return if running_job[:stderr_buf_to_flush] == ''
- return if running_job[:events_logged] > Rails.configuration.crunch_limit_log_events_per_job
# Send out to log event if buffer size exceeds the bytes per event or if
# it has been at least crunch_log_seconds_between_events seconds since
@@ -619,11 +641,6 @@ class Dispatcher
if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
(Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
begin
- # Just reached crunch_limit_log_events_per_job so replace log with notification.
- if running_job[:events_logged] == Rails.configuration.crunch_limit_log_events_per_job
- running_job[:stderr_buf_to_flush] =
- "Exceeded live log limit #{Rails.configuration.crunch_limit_log_events_per_job} events (crunch_limit_log_events_per_job). Live log will be truncated."
- end
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
owner_uuid: running_job[:job].owner_uuid,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list