[ARVADOS] updated: 712968da0c8575de1ac6968772999c2439636e2a

Git user git at public.curoverse.com
Fri Apr 28 14:50:41 EDT 2017


Summary of changes:
 services/crunch-run/logging.go | 113 ++++++++++++++++++++---------------------
 1 file changed, 56 insertions(+), 57 deletions(-)

       via  712968da0c8575de1ac6968772999c2439636e2a (commit)
      from  3acd5d7f73c24a2ea2d686588be44efb9ac056b2 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 712968da0c8575de1ac6968772999c2439636e2a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Apr 28 14:50:38 2017 -0400

    8019: Rework partial line throttling.  Fix sending a flush when the buffer is
    full so that Write does not block.

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 45dfc2e..cdf2d6e 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -61,11 +61,6 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 		tl.buf = &bytes.Buffer{}
 	}
 
-	//if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
-	//	tl.pendingFlush = true
-	//	tl.flush <- struct{}{}
-	//}
-
 	now := tl.Timestamper(time.Now().UTC())
 	sc := bufio.NewScanner(bytes.NewBuffer(p))
 	for err == nil && sc.Scan() {
@@ -81,6 +76,17 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 			n = len(p)
 		}
 	}
+
+	if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+		// Non-blocking send.  Try send a flush if it is ready to
+		// accept it.  Otherwise do nothing because a flush is already
+		// pending.
+		select {
+		case tl.flush <- struct{}{}:
+		default:
+		}
+	}
+
 	return
 }
 
@@ -92,7 +98,7 @@ func (tl *ThrottledLogger) flusher() {
 	for stopping := false; !stopping; {
 		select {
 		case _, open := <-tl.flush:
-			// if !open, flush tl.buf and exit the loop
+			// if !open, will flush tl.buf and exit the loop
 			stopping = !open
 		case <-ticker.C:
 		}
@@ -101,7 +107,6 @@ func (tl *ThrottledLogger) flusher() {
 
 		tl.Mutex.Lock()
 		ready, tl.buf = tl.buf, &bytes.Buffer{}
-		tl.pendingFlush = false
 		tl.Mutex.Unlock()
 
 		if ready != nil && ready.Len() > 0 {
@@ -178,13 +183,13 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 }
 
 // Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64
-var crunchLogThrottleBytes int64
-var crunchLogThrottlePeriod int
-var crunchLogThrottleLines int64
-var crunchLogPartialLineThrottlePeriod int
-var crunchLogBytesPerEvent int64
-var crunchLogSecondsBetweenEvents int
+var crunchLimitLogBytesPerJob int64 = 67108864
+var crunchLogThrottleBytes int64 = 65536
+var crunchLogThrottlePeriod time.Duration = time.Second * 60
+var crunchLogThrottleLines int64 = 1024
+var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
+var crunchLogBytesPerEvent int64 = 4096
+var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
 
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
@@ -202,7 +207,7 @@ type ArvLogWriter struct {
 	logThrottleBytesSoFar        int64
 	logThrottleBytesSkipped      int64
 	logThrottleIsOpen            bool
-	logThrottlePartialLineLastAt time.Time
+	logThrottlePartialLineNextAt time.Time
 	logThrottleFirstPartialLine  bool
 	bufToFlush                   bytes.Buffer
 	bufFlushedAt                 time.Time
@@ -226,13 +231,11 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 			arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
 		}
 
-		arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
+		arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
 		arvlog.logThrottleBytesSoFar = 0
 		arvlog.logThrottleLinesSoFar = 0
 		arvlog.logThrottleBytesSkipped = 0
 		arvlog.logThrottleIsOpen = true
-		arvlog.logThrottlePartialLineLastAt = time.Time{}
-		arvlog.logThrottleFirstPartialLine = true
 	}
 
 	lines := bytes.Split(p, []byte("\n"))
@@ -255,7 +258,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 	}
 
 	if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
-		(now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
+		(now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
 		// write to API
 		lr := arvadosclient.Dict{"log": arvadosclient.Dict{
 			"object_uuid": arvlog.UUID,
@@ -291,22 +294,37 @@ var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
 	message := ""
 	lineSize := int64(len(line))
-	partialLine := false
 
 	if arvlog.logThrottleIsOpen {
 		matches := lineRegexp.FindStringSubmatch(string(line))
 
 		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-			partialLine = true
-			if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
-				arvlog.logThrottlePartialLineLastAt = now
-				arvlog.logThrottleFirstPartialLine = true
+			// This is a partial line.
+
+			if arvlog.logThrottleFirstPartialLine {
+				// Partial should be suppressed.  First time this is happening for this line so provide a message instead.
+				arvlog.logThrottleFirstPartialLine = false
+				arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+				arvlog.logThrottleBytesSkipped += lineSize
+				return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+					RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+			} else if now.After(arvlog.logThrottlePartialLineNextAt) {
+				// The throttle period has passed.  Update timestamp and let it through.
+				arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+			} else {
+				// Suppress line.
+				arvlog.logThrottleBytesSkipped += lineSize
+				return false, line
 			}
+		} else {
+			// Not a partial line so reset.
+			arvlog.logThrottlePartialLineNextAt = time.Time{}
+			arvlog.logThrottleFirstPartialLine = true
 		}
 
-		arvlog.logThrottleLinesSoFar += 1
-		arvlog.logThrottleBytesSoFar += lineSize
 		arvlog.bytesLogged += lineSize
+		arvlog.logThrottleBytesSoFar += lineSize
+		arvlog.logThrottleLinesSoFar += 1
 
 		if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
 			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -317,20 +335,15 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 		} else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
 			remainingTime := arvlog.logThrottleResetTime.Sub(now)
 			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
-				RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
+				RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
 		} else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
 			remainingTime := arvlog.logThrottleResetTime.Sub(now)
 			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
-				RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
+				RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
-		} else if partialLine && arvlog.logThrottleFirstPartialLine {
-			arvlog.logThrottleFirstPartialLine = false
-			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
-				RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
-
 		}
 	}
 
@@ -352,51 +365,37 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 // load the rate limit discovery config paramters
 func loadLogThrottleParams(clnt IArvadosClient) {
 	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-	if err != nil {
-		crunchLimitLogBytesPerJob = 67108864
-	} else {
+	if err == nil {
 		crunchLimitLogBytesPerJob = int64(param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogThrottleBytes")
-	if err != nil {
-		crunchLogThrottleBytes = 65536
-	} else {
+	if err == nil {
 		crunchLogThrottleBytes = int64(param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogThrottlePeriod")
-	if err != nil {
-		crunchLogThrottlePeriod = 60
-	} else {
-		crunchLogThrottlePeriod = int(param.(float64))
+	if err == nil {
+		crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogThrottleLines")
-	if err != nil {
-		crunchLogThrottleLines = 1024
-	} else {
+	if err == nil {
 		crunchLogThrottleLines = int64(param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-	if err != nil {
-		crunchLogPartialLineThrottlePeriod = 5
-	} else {
-		crunchLogPartialLineThrottlePeriod = int(param.(float64))
+	if err == nil {
+		crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogBytesPerEvent")
-	if err != nil {
-		crunchLogBytesPerEvent = 4096
-	} else {
+	if err == nil {
 		crunchLogBytesPerEvent = int64(param.(float64))
 	}
 
 	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-	if err != nil {
-		crunchLogSecondsBetweenEvents = 1
-	} else {
-		crunchLogSecondsBetweenEvents = int(param.(float64))
+	if err == nil {
+		crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list