[ARVADOS] updated: 712968da0c8575de1ac6968772999c2439636e2a
Git user
git at public.curoverse.com
Fri Apr 28 14:50:41 EDT 2017
Summary of changes:
services/crunch-run/logging.go | 113 ++++++++++++++++++++---------------------
1 file changed, 56 insertions(+), 57 deletions(-)
via 712968da0c8575de1ac6968772999c2439636e2a (commit)
from 3acd5d7f73c24a2ea2d686588be44efb9ac056b2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 712968da0c8575de1ac6968772999c2439636e2a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Apr 28 14:50:38 2017 -0400
8019: Rework partial line throttling. Fix sending flush when buffer is ready
so it does not block.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 45dfc2e..cdf2d6e 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -61,11 +61,6 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
tl.buf = &bytes.Buffer{}
- //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
- // tl.pendingFlush = true
- // tl.flush <- struct{}{}
- //}
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
for err == nil && sc.Scan() {
@@ -81,6 +76,17 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
n = len(p)
+ if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+ // Non-blocking send. Try send a flush if it is ready to
+ // accept it. Otherwise do nothing because a flush is already
+ // pending.
+ select {
+ case tl.flush <- struct{}{}:
+ default:
+ }
+ }
@@ -92,7 +98,7 @@ func (tl *ThrottledLogger) flusher() {
for stopping := false; !stopping; {
select {
case _, open := <-tl.flush:
- // if !open, flush tl.buf and exit the loop
+ // if !open, will flush tl.buf and exit the loop
stopping = !open
case <-ticker.C:
@@ -101,7 +107,6 @@ func (tl *ThrottledLogger) flusher() {
ready, tl.buf = tl.buf, &bytes.Buffer{}
- tl.pendingFlush = false
if ready != nil && ready.Len() > 0 {
@@ -178,13 +183,13 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
// Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64
-var crunchLogThrottleBytes int64
-var crunchLogThrottlePeriod int
-var crunchLogThrottleLines int64
-var crunchLogPartialLineThrottlePeriod int
-var crunchLogBytesPerEvent int64
-var crunchLogSecondsBetweenEvents int
+var crunchLimitLogBytesPerJob int64 = 67108864
+var crunchLogThrottleBytes int64 = 65536
+var crunchLogThrottlePeriod time.Duration = time.Second * 60
+var crunchLogThrottleLines int64 = 1024
+var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
+var crunchLogBytesPerEvent int64 = 4096
+var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
@@ -202,7 +207,7 @@ type ArvLogWriter struct {
logThrottleBytesSoFar int64
logThrottleBytesSkipped int64
logThrottleIsOpen bool
- logThrottlePartialLineLastAt time.Time
+ logThrottlePartialLineNextAt time.Time
logThrottleFirstPartialLine bool
bufToFlush bytes.Buffer
bufFlushedAt time.Time
@@ -226,13 +231,11 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
- arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
+ arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
arvlog.logThrottleIsOpen = true
- arvlog.logThrottlePartialLineLastAt = time.Time{}
- arvlog.logThrottleFirstPartialLine = true
lines := bytes.Split(p, []byte("\n"))
@@ -255,7 +258,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
- (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
+ (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
@@ -291,22 +294,37 @@ var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
message := ""
lineSize := int64(len(line))
- partialLine := false
if arvlog.logThrottleIsOpen {
matches := lineRegexp.FindStringSubmatch(string(line))
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
- partialLine = true
- if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
- arvlog.logThrottlePartialLineLastAt = now
- arvlog.logThrottleFirstPartialLine = true
+ // This is a partial line.
+ if arvlog.logThrottleFirstPartialLine {
+ // Partial should be suppressed. First time this is happening for this line so provide a message instead.
+ arvlog.logThrottleFirstPartialLine = false
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+ arvlog.logThrottleBytesSkipped += lineSize
+ return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+ } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+ // The throttle period has passed. Update timestamp and let it through.
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+ } else {
+ // Suppress line.
+ arvlog.logThrottleBytesSkipped += lineSize
+ return false, line
+ } else {
+ // Not a partial line so reset.
+ arvlog.logThrottlePartialLineNextAt = time.Time{}
+ arvlog.logThrottleFirstPartialLine = true
- arvlog.logThrottleLinesSoFar += 1
- arvlog.logThrottleBytesSoFar += lineSize
arvlog.bytesLogged += lineSize
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.logThrottleLinesSoFar += 1
if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -317,20 +335,15 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
} else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
remainingTime := arvlog.logThrottleResetTime.Sub(now)
message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
remainingTime := arvlog.logThrottleResetTime.Sub(now)
message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
- } else if partialLine && arvlog.logThrottleFirstPartialLine {
- arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
@@ -352,51 +365,37 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
// load the rate limit discovery config paramters
func loadLogThrottleParams(clnt IArvadosClient) {
param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
- if err != nil {
- crunchLimitLogBytesPerJob = 67108864
- } else {
+ if err == nil {
crunchLimitLogBytesPerJob = int64(param.(float64))
param, err = clnt.Discovery("crunchLogThrottleBytes")
- if err != nil {
- crunchLogThrottleBytes = 65536
- } else {
+ if err == nil {
crunchLogThrottleBytes = int64(param.(float64))
param, err = clnt.Discovery("crunchLogThrottlePeriod")
- if err != nil {
- crunchLogThrottlePeriod = 60
- } else {
- crunchLogThrottlePeriod = int(param.(float64))
+ if err == nil {
+ crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
param, err = clnt.Discovery("crunchLogThrottleLines")
- if err != nil {
- crunchLogThrottleLines = 1024
- } else {
+ if err == nil {
crunchLogThrottleLines = int64(param.(float64))
param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
- if err != nil {
- crunchLogPartialLineThrottlePeriod = 5
- } else {
- crunchLogPartialLineThrottlePeriod = int(param.(float64))
+ if err == nil {
+ crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
param, err = clnt.Discovery("crunchLogBytesPerEvent")
- if err != nil {
- crunchLogBytesPerEvent = 4096
- } else {
+ if err == nil {
crunchLogBytesPerEvent = int64(param.(float64))
param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
- if err != nil {
- crunchLogSecondsBetweenEvents = 1
- } else {
- crunchLogSecondsBetweenEvents = int(param.(float64))
+ if err == nil {
+ crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
More information about the arvados-commits
mailing list