[ARVADOS] updated: 508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26
git at public.curoverse.com
git at public.curoverse.com
Tue Mar 3 10:56:05 EST 2015
Summary of changes:
services/crunchstat/crunchstat.go | 47 ++++++++++++++++++++--------------
services/crunchstat/crunchstat_test.go | 29 ++++++++++++++++-----
2 files changed, 50 insertions(+), 26 deletions(-)
via 508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26 (commit)
from 46558e7be9da2099ccb12497230ad81ed1d35889 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 508bcab6bd1a7ca34a929d7314f5ad5c2e48ce26
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Mar 3 10:56:52 2015 -0500
5043: Split long stderr lines rather than consume unlimited memory.
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 356eb01..1a22e26 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -37,21 +37,30 @@ type Cgroup struct {
var childLog = log.New(os.Stderr, "", 0)
var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+const (
+ MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
- reader := bufio.NewReader(in)
+ reader := bufio.NewReaderSize(in, MaxLogLine)
+ var prefix string
for {
- line, err := reader.ReadBytes('\n')
- if len(line) > 0 {
- if err == nil {
- // err == nil IFF line ends in \n
- line = line[:len(line)-1]
- }
- childLog.Println(string(line))
- }
+ line, isPrefix, err := reader.ReadLine()
if err == io.EOF {
break
} else if err != nil {
- statLog.Fatalln("line buffering error:", err)
+ statLog.Fatal("error reading child stderr:", err)
+ }
+ var suffix string
+ if isPrefix {
+ suffix = "[...]"
+ }
+ childLog.Print(prefix, string(line), suffix)
+ // Set up prefix for following line
+ if isPrefix {
+ prefix = "[...]"
+ } else {
+ prefix = ""
}
}
done <- true
@@ -61,7 +70,7 @@ func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
func ReadAllOrWarn(in *os.File) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
- statLog.Printf("read %s: %s\n", in.Name(), err)
+ statLog.Printf("error reading %s: %s\n", in.Name(), err)
}
return content, err
}
@@ -103,9 +112,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
// [b] after all contained processes have exited.
reportedStatFile[stat] = path
if path == "" {
- statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+ statLog.Printf("error finding stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
} else {
- statLog.Printf("reading stats from %s\n", path)
+ statLog.Printf("error reading stats from %s\n", path)
}
}
return file, err
@@ -364,7 +373,7 @@ func run(logger *log.Logger) error {
flag.Parse()
if cgroup_root == "" {
- statLog.Fatalln("Must provide -cgroup-root")
+ statLog.Fatal("error: must provide -cgroup-root")
}
finish_chan := make(chan bool)
@@ -376,7 +385,7 @@ func run(logger *log.Logger) error {
// Set up subprocess
cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
- statLog.Println("Running ", flag.Args())
+ childLog.Println("Running", flag.Args())
// Child process will use our stdin and stdout pipes
// (we close our copies below)
@@ -398,13 +407,13 @@ func run(logger *log.Logger) error {
// Funnel stderr through our channel
stderr_pipe, err := cmd.StderrPipe()
if err != nil {
- statLog.Fatalln("stderr:", err)
+ statLog.Fatalln("error in StderrPipe:", err)
}
go CopyPipeToChildLog(stderr_pipe, finish_chan)
// Run subprocess
if err := cmd.Start(); err != nil {
- statLog.Fatalln("cmd.Start:", err)
+ statLog.Fatalln("error in cmd.Start:", err)
}
// Close stdin/stdout in this (parent) process
@@ -428,7 +437,7 @@ func run(logger *log.Logger) error {
time.Sleep(100 * time.Millisecond)
}
if !ok {
- statLog.Println("Could not read cid file:", cgroup_cidfile)
+ statLog.Println("error reading cid file:", cgroup_cidfile)
}
}
@@ -461,7 +470,7 @@ func main() {
os.Exit(status.ExitStatus())
}
} else {
- statLog.Fatalln("cmd.Wait:", err)
+ statLog.Fatalln("error in cmd.Wait:", err)
}
}
}
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index fe922e9..dff323e 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -57,7 +57,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
pipeIn, pipeOut := io.Pipe()
go CopyPipeToChildLog(pipeIn, control)
- sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
+ sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
go func() {
pipeOut.Write([]byte("before\n"))
@@ -75,12 +75,27 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
}
-
- receivedString, err := rcv.ReadBytes('\n')
- if err != nil {
- t.Fatal(err)
+
+ var receivedBytes []byte
+ done := false
+ for !done {
+ line, err := rcv.ReadBytes('\n')
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(line) >= 5 && string(line[0:5]) == "[...]" {
+ if receivedBytes == nil {
+ t.Fatal("Beginning of line reported as continuation")
+ }
+ line = line[5:]
+ }
+ if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
+ line = line[:len(line)-6]
+ } else {
+ done = true
+ }
+ receivedBytes = append(receivedBytes, line...)
}
- receivedBytes := []byte(receivedString)
if bytes.Compare(receivedBytes, sentBytes) != 0 {
t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
}
@@ -97,7 +112,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
}
}
-func captureLogs() (*bufio.Reader) {
+func captureLogs() *bufio.Reader {
// Send childLog to our bufio reader instead of stderr
stderrIn, stderrOut := io.Pipe()
childLog = log.New(stderrOut, "", 0)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list