[ARVADOS] updated: ddb96e86a98113ea313dd01e8b7957f07345b6a7
Git user
git at public.curoverse.com
Thu Aug 4 17:52:41 EDT 2016
Summary of changes:
lib/crunchstat/crunchstat.go | 7 +++++-
services/crunch-run/logging.go | 52 ++++++++++++++++--------------------------
2 files changed, 26 insertions(+), 33 deletions(-)
via ddb96e86a98113ea313dd01e8b7957f07345b6a7 (commit)
via 13c821b6dd20ed658db9c6c3db0b8d10d9175fbb (commit)
via 5951500ed7a4eaf07c4d2bad3b87d5a0ed1f60e8 (commit)
from c3cbac378fb45e3bf996a5d691cd3f205dfb3f90 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ddb96e86a98113ea313dd01e8b7957f07345b6a7
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Aug 4 17:45:10 2016 -0400
Fix race in crunchstat.
To fulfill the "nothing more will be logged after Stop() returns"
guarantee, Stop() needs to wait for run() to finish.
Should fix https://ci.curoverse.com/job/run-tests-remainder/38/consoleText
No issue #
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 03cfa7d..f3c2d84 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -56,7 +56,8 @@ type Reporter struct {
lastDiskSample map[string]ioSample
lastCPUSample cpuSample
- done chan struct{}
+ done chan struct{} // closed when we should stop reporting
+ flushed chan struct{} // closed when we have made our last report
}
// Start starts monitoring in a new goroutine, and returns
@@ -72,6 +73,7 @@ type Reporter struct {
// Callers should not modify public data fields after calling Start.
func (r *Reporter) Start() {
r.done = make(chan struct{})
+ r.flushed = make(chan struct{})
go r.run()
}
@@ -81,6 +83,7 @@ func (r *Reporter) Start() {
// Nothing will be logged after Stop returns.
func (r *Reporter) Stop() {
close(r.done)
+ <-r.flushed
}
func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
@@ -366,6 +369,8 @@ func (r *Reporter) doCPUStats() {
// Report stats periodically until we learn (via r.done) that someone
// called Stop.
func (r *Reporter) run() {
+ defer close(r.flushed)
+
r.reportedStatFile = make(map[string]string)
if !r.waitForCIDFile() || !r.waitForCgroup() {
commit 13c821b6dd20ed658db9c6c3db0b8d10d9175fbb
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Aug 4 17:44:34 2016 -0400
Simplify write flusher using a time.Ticker.
No issue #
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index a2aa5c6..4f8f95c 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string
// Logging plumbing:
//
// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher ->
// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
//
// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
@@ -51,10 +51,11 @@ func RFC3339Timestamp(t time.Time) string {
// tl.Immediate, if tl.Immediate is not nil.
func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
tl.Mutex.Lock()
+ defer tl.Mutex.Unlock()
+
if tl.buf == nil {
tl.buf = &bytes.Buffer{}
}
- defer tl.Mutex.Unlock()
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
@@ -77,39 +78,28 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
// Periodically check the current buffer; if not empty, send it on the
// channel to the goWriter goroutine.
func (tl *ThrottledLogger) flusher() {
- bufchan := make(chan *bytes.Buffer)
- bufterm := make(chan bool)
-
- // Use a separate goroutine for the actual write so that the writes are
- // actually initiated closer every 1s instead of every
- // 1s + (time to it takes to write).
- go goWriter(tl.writer, bufchan, bufterm)
-
- // We use a separate "stopping" var here to ensure we flush
- // tl.buf after tl.stop becomes true.
- stopping := false
- for !stopping {
- time.Sleep(time.Second)
- stopping = tl.stop
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ for range ticker.C {
+ // We use a separate "stopping" var here to ensure we flush
+ // tl.buf after tl.stop becomes true.
+ stopping := tl.stop
+
+ var ready *bytes.Buffer
+
tl.Mutex.Lock()
- if tl.buf != nil && tl.buf.Len() > 0 {
- oldbuf := tl.buf
- tl.buf = nil
- bufchan <- oldbuf
- }
+ ready, tl.buf = tl.buf, nil
tl.Mutex.Unlock()
- }
- close(bufchan)
- <-bufterm
- tl.flusherDone <- true
-}
-// Receive buffers from a channel and send to the underlying Writer
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
- for b := range c {
- writer.Write(b.Bytes())
+ if ready != nil && ready.Len() > 0 {
+ tl.writer.Write(ready.Bytes())
+ }
+
+ if stopping {
+ break
+ }
}
- t <- true
+ close(tl.flusherDone)
}
// Close the flusher goroutine and wait for it to complete, then close the
commit 5951500ed7a4eaf07c4d2bad3b87d5a0ed1f60e8
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Aug 4 16:53:32 2016 -0400
Fix potential race in ThrottledLogger flusher.
No issue #
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index db9d101..a2aa5c6 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -84,22 +84,20 @@ func (tl *ThrottledLogger) flusher() {
// actually initiated closer every 1s instead of every
// 1s + (time to it takes to write).
go goWriter(tl.writer, bufchan, bufterm)
- for {
- if !tl.stop {
- time.Sleep(1 * time.Second)
- }
+
+ // We use a separate "stopping" var here to ensure we flush
+ // tl.buf after tl.stop becomes true.
+ stopping := false
+ for !stopping {
+ time.Sleep(time.Second)
+ stopping = tl.stop
tl.Mutex.Lock()
if tl.buf != nil && tl.buf.Len() > 0 {
oldbuf := tl.buf
tl.buf = nil
- tl.Mutex.Unlock()
bufchan <- oldbuf
- } else if tl.stop {
- tl.Mutex.Unlock()
- break
- } else {
- tl.Mutex.Unlock()
}
+ tl.Mutex.Unlock()
}
close(bufchan)
<-bufterm
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list