[ARVADOS] updated: 4344941f8410ed21befead8d6b8d2471d291032f

Git user git at public.curoverse.com
Tue Apr 25 14:08:12 EDT 2017


Summary of changes:
 services/crunch-run/crunchrun.go |  13 ++---
 services/crunch-run/logging.go   | 104 +++++++++++++++++++++++++++++----------
 2 files changed, 81 insertions(+), 36 deletions(-)

       via  4344941f8410ed21befead8d6b8d2471d291032f (commit)
      from  9dabca0eedbc9f842d542fea3463a441140d590c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4344941f8410ed21befead8d6b8d2471d291032f
Author: radhika <radhika at curoverse.com>
Date:   Tue Apr 25 14:06:55 2017 -0400

    8019: load rate limiting config parameters from discovery document once per log writer creation.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index b95ae1b..272ad0b 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -630,12 +630,8 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
-	w := &ArvLogWriter{
-		ArvClient:     runner.ArvClient,
-		UUID:          runner.Container.UUID,
-		loggingStream: "container",
-		writeCloser:   runner.LogCollection.Open("container.json"),
-	}
+	w := NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "container",
+		runner.LogCollection.Open("container.json"))
 	// Get Container record JSON from the API Server
 	reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
 	if err != nil {
@@ -1061,8 +1057,7 @@ func (runner *ContainerRunner) CommitLogs() error {
 	// point, but re-open crunch log with ArvClient in case there are any
 	// other further (such as failing to write the log to Keep!) while
 	// shutting down
-	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
-		UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+	runner.CrunchLog = NewThrottledLogger(NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "crunch-run", nil))
 
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
@@ -1149,7 +1144,7 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, writeCloser: runner.LogCollection.Open(name + ".txt")}
+	return NewArvLogWriter(runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name+".txt"))
 }
 
 // Run the full container lifecycle.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index da6a9ff..22ba130 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -190,6 +190,72 @@ type ArvLogWriter struct {
 	logThrottleFirstPartialLine  bool
 	stderrBufToFlush             bytes.Buffer
 	stderrFlushedAt              time.Time
+
+	// rate limiting config parameters
+	crunchLimitLogBytesPerJob          int64
+	crunchLogThrottleBytes             int64
+	crunchLogThrottlePeriod            int
+	crunchLogThrottleLines             int64
+	crunchLogPartialLineThrottlePeriod int
+	crunchLogBytesPerEvent             int64
+	crunchLogSecondsBetweenEvents      int
+}
+
+// NewArvLogWriter creates a new ArvLogWriter and loads the rate limiting config params
+func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
+	w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
+
+	// load the rate limit discovery config parameters
+	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+	if err != nil {
+		w.crunchLimitLogBytesPerJob = 67108864
+	} else {
+		w.crunchLimitLogBytesPerJob = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottleBytes")
+	if err != nil {
+		w.crunchLogThrottleBytes = 65536
+	} else {
+		w.crunchLogThrottleBytes = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottlePeriod")
+	if err != nil {
+		w.crunchLogThrottlePeriod = 60
+	} else {
+		w.crunchLogThrottlePeriod = int(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogThrottleLines")
+	if err != nil {
+		w.crunchLogThrottleLines = 1024
+	} else {
+		w.crunchLogThrottleLines = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+	if err != nil {
+		w.crunchLogPartialLineThrottlePeriod = 5
+	} else {
+		w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogBytesPerEvent")
+	if err != nil {
+		w.crunchLogBytesPerEvent = 4096
+	} else {
+		w.crunchLogBytesPerEvent = int64(param.(float64))
+	}
+
+	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+	if err != nil {
+		w.crunchLogSecondsBetweenEvents = 1
+	} else {
+		w.crunchLogSecondsBetweenEvents = int(param.(float64))
+	}
+
+	return w
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -200,13 +266,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 	}
 
 	// write to API after checking rate limit
-	crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
-	crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
-	crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
-	if err2 != nil {
-		return 0, fmt.Errorf("%s ; %s", err1, err2)
-	}
-
 	now := time.Now()
 	bytesWritten := 0
 
@@ -217,7 +276,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 			arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
 		}
 
-		arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(int(crunchLogThrottlePeriod.(float64))))
+		arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
 		arvlog.logThrottleBytesSoFar = 0
 		arvlog.logThrottleLinesSoFar = 0
 		arvlog.logThrottleBytesSkipped = 0
@@ -246,8 +305,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 		arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
 	}
 
-	if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
-		(time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
+	if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+		(time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
 		// write to API
 		lr := arvadosclient.Dict{"log": arvadosclient.Dict{
 			"object_uuid": arvlog.UUID,
@@ -287,21 +346,12 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
 	skipCounts := false
 
 	if arvlog.logThrottleIsOpen {
-		crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
-		crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
-		crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
-		crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
-		crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
-		if err != nil {
-			return false, []byte(""), err
-		}
-
 		matches := lineRegexp.FindStringSubmatch(string(line))
 
 		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
 			partialLine = true
 
-			if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
+			if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
 				arvlog.logThrottlePartialLineLastAt = time.Now()
 			} else {
 				skipCounts = true
@@ -314,24 +364,24 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
 			arvlog.bytesLogged += lineSize
 		}
 
-		if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
-			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), int(crunchLimitLogBytesPerJob.(float64)))
+		if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
+			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
 			arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
 			arvlog.logThrottleIsOpen = false
 
-		} else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
+		} else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
 			remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleBytes.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
+			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
-		} else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
+		} else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
 			remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogThrottleLines.(float64)), int(crunchLogThrottlePeriod.(float64)), remainingTime/time.Second)
+			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
 			arvlog.logThrottleIsOpen = false
 
 		} else if partialLine && arvlog.logThrottleFirstPartialLine {
 			arvlog.logThrottleFirstPartialLine = false
-			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), int(crunchLogPartialLineThrottlePeriod.(float64)))
+			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
 
 		}
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list