[ARVADOS] updated: f1fb39153bc234b49ca77674a29abc029b12774a

Git user git at public.curoverse.com
Fri Apr 21 17:25:47 EDT 2017


Summary of changes:
 services/crunch-run/crunchrun.go      |  14 +--
 services/crunch-run/crunchrun_test.go |  11 ++-
 services/crunch-run/logging.go        | 159 ++++++++++++++++++++++++++++++++--
 services/crunch-run/logging_test.go   |  24 +++++
 4 files changed, 191 insertions(+), 17 deletions(-)

       via  f1fb39153bc234b49ca77674a29abc029b12774a (commit)
      from  5e4bf6836d9762ef5552111776dcbade99f2e680 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.


commit f1fb39153bc234b49ca77674a29abc029b12774a
Author: radhika <radhika at curoverse.com>
Date:   Fri Apr 21 17:25:03 2017 -0400

    8019: rateLimit crunch-run logging using API configuration parameters

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index fd2ce3f..b95ae1b 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -631,10 +631,10 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
 	w := &ArvLogWriter{
-		runner.ArvClient,
-		runner.Container.UUID,
-		"container",
-		runner.LogCollection.Open("container.json"),
+		ArvClient:     runner.ArvClient,
+		UUID:          runner.Container.UUID,
+		loggingStream: "container",
+		writeCloser:   runner.LogCollection.Open("container.json"),
 	}
 	// Get Container record JSON from the API Server
 	reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
@@ -1061,8 +1061,8 @@ func (runner *ContainerRunner) CommitLogs() error {
 	// point, but re-open crunch log with ArvClient in case there are any
 	// other further (such as failing to write the log to Keep!) while
 	// shutting down
-	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
-		"crunch-run", nil})
+	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+		UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
 
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
@@ -1149,7 +1149,7 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+	return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, writeCloser: runner.LogCollection.Open(name + ".txt")} // name is used both as the loggingStream (sent as the API log event_type) and as the "<name>.txt" file in LogCollection
 }
 
 // Run the full container lifecycle.
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 43c55b6..5bfbf79 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -249,7 +249,16 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
 	return nil
 }
 
-var discoveryMap = map[string]interface{}{"defaultTrashLifetime": float64(1209600)}
+var discoveryMap = map[string]interface{}{ // values served by ArvTestClient.Discovery
+	"crunchLimitLogBytesPerJob":          float64(64 << 20), // 64 MiB
+	"crunchLogBytesPerEvent":             float64(4 << 10),  // 4 KiB
+	"crunchLogPartialLineThrottlePeriod": float64(5),
+	"crunchLogSecondsBetweenEvents":      float64(1),
+	"crunchLogThrottleBytes":             float64(64 << 10), // 64 KiB
+	"crunchLogThrottleLines":             float64(1024),
+	"crunchLogThrottlePeriod":            float64(60),
+	"defaultTrashLifetime":               float64(14 * 24 * 60 * 60), // 14 days
+}
 
 func (client *ArvTestClient) Discovery(key string) (interface{}, error) {
 	return discoveryMap[key], nil
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 5254ff6..a66525e 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"regexp"
+	"strings"
 	"sync"
 	"time"
 
@@ -176,6 +178,18 @@ type ArvLogWriter struct {
 	UUID          string
 	loggingStream string
 	writeCloser   io.WriteCloser
+
+	// for rate limiting
+	bytesLogged                  int64
+	logThrottleResetTime         time.Time
+	logThrottleLinesSoFar        int64
+	logThrottleBytesSoFar        int64
+	logThrottleBytesSkipped      int64
+	logThrottleIsOpen            bool
+	logThrottlePartialLineLastAt time.Time
+	logThrottleFirstPartialLine  bool
+	stderrBufToFlush             bytes.Buffer
+	stderrFlushedAt              time.Time
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -185,17 +199,72 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 		_, err1 = arvlog.writeCloser.Write(p)
 	}
 
-	// write to API
-	lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-		"object_uuid": arvlog.UUID,
-		"event_type":  arvlog.loggingStream,
-		"properties":  map[string]string{"text": string(p)}}}
-	err2 := arvlog.ArvClient.Create("logs", lr, nil)
-
-	if err1 != nil || err2 != nil {
+	// write to API after checking rate limit
+	crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
+	crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
+	crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
+	if err2 != nil {
 		return 0, fmt.Errorf("%s ; %s", err1, err2)
 	}
-	return len(p), nil
+
+	now := time.Now()
+	bytesWritten := 0
+
+	if now.After(arvlog.logThrottleResetTime) {
+		// It has been more than throttle_period seconds since the last
+		// checkpoint; so reset the throttle
+		if arvlog.logThrottleBytesSkipped > 0 {
+			arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now()), arvlog.logThrottleBytesSkipped))
+		}
+
+		arvlog.logThrottleResetTime = time.Now().Add(time.Duration(int(crunchLogThrottlePeriod.(float64))))
+		arvlog.logThrottleBytesSoFar = 0
+		arvlog.logThrottleLinesSoFar = 0
+		arvlog.logThrottleBytesSkipped = 0
+		arvlog.logThrottleIsOpen = true
+		arvlog.logThrottlePartialLineLastAt = time.Time{}
+		arvlog.logThrottleFirstPartialLine = true
+	}
+
+	lines := bytes.Split(p, []byte("\n"))
+
+	for _, line := range lines {
+		// Short circuit the counting code if we're just going to throw
+		// away the data anyway.
+		if !arvlog.logThrottleIsOpen {
+			arvlog.logThrottleBytesSkipped += int64(len(line))
+			continue
+		} else if len(line) == 0 {
+			continue
+		}
+
+		// check rateLimit
+		_, msg, err2 := arvlog.rateLimit(line)
+		if err2 != nil {
+			return 0, fmt.Errorf("%s ; %s", err1, err2)
+		}
+		arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
+	}
+
+	if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
+		(time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
+		// write to API
+		lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+			"object_uuid": arvlog.UUID,
+			"event_type":  arvlog.loggingStream,
+			"properties":  map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+		err2 := arvlog.ArvClient.Create("logs", lr, nil)
+
+		bytesWritten = arvlog.stderrBufToFlush.Len()
+		arvlog.stderrBufToFlush = bytes.Buffer{}
+		arvlog.stderrFlushedAt = time.Now()
+
+		if err1 != nil || err2 != nil {
+			return 0, fmt.Errorf("%s ; %s", err1, err2)
+		}
+	}
+
+	return bytesWritten, nil
 }
 
 // Close the underlying writer
@@ -206,3 +275,131 @@ func (arvlog *ArvLogWriter) Close() (err error) {
 	}
 	return err
 }
+
+// lineRegexp recognizes lines of the form
+// "<field> <field> <digits> <digits> stderr <message>" and captures
+// <message>. rateLimit uses it only to spot partial-line segments, i.e.
+// messages that both start and end with "[...]".
+var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+
+// discoveryFloat reads a numeric parameter via ArvClient.Discovery. A
+// missing or non-float64 value is reported as an error instead of
+// panicking in a type assertion.
+func (arvlog *ArvLogWriter) discoveryFloat(key string) (float64, error) {
+	value, err := arvlog.ArvClient.Discovery(key)
+	if err != nil {
+		return 0, err
+	}
+	number, ok := value.(float64)
+	if !ok {
+		return 0, fmt.Errorf("discovery parameter %s: expected float64, got %T", key, value)
+	}
+	return number, nil
+}
+
+// rateLimit applies the hard cap on total log output and the per-period
+// throttles to a single line (without its trailing newline).
+//
+// It returns whether the line should be sent to the API log, the text to
+// send (the line itself, or a notice that replaces it when a limit is
+// first exceeded or the first rate-limited partial line is seen), and an
+// error if a discovery parameter cannot be read. While the throttle is
+// closed, the size of every line is added to logThrottleBytesSkipped.
+//
+// Limits come from the discovery parameters crunchLimitLogBytesPerJob,
+// crunchLogThrottleBytes, crunchLogThrottleLines, crunchLogThrottlePeriod
+// and crunchLogPartialLineThrottlePeriod, which are read only while the
+// throttle is open.
+func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+	message := ""
+	lineSize := int64(len(line))
+	partialLine := false
+
+	if arvlog.logThrottleIsOpen {
+		// Check every lookup: reusing a single err variable would
+		// silently drop all but the last failure.
+		partialLinePeriod, err := arvlog.discoveryFloat("crunchLogPartialLineThrottlePeriod")
+		if err != nil {
+			return false, []byte(""), err
+		}
+		limitBytesPerJob, err := arvlog.discoveryFloat("crunchLimitLogBytesPerJob")
+		if err != nil {
+			return false, []byte(""), err
+		}
+		throttleBytes, err := arvlog.discoveryFloat("crunchLogThrottleBytes")
+		if err != nil {
+			return false, []byte(""), err
+		}
+		throttlePeriod, err := arvlog.discoveryFloat("crunchLogThrottlePeriod")
+		if err != nil {
+			return false, []byte(""), err
+		}
+		throttleLines, err := arvlog.discoveryFloat("crunchLogThrottleLines")
+		if err != nil {
+			return false, []byte(""), err
+		}
+
+		now := time.Now()
+
+		// A partial-line segment is counted against the limits at most
+		// once per crunchLogPartialLineThrottlePeriod seconds.
+		skipCounts := false
+		matches := lineRegexp.FindStringSubmatch(string(line))
+		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+			partialLine = true
+			period := time.Duration(int64(partialLinePeriod)) * time.Second
+			if now.After(arvlog.logThrottlePartialLineLastAt.Add(period)) {
+				arvlog.logThrottlePartialLineLastAt = now
+			} else {
+				skipCounts = true
+			}
+		}
+
+		if !skipCounts {
+			arvlog.logThrottleLinesSoFar++
+			arvlog.logThrottleBytesSoFar += lineSize
+			arvlog.bytesLogged += lineSize
+		}
+
+		// Discovery values are float64: convert them before formatting with
+		// %d, and report the remaining silence in whole seconds rather than
+		// as a Duration (which %d would print in nanoseconds).
+		ts := RFC3339Timestamp(now)
+		remainingSecs := int64(arvlog.logThrottleResetTime.Sub(now) / time.Second)
+		switch {
+		case arvlog.bytesLogged > int64(limitBytesPerJob):
+			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", ts, int64(limitBytesPerJob))
+			// Push the reset a year out so the throttle stays closed.
+			arvlog.logThrottleResetTime = now.Add(365 * 24 * time.Hour)
+			arvlog.logThrottleIsOpen = false
+		case arvlog.logThrottleBytesSoFar > int64(throttleBytes):
+			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", ts, int64(throttleBytes), int64(throttlePeriod), remainingSecs)
+			arvlog.logThrottleIsOpen = false
+		case arvlog.logThrottleLinesSoFar > int64(throttleLines):
+			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", ts, int64(throttleLines), int64(throttlePeriod), remainingSecs)
+			arvlog.logThrottleIsOpen = false
+		case partialLine && arvlog.logThrottleFirstPartialLine:
+			arvlog.logThrottleFirstPartialLine = false
+			message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", ts, int64(partialLinePeriod))
+		}
+	}
+
+	if !arvlog.logThrottleIsOpen {
+		// Don't log anything once a limit has been exceeded; just count
+		// what is dropped.
+		arvlog.logThrottleBytesSkipped += lineSize
+	}
+
+	switch {
+	case message != "":
+		// Send the "limit exceeded" notice in place of the line that
+		// triggered it. Like the line, the notice carries no trailing
+		// newline; the caller adds one.
+		message += " A complete log is still being written to Keep, and will be available when the job finishes."
+		return true, []byte(message), nil
+	case partialLine:
+		return false, line, nil
+	default:
+		return arvlog.logThrottleIsOpen, line, nil
+	}
+}
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index ceb8ca8..91af16d 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -109,3 +109,32 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
 		". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
 		". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
 }
+
+// TestWriteLogsWithRateLimit sends two short lines through CrunchLog while
+// ArvTestClient serves the stub discovery limits, and expects both to pass
+// the throttle: one API log event carrying both lines, and the same text
+// written to Keep.
+func (s *LoggingTestSuite) TestWriteLogsWithRateLimit(c *C) {
+	arv := &ArvTestClient{}
+	keepClient := &KeepTestClient{}
+	runner := NewContainerRunner(arv, keepClient, nil, "zzzzz-zzzzzzzzzzzzzzz")
+	runner.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	runner.CrunchLog.Print("Hello world!")
+	runner.CrunchLog.Print("Goodbye")
+	runner.CrunchLog.Close()
+
+	expected := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+		"2015-12-29T15:51:45.000000002Z Goodbye\n"
+
+	c.Check(arv.Calls, Equals, 1)
+
+	manifestText, err := runner.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(manifestText, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
+
+	logEntry := arv.Content[0]["log"].(arvadosclient.Dict)
+	c.Check(logEntry["event_type"], Equals, "crunch-run")
+	c.Check(logEntry["properties"].(map[string]string)["text"], Equals, expected)
+	c.Check(string(keepClient.Content), Equals, expected)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list