[ARVADOS] created: 46558e7be9da2099ccb12497230ad81ed1d35889
git at public.curoverse.com
git at public.curoverse.com
Mon Mar 2 21:35:07 EST 2015
at 46558e7be9da2099ccb12497230ad81ed1d35889 (commit)
commit 46558e7be9da2099ccb12497230ad81ed1d35889
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Mar 2 21:35:35 2015 -0500
5043: Use Go's log package to serialize writes. Lose logChan.
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 4e7e53d..356eb01 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -34,7 +34,10 @@ type Cgroup struct {
cid string
}
-func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
+var childLog = log.New(os.Stderr, "", 0)
+var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+
+func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
reader := bufio.NewReader(in)
for {
line, err := reader.ReadBytes('\n')
@@ -43,38 +46,22 @@ func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
// err == nil IFF line ends in \n
line = line[:len(line)-1]
}
- out <- string(line)
+ childLog.Println(string(line))
}
- if err != nil {
- if err != io.EOF {
- out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
- }
+ if err == io.EOF {
break
+ } else if err != nil {
+ statLog.Fatalln("line buffering error:", err)
}
}
done <- true
in.Close()
}
-func CopyChanToPipe(in <-chan string, out io.Writer) {
- for s := range in {
- fmt.Fprintln(out, s)
- }
-}
-
-var logChan chan string
-
-func LogPrintf(format string, args ...interface{}) {
- if logChan == nil {
- return
- }
- logChan <- fmt.Sprintf("crunchstat: "+format, args...)
-}
-
func ReadAllOrWarn(in *os.File) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
- LogPrintf("read %s: %s", in.Name(), err)
+ statLog.Printf("read %s: %s\n", in.Name(), err)
}
return content, err
}
@@ -116,9 +103,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
// [b] after all contained processes have exited.
reportedStatFile[stat] = path
if path == "" {
- LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+ statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
} else {
- LogPrintf("reading stats from %s", path)
+ statLog.Printf("reading stats from %s\n", path)
}
}
return file, err
@@ -136,7 +123,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- LogPrintf("read %s: %s", statsFilename, err)
+ statLog.Printf("read %s: %s\n", statsFilename, err)
continue
}
return strings.NewReader(string(stats)), nil
@@ -189,7 +176,7 @@ func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
sample.txBytes-prev.txBytes,
sample.rxBytes-prev.rxBytes)
}
- LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
+ statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
lastSample[dev] = sample
}
}
@@ -222,7 +209,7 @@ func DoMemoryStats(cgroup Cgroup) {
outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
}
}
- LogPrintf("mem%s", outstat.String())
+ statLog.Printf("mem%s\n", outstat.String())
}
func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
@@ -264,7 +251,7 @@ func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
tx-prev.txBytes,
rx-prev.rxBytes)
}
- LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
+ statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
lastSample[ifName] = nextSample
}
}
@@ -325,7 +312,7 @@ func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
nextSample.user-lastSample.user,
nextSample.sys-lastSample.sys)
}
- LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
+ statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
nextSample.user, nextSample.sys, nextSample.cpus, delta)
*lastSample = nextSample
}
@@ -377,23 +364,19 @@ func run(logger *log.Logger) error {
flag.Parse()
if cgroup_root == "" {
- logger.Fatal("Must provide -cgroup-root")
+ statLog.Fatalln("Must provide -cgroup-root")
}
- logChan = make(chan string, 1)
- defer close(logChan)
finish_chan := make(chan bool)
defer close(finish_chan)
- go CopyChanToPipe(logChan, os.Stderr)
-
var cmd *exec.Cmd
if len(flag.Args()) > 0 {
// Set up subprocess
cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
- logger.Print("Running ", flag.Args())
+ statLog.Println("Running ", flag.Args())
// Child process will use our stdin and stdout pipes
// (we close our copies below)
@@ -401,27 +384,27 @@ func run(logger *log.Logger) error {
cmd.Stdout = os.Stdout
// Forward SIGINT and SIGTERM to inner process
- term := make(chan os.Signal, 1)
+ sigChan := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
catch := <-sig
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Print("caught signal: ", catch)
- }(term)
- signal.Notify(term, syscall.SIGTERM)
- signal.Notify(term, syscall.SIGINT)
+ statLog.Println("caught signal:", catch)
+ }(sigChan)
+ signal.Notify(sigChan, syscall.SIGTERM)
+ signal.Notify(sigChan, syscall.SIGINT)
// Funnel stderr through our channel
stderr_pipe, err := cmd.StderrPipe()
if err != nil {
- logger.Fatal(err)
+ statLog.Fatalln("stderr:", err)
}
- go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
+ go CopyPipeToChildLog(stderr_pipe, finish_chan)
// Run subprocess
if err := cmd.Start(); err != nil {
- logger.Fatal(err)
+ statLog.Fatalln("cmd.Start:", err)
}
// Close stdin/stdout in this (parent) process
@@ -445,7 +428,7 @@ func run(logger *log.Logger) error {
time.Sleep(100 * time.Millisecond)
}
if !ok {
- logger.Printf("Could not read cid file %s", cgroup_cidfile)
+ statLog.Println("Could not read cid file:", cgroup_cidfile)
}
}
@@ -478,7 +461,7 @@ func main() {
os.Exit(status.ExitStatus())
}
} else {
- logger.Fatalf("cmd.Wait: %v", err)
+ statLog.Fatalln("cmd.Wait:", err)
}
}
}
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index 91fe851..fe922e9 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"io"
+ "log"
"math/rand"
"os"
"regexp"
@@ -12,9 +13,9 @@ import (
)
func TestReadAllOrWarnFail(t *testing.T) {
- logChan = make(chan string)
+ rcv := captureLogs()
+ defer uncaptureLogs()
go func() {
- defer close(logChan)
// The special file /proc/self/mem can be opened for
// reading, but reading from byte 0 returns an error.
f, err := os.Open("/proc/self/mem")
@@ -25,67 +26,69 @@ func TestReadAllOrWarnFail(t *testing.T) {
t.Fatalf("Expected error, got %v", x)
}
}()
- if _, ok := <-logChan; !ok {
- t.Fatalf("Expected error message about nonexistent file")
- }
- if msg, ok := <-logChan; ok {
- t.Fatalf("Expected channel to close, got %s", msg)
+ if msg, err := rcv.ReadBytes('\n'); err != nil {
+ t.Fatal(err)
+ } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
+ t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
}
}
func TestReadAllOrWarnSuccess(t *testing.T) {
- logChan = make(chan string)
- go func() {
- defer close(logChan)
- f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
- data, err := ReadAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
- }
- if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: %s", err)
- }
- }()
- if msg, ok := <-logChan; ok {
- t.Fatalf("Expected channel to close, got %s", msg)
+ f, err := os.Open("./crunchstat_test.go")
+ if err != nil {
+ t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ }
+ data, err := ReadAllOrWarn(f)
+ if err != nil {
+ t.Fatalf("got error %s", err)
+ }
+ if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
+ t.Fatalf("data failed regexp: %s", err)
}
}
-// Test that CopyPipeToChan works even on lines longer than
+// Test that CopyPipeToChildLog works even on lines longer than
// bufio.MaxScanTokenSize.
-func TestCopyPipeToChanLongLines(t *testing.T) {
- logChan := make(chan string)
- control := make(chan bool)
+func TestCopyPipeToChildLogLongLines(t *testing.T) {
+ rcv := captureLogs()
+ defer uncaptureLogs()
+ control := make(chan bool)
pipeIn, pipeOut := io.Pipe()
- go CopyPipeToChan(pipeIn, logChan, control)
+ go CopyPipeToChildLog(pipeIn, control)
sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
go func() {
+ pipeOut.Write([]byte("before\n"))
+
for i := range sentBytes {
// Some bytes that aren't newlines:
sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
}
- pipeOut.Write([]byte("before\n"))
+ sentBytes[len(sentBytes)-1] = '\n'
pipeOut.Write(sentBytes)
- pipeOut.Write([]byte("\nafter\n"))
+
+ pipeOut.Write([]byte("after"))
pipeOut.Close()
}()
- if before := <-logChan; before != "before" {
- t.Fatalf("\"before\" not received (got \"%s\")", before)
+ if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+ t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
+ }
+
+ receivedString, err := rcv.ReadBytes('\n')
+ if err != nil {
+ t.Fatal(err)
}
- receivedString := <-logChan
receivedBytes := []byte(receivedString)
if bytes.Compare(receivedBytes, sentBytes) != 0 {
- t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+ t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
}
- if after := <-logChan; after != "after" {
- t.Fatal("\"after\" not received")
+
+ if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+ t.Fatal("\"after\n\" not received (got \"%s\", %s)", after, err)
}
+
select {
case <-time.After(time.Second):
t.Fatal("Timeout")
@@ -93,3 +96,16 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
// Done.
}
}
+
+func captureLogs() (*bufio.Reader) {
+ // Send childLog to our bufio reader instead of stderr
+ stderrIn, stderrOut := io.Pipe()
+ childLog = log.New(stderrOut, "", 0)
+ statLog = log.New(stderrOut, "crunchstat: ", 0)
+ return bufio.NewReader(stderrIn)
+}
+
+func uncaptureLogs() {
+ childLog = log.New(os.Stderr, "", 0)
+ statLog = log.New(os.Stderr, "crunchstat: ", 0)
+}
commit 61508b804d2ab9fc36308bb74c00e5bd58e09314
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Mar 2 14:42:35 2015 -0500
5043: Accept long stderr lines from crunch tasks.
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index e35e98a..4e7e53d 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -35,19 +35,25 @@ type Cgroup struct {
}
func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
- defer in.Close()
-
- // TODO(twp): handle long input records gracefully, if possible
- // without killing the child task (#4889)
- //
- s := bufio.NewScanner(in)
- for s.Scan() {
- out <- s.Text()
- }
- if s.Err() != nil {
- out <- fmt.Sprintf("crunchstat: line buffering error: %s", s.Err())
+ reader := bufio.NewReader(in)
+ for {
+ line, err := reader.ReadBytes('\n')
+ if len(line) > 0 {
+ if err == nil {
+ // err == nil IFF line ends in \n
+ line = line[:len(line)-1]
+ }
+ out <- string(line)
+ }
+ if err != nil {
+ if err != io.EOF {
+ out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
+ }
+ break
+ }
}
done <- true
+ in.Close()
}
func CopyChanToPipe(in <-chan string, out io.Writer) {
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index e3c3a59..91fe851 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -2,10 +2,13 @@ package main
import (
"bufio"
+ "bytes"
"io"
+ "math/rand"
"os"
"regexp"
"testing"
+ "time"
)
func TestReadAllOrWarnFail(t *testing.T) {
@@ -51,8 +54,8 @@ func TestReadAllOrWarnSuccess(t *testing.T) {
}
}
-// Test that if CopyPipeToChan reads a line longer than
-// bufio.MaxScanTokenSize, it emits an error to the output channel.
+// Test that CopyPipeToChan works even on lines longer than
+// bufio.MaxScanTokenSize.
func TestCopyPipeToChanLongLines(t *testing.T) {
logChan := make(chan string)
control := make(chan bool)
@@ -60,20 +63,33 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
pipeIn, pipeOut := io.Pipe()
go CopyPipeToChan(pipeIn, logChan, control)
+ sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
go func() {
- long_line := make([]byte, bufio.MaxScanTokenSize+1)
- for i := range long_line {
- long_line[i] = byte('x')
+ for i := range sentBytes {
+ // Some bytes that aren't newlines:
+ sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
}
- pipeOut.Write(long_line)
+ pipeOut.Write([]byte("before\n"))
+ pipeOut.Write(sentBytes)
+ pipeOut.Write([]byte("\nafter\n"))
+ pipeOut.Close()
}()
- // Expect error message from logChan.
-
- errmsg := <-logChan
- if matched, err := regexp.MatchString("^crunchstat: line buffering error:.*token too long", errmsg); err != nil || !matched {
- t.Fatalf("expected CopyPipeToChan error, got %s", errmsg)
+ if before := <-logChan; before != "before" {
+ t.Fatalf("\"before\" not received (got \"%s\")", before)
+ }
+ receivedString := <-logChan
+ receivedBytes := []byte(receivedString)
+ if bytes.Compare(receivedBytes, sentBytes) != 0 {
+ t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+ }
+ if after := <-logChan; after != "after" {
+ t.Fatal("\"after\" not received")
+ }
+ select {
+ case <-time.After(time.Second):
+ t.Fatal("Timeout")
+ case <-control:
+ // Done.
}
-
- <-control
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list