[ARVADOS] updated: 3acd5d7f73c24a2ea2d686588be44efb9ac056b2

Git user git at public.curoverse.com
Thu Apr 27 15:45:08 EDT 2017


Summary of changes:
 services/crunch-run/logging.go | 35 ++++++++++++++++++-----------------
 1 file changed, 18 insertions(+), 17 deletions(-)

       via  3acd5d7f73c24a2ea2d686588be44efb9ac056b2 (commit)
      from  c5c09df38966595b4f27c402d1e9ae5500d6d201 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 3acd5d7f73c24a2ea2d686588be44efb9ac056b2
Author: radhika <radhika at curoverse.com>
Date:   Thu Apr 27 15:44:37 2017 -0400

    8019: partial line throttling etc

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 383d8ad..45dfc2e 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -38,7 +38,8 @@ type ThrottledLogger struct {
 	flush   chan struct{}
 	stopped chan struct{}
 	Timestamper
-	Immediate *log.Logger
+	Immediate    *log.Logger
+	pendingFlush bool
 }
 
 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
@@ -60,6 +61,11 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 		tl.buf = &bytes.Buffer{}
 	}
 
+	//if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
+	//	tl.pendingFlush = true
+	//	tl.flush <- struct{}{}
+	//}
+
 	now := tl.Timestamper(time.Now().UTC())
 	sc := bufio.NewScanner(bytes.NewBuffer(p))
 	for err == nil && sc.Scan() {
@@ -94,7 +100,8 @@ func (tl *ThrottledLogger) flusher() {
 		var ready *bytes.Buffer
 
 		tl.Mutex.Lock()
-		ready, tl.buf = tl.buf, nil
+		ready, tl.buf = tl.buf, &bytes.Buffer{}
+		tl.pendingFlush = false
 		tl.Mutex.Unlock()
 
 		if ready != nil && ready.Len() > 0 {
@@ -157,8 +164,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 
 // NewThrottledLogger creates a new thottled logger that
 // (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and only calls the underlying Writer
+//  at most once per "crunchLogSecondsBetweenEvents" seconds.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	tl := &ThrottledLogger{}
 	tl.flush = make(chan struct{}, 1)
@@ -242,8 +249,9 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 
 		// check rateLimit
 		logOpen, msg := arvlog.rateLimit(line, now)
-		arvlog.bufToFlush.WriteString(string(msg) + "\n")
-		arvlog.logThrottleIsOpen = logOpen
+		if logOpen {
+			arvlog.bufToFlush.WriteString(string(msg) + "\n")
+		}
 	}
 
 	if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
@@ -284,26 +292,21 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 	message := ""
 	lineSize := int64(len(line))
 	partialLine := false
-	skipCounts := false
 
 	if arvlog.logThrottleIsOpen {
 		matches := lineRegexp.FindStringSubmatch(string(line))
 
 		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
 			partialLine = true
-
 			if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
 				arvlog.logThrottlePartialLineLastAt = now
-			} else {
-				skipCounts = true
+				arvlog.logThrottleFirstPartialLine = true
 			}
 		}
 
-		if !skipCounts {
-			arvlog.logThrottleLinesSoFar += 1
-			arvlog.logThrottleBytesSoFar += lineSize
-			arvlog.bytesLogged += lineSize
-		}
+		arvlog.logThrottleLinesSoFar += 1
+		arvlog.logThrottleBytesSoFar += lineSize
+		arvlog.bytesLogged += lineSize
 
 		if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
 			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -341,8 +344,6 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 		// instead of the log message that exceeded the limit.
 		message += " A complete log is still being written to Keep, and will be available when the job finishes."
 		return true, []byte(message)
-	} else if partialLine {
-		return false, line
 	} else {
 		return arvlog.logThrottleIsOpen, line
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list