[ARVADOS] updated: c5c09df38966595b4f27c402d1e9ae5500d6d201

Git user git at public.curoverse.com
Tue Apr 25 16:44:44 EDT 2017


Summary of changes:
 services/crunch-run/crunchrun.go |  18 +++-
 services/crunch-run/logging.go   | 173 +++++++++++++++++++--------------------
 2 files changed, 100 insertions(+), 91 deletions(-)

       via  c5c09df38966595b4f27c402d1e9ae5500d6d201 (commit)
      from  455f862a19fe0bcc8ac3c6e685a96faf747ae623 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c5c09df38966595b4f27c402d1e9ae5500d6d201
Author: radhika <radhika at curoverse.com>
Date:   Tue Apr 25 16:43:47 2017 -0400

    8019: load log throttling config params during NewContainerRunner

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 272ad0b..c9c52ee 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -630,8 +630,13 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
-	w := NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "container",
-		runner.LogCollection.Open("container.json"))
+	w := &ArvLogWriter{
+		ArvClient:     runner.ArvClient,
+		UUID:          runner.Container.UUID,
+		loggingStream: "container",
+		writeCloser:   runner.LogCollection.Open("container.json"),
+	}
+
 	// Get Container record JSON from the API Server
 	reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
 	if err != nil {
@@ -1057,7 +1062,8 @@ func (runner *ContainerRunner) CommitLogs() error {
 	// point, but re-open crunch log with ArvClient in case there are any
 	// other further (such as failing to write the log to Keep!) while
 	// shutting down
-	runner.CrunchLog = NewThrottledLogger(NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "crunch-run", nil))
+	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+		UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
 
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
@@ -1144,7 +1150,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return NewArvLogWriter(runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name+".txt"))
+	return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+		writeCloser: runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -1284,6 +1291,9 @@ func NewContainerRunner(api IArvadosClient,
 	cr.Container.UUID = containerUUID
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
 	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+	loadLogThrottleParams(api)
+
 	return cr
 }
 
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 6e32d72..383d8ad 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -34,9 +34,9 @@ type ThrottledLogger struct {
 	*log.Logger
 	buf *bytes.Buffer
 	sync.Mutex
-	writer   io.WriteCloser
-	stopping chan struct{}
-	stopped  chan struct{}
+	writer  io.WriteCloser
+	flush   chan struct{}
+	stopped chan struct{}
 	Timestamper
 	Immediate *log.Logger
 }
@@ -81,13 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-	ticker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
 	defer ticker.Stop()
 	for stopping := false; !stopping; {
 		select {
-		case <-tl.stopping:
-			// flush tl.buf, then exit the loop
-			stopping = true
+		case _, open := <-tl.flush:
+			// if !open, flush tl.buf and exit the loop
+			stopping = !open
 		case <-ticker.C:
 		}
 
@@ -108,10 +108,10 @@ func (tl *ThrottledLogger) flusher() {
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
 	select {
-	case <-tl.stopping:
+	case <-tl.flush:
 		// already stopped
 	default:
-		close(tl.stopping)
+		close(tl.flush)
 	}
 	<-tl.stopped
 	return tl.writer.Close()
@@ -161,7 +161,7 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	tl := &ThrottledLogger{}
-	tl.stopping = make(chan struct{})
+	tl.flush = make(chan struct{}, 1)
 	tl.stopped = make(chan struct{})
 	tl.writer = writer
 	tl.Logger = log.New(tl, "", 0)
@@ -170,6 +170,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	return tl
 }
 
+// Rate-limiting (log throttling) config parameters
+var crunchLimitLogBytesPerJob int64
+var crunchLogThrottleBytes int64
+var crunchLogThrottlePeriod int
+var crunchLogThrottleLines int64
+var crunchLogPartialLineThrottlePeriod int
+var crunchLogBytesPerEvent int64
+var crunchLogSecondsBetweenEvents int
+
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
 // CollectionFileWriter) and creating an Arvados log entry.
@@ -190,72 +199,6 @@ type ArvLogWriter struct {
 	logThrottleFirstPartialLine  bool
 	bufToFlush                   bytes.Buffer
 	bufFlushedAt                 time.Time
-
-	// rate limiting config parameters
-	crunchLimitLogBytesPerJob          int64
-	crunchLogThrottleBytes             int64
-	crunchLogThrottlePeriod            int
-	crunchLogThrottleLines             int64
-	crunchLogPartialLineThrottlePeriod int
-	crunchLogBytesPerEvent             int64
-	crunchLogSecondsBetweenEvents      int
-}
-
-// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
-func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
-	w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
-
-	// load the rate limit discovery config paramters
-	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-	if err != nil {
-		w.crunchLimitLogBytesPerJob = 67108864
-	} else {
-		w.crunchLimitLogBytesPerJob = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleBytes")
-	if err != nil {
-		w.crunchLogThrottleBytes = 65536
-	} else {
-		w.crunchLogThrottleBytes = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottlePeriod")
-	if err != nil {
-		w.crunchLogThrottlePeriod = 60
-	} else {
-		w.crunchLogThrottlePeriod = int(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleLines")
-	if err != nil {
-		w.crunchLogThrottleLines = 1024
-	} else {
-		w.crunchLogThrottleLines = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-	if err != nil {
-		w.crunchLogPartialLineThrottlePeriod = 5
-	} else {
-		w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogBytesPerEvent")
-	if err != nil {
-		w.crunchLogBytesPerEvent = 4096
-	} else {
-		w.crunchLogBytesPerEvent = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-	if err != nil {
-		w.crunchLogSecondsBetweenEvents = 1
-	} else {
-		w.crunchLogSecondsBetweenEvents = int(param.(float64))
-	}
-
-	return w
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -276,7 +219,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 			arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
 		}
 
-		arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+		arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
 		arvlog.logThrottleBytesSoFar = 0
 		arvlog.logThrottleLinesSoFar = 0
 		arvlog.logThrottleBytesSkipped = 0
@@ -303,8 +246,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 		arvlog.logThrottleIsOpen = logOpen
 	}
 
-	if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
-		(now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+	if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+		(now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
 		// write to API
 		lr := arvadosclient.Dict{"log": arvadosclient.Dict{
 			"object_uuid": arvlog.UUID,
@@ -349,7 +292,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
 			partialLine = true
 
-			if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+			if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
 				arvlog.logThrottlePartialLineLastAt = now
 			} else {
 				skipCounts = true
@@ -362,24 +305,28 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 			arvlog.bytesLogged += lineSize
 		}
 
-		if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
-			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+		if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+				RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
 			arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
 			arvlog.logThrottleIsOpen = false
 
-		} else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
+		} else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
 			remainingTime := arvlog.logThrottleResetTime.Sub(now)
-			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+				RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
-		} else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
+		} else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
 			remainingTime := arvlog.logThrottleResetTime.Sub(now)
-			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+				RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
 		} else if partialLine && arvlog.logThrottleFirstPartialLine {
 			arvlog.logThrottleFirstPartialLine = false
-			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+				RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
 
 		}
 	}
@@ -400,3 +347,55 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 		return arvlog.logThrottleIsOpen, line
 	}
 }
+
+// Load the rate limit discovery config parameters.
+func loadLogThrottleParams(clnt IArvadosClient) {
+	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+	if err != nil {
+		crunchLimitLogBytesPerJob = 67108864
+	} else {
+		crunchLimitLogBytesPerJob = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottleBytes")
+	if err != nil {
+		crunchLogThrottleBytes = 65536
+	} else {
+		crunchLogThrottleBytes = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottlePeriod")
+	if err != nil {
+		crunchLogThrottlePeriod = 60
+	} else {
+		crunchLogThrottlePeriod = int(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottleLines")
+	if err != nil {
+		crunchLogThrottleLines = 1024
+	} else {
+		crunchLogThrottleLines = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+	if err != nil {
+		crunchLogPartialLineThrottlePeriod = 5
+	} else {
+		crunchLogPartialLineThrottlePeriod = int(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogBytesPerEvent")
+	if err != nil {
+		crunchLogBytesPerEvent = 4096
+	} else {
+		crunchLogBytesPerEvent = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+	if err != nil {
+		crunchLogSecondsBetweenEvents = 1
+	} else {
+		crunchLogSecondsBetweenEvents = int(param.(float64))
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list