[ARVADOS] updated: 508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26

git at public.curoverse.com git at public.curoverse.com
Tue Mar 3 10:56:05 EST 2015


Summary of changes:
 services/crunchstat/crunchstat.go      | 47 ++++++++++++++++++++--------------
 services/crunchstat/crunchstat_test.go | 29 ++++++++++++++++-----
 2 files changed, 50 insertions(+), 26 deletions(-)

       via  508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26 (commit)
      from  46558e7be9da2099ccb12497230ad81ed1d35889 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Mar 3 10:56:52 2015 -0500

    5043: Split long stderr lines rather than consume unlimited memory.

diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 356eb01..1a22e26 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -37,21 +37,30 @@ type Cgroup struct {
 var childLog = log.New(os.Stderr, "", 0)
 var statLog = log.New(os.Stderr, "crunchstat: ", 0)
 
+const (
+	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
 func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
-	reader := bufio.NewReader(in)
+	reader := bufio.NewReaderSize(in, MaxLogLine)
+	var prefix string
 	for {
-		line, err := reader.ReadBytes('\n')
-		if len(line) > 0 {
-			if err == nil {
-				// err == nil IFF line ends in \n
-				line = line[:len(line)-1]
-			}
-			childLog.Println(string(line))
-		}
+		line, isPrefix, err := reader.ReadLine()
 		if err == io.EOF {
 			break
 		} else if err != nil {
-			statLog.Fatalln("line buffering error:", err)
+			statLog.Fatal("error reading child stderr:", err)
+		}
+		var suffix string
+		if isPrefix {
+			suffix = "[...]"
+		}
+		childLog.Print(prefix, string(line), suffix)
+		// Set up prefix for following line
+		if isPrefix {
+			prefix = "[...]"
+		} else {
+			prefix = ""
 		}
 	}
 	done <- true
@@ -61,7 +70,7 @@ func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
 func ReadAllOrWarn(in *os.File) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
-		statLog.Printf("read %s: %s\n", in.Name(), err)
+		statLog.Printf("error reading %s: %s\n", in.Name(), err)
 	}
 	return content, err
 }
@@ -103,9 +112,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
 		// [b] after all contained processes have exited.
 		reportedStatFile[stat] = path
 		if path == "" {
-			statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+			statLog.Printf("error finding stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
 		} else {
-			statLog.Printf("reading stats from %s\n", path)
+			statLog.Printf("error reading stats from %s\n", path)
 		}
 	}
 	return file, err
@@ -364,7 +373,7 @@ func run(logger *log.Logger) error {
 	flag.Parse()
 
 	if cgroup_root == "" {
-		statLog.Fatalln("Must provide -cgroup-root")
+		statLog.Fatal("error: must provide -cgroup-root")
 	}
 
 	finish_chan := make(chan bool)
@@ -376,7 +385,7 @@ func run(logger *log.Logger) error {
 		// Set up subprocess
 		cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
 
-		statLog.Println("Running ", flag.Args())
+		childLog.Println("Running", flag.Args())
 
 		// Child process will use our stdin and stdout pipes
 		// (we close our copies below)
@@ -398,13 +407,13 @@ func run(logger *log.Logger) error {
 		// Funnel stderr through our channel
 		stderr_pipe, err := cmd.StderrPipe()
 		if err != nil {
-			statLog.Fatalln("stderr:", err)
+			statLog.Fatalln("error in StderrPipe:", err)
 		}
 		go CopyPipeToChildLog(stderr_pipe, finish_chan)
 
 		// Run subprocess
 		if err := cmd.Start(); err != nil {
-			statLog.Fatalln("cmd.Start:", err)
+			statLog.Fatalln("error in cmd.Start:", err)
 		}
 
 		// Close stdin/stdout in this (parent) process
@@ -428,7 +437,7 @@ func run(logger *log.Logger) error {
 			time.Sleep(100 * time.Millisecond)
 		}
 		if !ok {
-			statLog.Println("Could not read cid file:", cgroup_cidfile)
+			statLog.Println("error reading cid file:", cgroup_cidfile)
 		}
 	}
 
@@ -461,7 +470,7 @@ func main() {
 				os.Exit(status.ExitStatus())
 			}
 		} else {
-			statLog.Fatalln("cmd.Wait:", err)
+			statLog.Fatalln("error in cmd.Wait:", err)
 		}
 	}
 }
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index fe922e9..dff323e 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -57,7 +57,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
 	pipeIn, pipeOut := io.Pipe()
 	go CopyPipeToChildLog(pipeIn, control)
 
-	sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
+	sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
 	go func() {
 		pipeOut.Write([]byte("before\n"))
 
@@ -75,12 +75,27 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
 	if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
 		t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
 	}
-	
-	receivedString, err := rcv.ReadBytes('\n')
-	if err != nil {
-		t.Fatal(err)
+
+	var receivedBytes []byte
+	done := false
+	for !done {
+		line, err := rcv.ReadBytes('\n')
+		if err != nil {
+			t.Fatal(err)
+		}
+		if len(line) >= 5 && string(line[0:5]) == "[...]" {
+			if receivedBytes == nil {
+				t.Fatal("Beginning of line reported as continuation")
+			}
+			line = line[5:]
+		}
+		if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
+			line = line[:len(line)-6]
+		} else {
+			done = true
+		}
+		receivedBytes = append(receivedBytes, line...)
 	}
-	receivedBytes := []byte(receivedString)
 	if bytes.Compare(receivedBytes, sentBytes) != 0 {
 		t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
 	}
@@ -97,7 +112,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
 	}
 }
 
-func captureLogs() (*bufio.Reader) {
+func captureLogs() *bufio.Reader {
 	// Send childLog to our bufio reader instead of stderr
 	stderrIn, stderrOut := io.Pipe()
 	childLog = log.New(stderrOut, "", 0)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list