[ARVADOS] updated: 455f862a19fe0bcc8ac3c6e685a96faf747ae623
Git user
git at public.curoverse.com
Tue Apr 25 15:06:11 EDT 2017
Summary of changes:
.../app/controllers/keep_disks_controller.rb | 1 +
apps/workbench/app/controllers/users_controller.rb | 1 +
apps/workbench/app/models/arvados_base.rb | 4 +
apps/workbench/app/models/arvados_resource_list.rb | 8 +
apps/workbench/app/models/job.rb | 2 +-
apps/workbench/app/models/pipeline_instance.rb | 7 +-
apps/workbench/app/models/proxy_work_unit.rb | 1 +
.../test/unit/arvados_resource_list_test.rb | 8 +
apps/workbench/test/unit/link_test.rb | 3 +
apps/workbench/test/unit/pipeline_instance_test.rb | 3 +
apps/workbench/test/unit/work_unit_test.rb | 3 +
sdk/go/dispatch/throttle_test.go | 29 ++--
sdk/python/arvados/_ranges.py | 2 +-
sdk/python/arvados/arvfile.py | 170 ++++++++++++---------
sdk/python/tests/test_collections.py | 60 +++++++-
services/crunch-run/logging.go | 56 ++++---
services/fuse/tests/test_tmp_collection.py | 13 ++
17 files changed, 250 insertions(+), 121 deletions(-)
via 455f862a19fe0bcc8ac3c6e685a96faf747ae623 (commit)
via 8622b46a4a6c127a1927d9c2e54febec6a5bf503 (commit)
via f2f8340b18430738a9527f05e707dd8f03508cc0 (commit)
via ad7294edfcc59c3e67548328a88c9e689c3ae2cf (commit)
via bf5d77baad2071af6eea514c76b4892cec4974a0 (commit)
via 04bd6b08b9ac13d29ac05c9281850d430d71066d (commit)
via 840b855ff0317e66f4176ae0f23e9785f72267b4 (commit)
via 1220e2184449ccab288fa41de4749fb029cd317b (commit)
via 17b80c32a5b177ee8c5f32b81dd0889f3399eee8 (commit)
via 35c2572761bb060aa1c12f417f97aa9e1ccbe7eb (commit)
via f4661a02245a35f8d223693a5aecaae87083fb16 (commit)
via 52c6f13db207030bdbe063665c0dd524007db828 (commit)
via 0ecea550fde014578e71004360b700cdfeae4909 (commit)
via 7116da151dc8bfd5ac1a9b016b2ed6e4c35572f7 (commit)
via bef5d901f00648e703bb6a3ad58fa481a610ffd7 (commit)
from 4344941f8410ed21befead8d6b8d2471d291032f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 455f862a19fe0bcc8ac3c6e685a96faf747ae623
Author: radhika <radhika at curoverse.com>
Date: Tue Apr 25 15:05:31 2017 -0400
8019: rateLimit method signature
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 22ba130..6e32d72 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -188,8 +188,8 @@ type ArvLogWriter struct {
logThrottleIsOpen bool
logThrottlePartialLineLastAt time.Time
logThrottleFirstPartialLine bool
- stderrBufToFlush bytes.Buffer
- stderrFlushedAt time.Time
+ bufToFlush bytes.Buffer
+ bufFlushedAt time.Time
// rate limiting config parameters
crunchLimitLogBytesPerJob int64
@@ -273,10 +273,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
// It has been more than throttle_period seconds since the last
// checkpoint; so reset the throttle
if arvlog.logThrottleBytesSkipped > 0 {
- arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
+ arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
}
- arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+ arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
@@ -298,25 +298,23 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
}
// check rateLimit
- _, msg, err2 := arvlog.rateLimit(line)
- if err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
- }
- arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
+ logOpen, msg := arvlog.rateLimit(line, now)
+ arvlog.bufToFlush.WriteString(string(msg) + "\n")
+ arvlog.logThrottleIsOpen = logOpen
}
- if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
- (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+ if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+ (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
"event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+ "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.stderrBufToFlush.Len()
- arvlog.stderrBufToFlush = bytes.Buffer{}
- arvlog.stderrFlushedAt = time.Now()
+ bytesWritten = arvlog.bufToFlush.Len()
+ arvlog.bufToFlush = bytes.Buffer{}
+ arvlog.bufFlushedAt = now
if err1 != nil || err2 != nil {
return 0, fmt.Errorf("%s ; %s", err1, err2)
@@ -335,11 +333,11 @@ func (arvlog *ArvLogWriter) Close() (err error) {
return err
}
-var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
// Test for hard cap on total output and for log throttling. Returns whether
// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
message := ""
lineSize := int64(len(line))
partialLine := false
@@ -351,8 +349,8 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
partialLine = true
- if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
- arvlog.logThrottlePartialLineLastAt = time.Now()
+ if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+ arvlog.logThrottlePartialLineLastAt = now
} else {
skipCounts = true
}
@@ -365,23 +363,23 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
}
if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
- arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+ arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
arvlog.logThrottleIsOpen = false
} else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if partialLine && arvlog.logThrottleFirstPartialLine {
arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
}
}
@@ -395,10 +393,10 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
// Yes, write to logs, but use our "rate exceeded" message
// instead of the log message that exceeded the limit.
message += " A complete log is still being written to Keep, and will be available when the job finishes."
- return true, []byte(message), nil
+ return true, []byte(message)
} else if partialLine {
- return false, line, nil
+ return false, line
} else {
- return arvlog.logThrottleIsOpen, line, nil
+ return arvlog.logThrottleIsOpen, line
}
}
commit 8622b46a4a6c127a1927d9c2e54febec6a5bf503
Merge: 4344941 f2f8340
Author: radhika <radhika at curoverse.com>
Date: Tue Apr 25 14:08:14 2017 -0400
Merge branch 'master' into 8019-crunchrun-log-throttle
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list