[ARVADOS] updated: ddb96e86a98113ea313dd01e8b7957f07345b6a7

Git user git at public.curoverse.com
Thu Aug 4 17:52:41 EDT 2016


Summary of changes:
 lib/crunchstat/crunchstat.go   |  7 +++++-
 services/crunch-run/logging.go | 52 ++++++++++++++++--------------------------
 2 files changed, 26 insertions(+), 33 deletions(-)

       via  ddb96e86a98113ea313dd01e8b7957f07345b6a7 (commit)
       via  13c821b6dd20ed658db9c6c3db0b8d10d9175fbb (commit)
       via  5951500ed7a4eaf07c4d2bad3b87d5a0ed1f60e8 (commit)
      from  c3cbac378fb45e3bf996a5d691cd3f205dfb3f90 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit ddb96e86a98113ea313dd01e8b7957f07345b6a7
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Aug 4 17:45:10 2016 -0400

    Fix race in crunchstat.
    
    To fulfill the "nothing more will be logged after Stop() returns"
    guarantee, Stop() needs to wait for run() to finish.
    
    Should fix https://ci.curoverse.com/job/run-tests-remainder/38/consoleText
    
    No issue #

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 03cfa7d..f3c2d84 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -56,7 +56,8 @@ type Reporter struct {
 	lastDiskSample   map[string]ioSample
 	lastCPUSample    cpuSample
 
-	done chan struct{}
+	done    chan struct{}	// closed when we should stop reporting
+	flushed chan struct{}	// closed when we have made our last report
 }
 
 // Start starts monitoring in a new goroutine, and returns
@@ -72,6 +73,7 @@ type Reporter struct {
 // Callers should not modify public data fields after calling Start.
 func (r *Reporter) Start() {
 	r.done = make(chan struct{})
+	r.flushed = make(chan struct{})
 	go r.run()
 }
 
@@ -81,6 +83,7 @@ func (r *Reporter) Start() {
 // Nothing will be logged after Stop returns.
 func (r *Reporter) Stop() {
 	close(r.done)
+	<-r.flushed
 }
 
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
@@ -366,6 +369,8 @@ func (r *Reporter) doCPUStats() {
 // Report stats periodically until we learn (via r.done) that someone
 // called Stop.
 func (r *Reporter) run() {
+	defer close(r.flushed)
+
 	r.reportedStatFile = make(map[string]string)
 
 	if !r.waitForCIDFile() || !r.waitForCgroup() {

commit 13c821b6dd20ed658db9c6c3db0b8d10d9175fbb
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Aug 4 17:44:34 2016 -0400

    Simplify write flusher using a time.Ticker.
    
    No issue #

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index a2aa5c6..4f8f95c 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string
 // Logging plumbing:
 //
 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher ->
 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
 //
 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
@@ -51,10 +51,11 @@ func RFC3339Timestamp(t time.Time) string {
 // tl.Immediate, if tl.Immediate is not nil.
 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 	tl.Mutex.Lock()
+	defer tl.Mutex.Unlock()
+
 	if tl.buf == nil {
 		tl.buf = &bytes.Buffer{}
 	}
-	defer tl.Mutex.Unlock()
 
 	now := tl.Timestamper(time.Now().UTC())
 	sc := bufio.NewScanner(bytes.NewBuffer(p))
@@ -77,39 +78,28 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-	bufchan := make(chan *bytes.Buffer)
-	bufterm := make(chan bool)
-
-	// Use a separate goroutine for the actual write so that the writes are
-	// actually initiated closer every 1s instead of every
-	// 1s + (time to it takes to write).
-	go goWriter(tl.writer, bufchan, bufterm)
-
-	// We use a separate "stopping" var here to ensure we flush
-	// tl.buf after tl.stop becomes true.
-	stopping := false
-	for !stopping {
-		time.Sleep(time.Second)
-		stopping = tl.stop
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for range ticker.C {
+		// We use a separate "stopping" var here to ensure we flush
+		// tl.buf after tl.stop becomes true.
+		stopping := tl.stop
+
+		var ready *bytes.Buffer
+
 		tl.Mutex.Lock()
-		if tl.buf != nil && tl.buf.Len() > 0 {
-			oldbuf := tl.buf
-			tl.buf = nil
-			bufchan <- oldbuf
-		}
+		ready, tl.buf = tl.buf, nil
 		tl.Mutex.Unlock()
-	}
-	close(bufchan)
-	<-bufterm
-	tl.flusherDone <- true
-}
 
-// Receive buffers from a channel and send to the underlying Writer
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
-	for b := range c {
-		writer.Write(b.Bytes())
+		if ready != nil && ready.Len() > 0 {
+			tl.writer.Write(ready.Bytes())
+		}
+
+		if stopping {
+			break
+		}
 	}
-	t <- true
+	close(tl.flusherDone)
 }
 
 // Close the flusher goroutine and wait for it to complete, then close the

commit 5951500ed7a4eaf07c4d2bad3b87d5a0ed1f60e8
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Aug 4 16:53:32 2016 -0400

    Fix potential race in ThrottledLogger flusher.
    
    No issue #

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index db9d101..a2aa5c6 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -84,22 +84,20 @@ func (tl *ThrottledLogger) flusher() {
 	// actually initiated closer every 1s instead of every
 	// 1s + (time to it takes to write).
 	go goWriter(tl.writer, bufchan, bufterm)
-	for {
-		if !tl.stop {
-			time.Sleep(1 * time.Second)
-		}
+
+	// We use a separate "stopping" var here to ensure we flush
+	// tl.buf after tl.stop becomes true.
+	stopping := false
+	for !stopping {
+		time.Sleep(time.Second)
+		stopping = tl.stop
 		tl.Mutex.Lock()
 		if tl.buf != nil && tl.buf.Len() > 0 {
 			oldbuf := tl.buf
 			tl.buf = nil
-			tl.Mutex.Unlock()
 			bufchan <- oldbuf
-		} else if tl.stop {
-			tl.Mutex.Unlock()
-			break
-		} else {
-			tl.Mutex.Unlock()
 		}
+		tl.Mutex.Unlock()
 	}
 	close(bufchan)
 	<-bufterm

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list