[ARVADOS] updated: f1fb39153bc234b49ca77674a29abc029b12774a
Git user
git at public.curoverse.com
Fri Apr 21 17:25:47 EDT 2017
Summary of changes:
services/crunch-run/crunchrun.go | 14 +--
services/crunch-run/crunchrun_test.go | 11 ++-
services/crunch-run/logging.go | 159 ++++++++++++++++++++++++++++++++--
services/crunch-run/logging_test.go | 24 +++++
4 files changed, 191 insertions(+), 17 deletions(-)
via f1fb39153bc234b49ca77674a29abc029b12774a (commit)
from 5e4bf6836d9762ef5552111776dcbade99f2e680 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f1fb39153bc234b49ca77674a29abc029b12774a
Author: radhika <radhika at curoverse.com>
Date: Fri Apr 21 17:25:03 2017 -0400
8019: rateLimit crunch-run logging using API configuration parameters
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index fd2ce3f..b95ae1b 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -631,10 +631,10 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
// Get and save the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
- runner.ArvClient,
- runner.Container.UUID,
- "container",
- runner.LogCollection.Open("container.json"),
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "container",
+ writeCloser: runner.LogCollection.Open("container.json"),
// Get Container record JSON from the API Server
reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
@@ -1061,8 +1061,8 @@ func (runner *ContainerRunner) CommitLogs() error {
// point, but re-open crunch log with ArvClient in case there are any
// other further errors (such as failing to write the log to Keep!) while
// shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
- "crunch-run", nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
@@ -1149,7 +1149,7 @@ func (runner *ContainerRunner) IsCancelled() bool {
// NewArvLogWriter creates an ArvLogWriter
func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+ return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, writeCloser: runner.LogCollection.Open(name + ".txt")}
// Run the full container lifecycle.
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 43c55b6..5bfbf79 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -249,7 +249,16 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
return nil
-var discoveryMap = map[string]interface{}{"defaultTrashLifetime": float64(1209600)}
+var discoveryMap = map[string]interface{}{
+ "defaultTrashLifetime": float64(1209600),
+ "crunchLimitLogBytesPerJob": float64(67108864),
+ "crunchLogThrottleBytes": float64(65536),
+ "crunchLogThrottlePeriod": float64(60),
+ "crunchLogThrottleLines": float64(1024),
+ "crunchLogPartialLineThrottlePeriod": float64(5),
+ "crunchLogBytesPerEvent": float64(4096),
+ "crunchLogSecondsBetweenEvents": float64(1),
func (client *ArvTestClient) Discovery(key string) (interface{}, error) {
return discoveryMap[key], nil
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 5254ff6..a66525e 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -6,6 +6,8 @@ import (
+ "regexp"
+ "strings"
@@ -176,6 +178,18 @@ type ArvLogWriter struct {
UUID string
loggingStream string
writeCloser io.WriteCloser
+ // for rate limiting
+ bytesLogged int64
+ logThrottleResetTime time.Time
+ logThrottleLinesSoFar int64
+ logThrottleBytesSoFar int64
+ logThrottleBytesSkipped int64
+ logThrottleIsOpen bool
+ logThrottlePartialLineLastAt time.Time
+ logThrottleFirstPartialLine bool
+ stderrBufToFlush bytes.Buffer
+ stderrFlushedAt time.Time
func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -185,17 +199,72 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
_, err1 = arvlog.writeCloser.Write(p)
- // write to API
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": arvlog.UUID,
- "event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": string(p)}}}
- err2 := arvlog.ArvClient.Create("logs", lr, nil)
- if err1 != nil || err2 != nil {
+ // write to API after checking rate limit
+ crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
+ crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
+ crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
+ if err2 != nil {
return 0, fmt.Errorf("%s ; %s", err1, err2)
- return len(p), nil
+ now := time.Now()
+ bytesWritten := 0
+ if now.After(arvlog.logThrottleResetTime) {
+ // It has been more than throttle_period seconds since the last
+ // checkpoint; so reset the throttle
+ if arvlog.logThrottleBytesSkipped > 0 {
+ arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now()), arvlog.logThrottleBytesSkipped))
+ }
+ arvlog.logThrottleResetTime = time.Now().Add(time.Duration(int(crunchLogThrottlePeriod.(float64))))
+ arvlog.logThrottleBytesSoFar = 0
+ arvlog.logThrottleLinesSoFar = 0
+ arvlog.logThrottleBytesSkipped = 0
+ arvlog.logThrottleIsOpen = true
+ arvlog.logThrottlePartialLineLastAt = time.Time{}
+ arvlog.logThrottleFirstPartialLine = true
+ }
+ lines := bytes.Split(p, []byte("\n"))
+ for _, line := range lines {
+ // Short circuit the counting code if we're just going to throw
+ // away the data anyway.
+ if !arvlog.logThrottleIsOpen {
+ arvlog.logThrottleBytesSkipped += int64(len(line))
+ continue
+ } else if len(line) == 0 {
+ continue
+ }
+ // check rateLimit
+ _, msg, err2 := arvlog.rateLimit(line)
+ if err2 != nil {
+ return 0, fmt.Errorf("%s ; %s", err1, err2)
+ }
+ arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
+ }
+ if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
+ (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
+ // write to API
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": arvlog.UUID,
+ "event_type": arvlog.loggingStream,
+ "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+ err2 := arvlog.ArvClient.Create("logs", lr, nil)
+ bytesWritten = arvlog.stderrBufToFlush.Len()
+ arvlog.stderrBufToFlush = bytes.Buffer{}
+ arvlog.stderrFlushedAt = time.Now()
+ if err1 != nil || err2 != nil {
+ return 0, fmt.Errorf("%s ; %s", err1, err2)
+ }
+ }
+ return bytesWritten, nil
// Close the underlying writer
@@ -206,3 +275,75 @@ func (arvlog *ArvLogWriter) Close() (err error) {
return err
+var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+// Test for hard cap on total output and for log throttling. Returns whether
+// the log line should go to output or not. Returns message if limit exceeded.
+func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+ message := ""
+ lineSize := int64(len(line))
+ partialLine := false
+ skipCounts := false
+ if arvlog.logThrottleIsOpen {
+ matches := lineRegexp.FindStringSubmatch(string(line))
+ crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
+ crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
+ crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
+ crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
+ crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
+ if err != nil {
+ return false, []byte(""), err
+ }
+ if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+ partialLine = true
+ if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
+ arvlog.logThrottlePartialLineLastAt = time.Now()
+ } else {
+ skipCounts = true
+ }
+ }
+ if !skipCounts {
+ arvlog.logThrottleLinesSoFar += 1
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.bytesLogged += lineSize
+ }
+ if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now()), int(crunchLimitLogBytesPerJob.(float64)))
+ arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+ arvlog.logThrottleIsOpen = false
+ } else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
+ remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleBytes, int(crunchLogThrottlePeriod.(float64)), remainingTime)
+ arvlog.logThrottleIsOpen = false
+ } else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
+ remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleLines, int(crunchLogThrottlePeriod.(float64)), remainingTime)
+ arvlog.logThrottleIsOpen = false
+ } else if partialLine && arvlog.logThrottleFirstPartialLine {
+ arvlog.logThrottleFirstPartialLine = false
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now()), crunchLogPartialLineThrottlePeriod)
+ }
+ }
+ if !arvlog.logThrottleIsOpen {
+ // Don't log anything if any limit has been exceeded. Just count lossage.
+ arvlog.logThrottleBytesSkipped += lineSize
+ }
+ if message != "" {
+ // Yes, write to logs, but use our "rate exceeded" message
+ // instead of the log message that exceeded the limit.
+ message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
+ return true, []byte(message), nil
+ } else if partialLine {
+ return false, line, nil
+ } else {
+ return arvlog.logThrottleIsOpen, line, nil
+ }
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index ceb8ca8..91af16d 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -109,3 +109,27 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+func (s *LoggingTestSuite) TestWriteLogsWithRateLimit(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+ cr.CrunchLog.Print("Hello world!")
+ cr.CrunchLog.Print("Goodbye")
+ cr.CrunchLog.Close()
+ c.Check(api.Calls, Equals, 1)
+ mt, err := cr.LogCollection.ManifestText()
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
+ logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+ "2015-12-29T15:51:45.000000002Z Goodbye\n"
+ c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+ c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
+ c.Check(string(kc.Content), Equals, logtext)
More information about the arvados-commits
mailing list