[ARVADOS] created: 46558e7be9da2099ccb12497230ad81ed1d35889

git at public.curoverse.com git at public.curoverse.com
Mon Mar 2 21:35:07 EST 2015


        at  46558e7be9da2099ccb12497230ad81ed1d35889 (commit)


commit 46558e7be9da2099ccb12497230ad81ed1d35889
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Mar 2 21:35:35 2015 -0500

    5043: Use Go's log package to serialize writes. Lose logChan.

diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 4e7e53d..356eb01 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -34,7 +34,10 @@ type Cgroup struct {
 	cid    string
 }
 
-func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
+var childLog = log.New(os.Stderr, "", 0)
+var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+
+func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
 	reader := bufio.NewReader(in)
 	for {
 		line, err := reader.ReadBytes('\n')
@@ -43,38 +46,22 @@ func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
 				// err == nil IFF line ends in \n
 				line = line[:len(line)-1]
 			}
-			out <- string(line)
+			childLog.Println(string(line))
 		}
-		if err != nil {
-			if err != io.EOF {
-				out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
-			}
+		if err == io.EOF {
 			break
+		} else if err != nil {
+			statLog.Fatalln("line buffering error:", err)
 		}
 	}
 	done <- true
 	in.Close()
 }
 
-func CopyChanToPipe(in <-chan string, out io.Writer) {
-	for s := range in {
-		fmt.Fprintln(out, s)
-	}
-}
-
-var logChan chan string
-
-func LogPrintf(format string, args ...interface{}) {
-	if logChan == nil {
-		return
-	}
-	logChan <- fmt.Sprintf("crunchstat: "+format, args...)
-}
-
 func ReadAllOrWarn(in *os.File) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
-		LogPrintf("read %s: %s", in.Name(), err)
+		statLog.Printf("read %s: %s\n", in.Name(), err)
 	}
 	return content, err
 }
@@ -116,9 +103,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
 		// [b] after all contained processes have exited.
 		reportedStatFile[stat] = path
 		if path == "" {
-			LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+			statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
 		} else {
-			LogPrintf("reading stats from %s", path)
+			statLog.Printf("reading stats from %s\n", path)
 		}
 	}
 	return file, err
@@ -136,7 +123,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
 		statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
 		stats, err := ioutil.ReadFile(statsFilename)
 		if err != nil {
-			LogPrintf("read %s: %s", statsFilename, err)
+			statLog.Printf("read %s: %s\n", statsFilename, err)
 			continue
 		}
 		return strings.NewReader(string(stats)), nil
@@ -189,7 +176,7 @@ func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
 				sample.txBytes-prev.txBytes,
 				sample.rxBytes-prev.rxBytes)
 		}
-		LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
+		statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
 		lastSample[dev] = sample
 	}
 }
@@ -222,7 +209,7 @@ func DoMemoryStats(cgroup Cgroup) {
 			outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
 		}
 	}
-	LogPrintf("mem%s", outstat.String())
+	statLog.Printf("mem%s\n", outstat.String())
 }
 
 func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
@@ -264,7 +251,7 @@ func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
 				tx-prev.txBytes,
 				rx-prev.rxBytes)
 		}
-		LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
+		statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
 		lastSample[ifName] = nextSample
 	}
 }
@@ -325,7 +312,7 @@ func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
 			nextSample.user-lastSample.user,
 			nextSample.sys-lastSample.sys)
 	}
-	LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
+	statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
 		nextSample.user, nextSample.sys, nextSample.cpus, delta)
 	*lastSample = nextSample
 }
@@ -377,23 +364,19 @@ func run(logger *log.Logger) error {
 	flag.Parse()
 
 	if cgroup_root == "" {
-		logger.Fatal("Must provide -cgroup-root")
+		statLog.Fatalln("Must provide -cgroup-root")
 	}
 
-	logChan = make(chan string, 1)
-	defer close(logChan)
 	finish_chan := make(chan bool)
 	defer close(finish_chan)
 
-	go CopyChanToPipe(logChan, os.Stderr)
-
 	var cmd *exec.Cmd
 
 	if len(flag.Args()) > 0 {
 		// Set up subprocess
 		cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
 
-		logger.Print("Running ", flag.Args())
+		statLog.Println("Running ", flag.Args())
 
 		// Child process will use our stdin and stdout pipes
 		// (we close our copies below)
@@ -401,27 +384,27 @@ func run(logger *log.Logger) error {
 		cmd.Stdout = os.Stdout
 
 		// Forward SIGINT and SIGTERM to inner process
-		term := make(chan os.Signal, 1)
+		sigChan := make(chan os.Signal, 1)
 		go func(sig <-chan os.Signal) {
 			catch := <-sig
 			if cmd.Process != nil {
 				cmd.Process.Signal(catch)
 			}
-			logger.Print("caught signal: ", catch)
-		}(term)
-		signal.Notify(term, syscall.SIGTERM)
-		signal.Notify(term, syscall.SIGINT)
+			statLog.Println("caught signal:", catch)
+		}(sigChan)
+		signal.Notify(sigChan, syscall.SIGTERM)
+		signal.Notify(sigChan, syscall.SIGINT)
 
 		// Funnel stderr through our channel
 		stderr_pipe, err := cmd.StderrPipe()
 		if err != nil {
-			logger.Fatal(err)
+			statLog.Fatalln("stderr:", err)
 		}
-		go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
+		go CopyPipeToChildLog(stderr_pipe, finish_chan)
 
 		// Run subprocess
 		if err := cmd.Start(); err != nil {
-			logger.Fatal(err)
+			statLog.Fatalln("cmd.Start:", err)
 		}
 
 		// Close stdin/stdout in this (parent) process
@@ -445,7 +428,7 @@ func run(logger *log.Logger) error {
 			time.Sleep(100 * time.Millisecond)
 		}
 		if !ok {
-			logger.Printf("Could not read cid file %s", cgroup_cidfile)
+			statLog.Println("Could not read cid file:", cgroup_cidfile)
 		}
 	}
 
@@ -478,7 +461,7 @@ func main() {
 				os.Exit(status.ExitStatus())
 			}
 		} else {
-			logger.Fatalf("cmd.Wait: %v", err)
+			statLog.Fatalln("cmd.Wait:", err)
 		}
 	}
 }
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index 91fe851..fe922e9 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -4,6 +4,7 @@ import (
 	"bufio"
 	"bytes"
 	"io"
+	"log"
 	"math/rand"
 	"os"
 	"regexp"
@@ -12,9 +13,9 @@ import (
 )
 
 func TestReadAllOrWarnFail(t *testing.T) {
-	logChan = make(chan string)
+	rcv := captureLogs()
+	defer uncaptureLogs()
 	go func() {
-		defer close(logChan)
 		// The special file /proc/self/mem can be opened for
 		// reading, but reading from byte 0 returns an error.
 		f, err := os.Open("/proc/self/mem")
@@ -25,67 +26,69 @@ func TestReadAllOrWarnFail(t *testing.T) {
 			t.Fatalf("Expected error, got %v", x)
 		}
 	}()
-	if _, ok := <-logChan; !ok {
-		t.Fatalf("Expected error message about nonexistent file")
-	}
-	if msg, ok := <-logChan; ok {
-		t.Fatalf("Expected channel to close, got %s", msg)
+	if msg, err := rcv.ReadBytes('\n'); err != nil {
+		t.Fatal(err)
+	} else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
+		t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
 	}
 }
 
 func TestReadAllOrWarnSuccess(t *testing.T) {
-	logChan = make(chan string)
-	go func() {
-		defer close(logChan)
-		f, err := os.Open("./crunchstat_test.go")
-		if err != nil {
-			t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-		}
-		data, err := ReadAllOrWarn(f)
-		if err != nil {
-			t.Fatalf("got error %s", err)
-		}
-		if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
-			t.Fatalf("data failed regexp: %s", err)
-		}
-	}()
-	if msg, ok := <-logChan; ok {
-		t.Fatalf("Expected channel to close, got %s", msg)
+	f, err := os.Open("./crunchstat_test.go")
+	if err != nil {
+		t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+	}
+	data, err := ReadAllOrWarn(f)
+	if err != nil {
+		t.Fatalf("got error %s", err)
+	}
+	if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
+		t.Fatalf("data failed regexp: %s", err)
 	}
 }
 
-// Test that CopyPipeToChan works even on lines longer than
+// Test that CopyPipeToChildLog works even on lines longer than
 // bufio.MaxScanTokenSize.
-func TestCopyPipeToChanLongLines(t *testing.T) {
-	logChan := make(chan string)
-	control := make(chan bool)
+func TestCopyPipeToChildLogLongLines(t *testing.T) {
+	rcv := captureLogs()
+	defer uncaptureLogs()
 
+	control := make(chan bool)
 	pipeIn, pipeOut := io.Pipe()
-	go CopyPipeToChan(pipeIn, logChan, control)
+	go CopyPipeToChildLog(pipeIn, control)
 
 	sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
 	go func() {
+		pipeOut.Write([]byte("before\n"))
+
 		for i := range sentBytes {
 			// Some bytes that aren't newlines:
 			sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
 		}
-		pipeOut.Write([]byte("before\n"))
+		sentBytes[len(sentBytes)-1] = '\n'
 		pipeOut.Write(sentBytes)
-		pipeOut.Write([]byte("\nafter\n"))
+
+		pipeOut.Write([]byte("after"))
 		pipeOut.Close()
 	}()
 
-	if before := <-logChan; before != "before" {
-		t.Fatalf("\"before\" not received (got \"%s\")", before)
+	if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+		t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
+	}
+	
+	receivedString, err := rcv.ReadBytes('\n')
+	if err != nil {
+		t.Fatal(err)
 	}
-	receivedString := <-logChan
 	receivedBytes := []byte(receivedString)
 	if bytes.Compare(receivedBytes, sentBytes) != 0 {
-		t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+		t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
 	}
-	if after := <-logChan; after != "after" {
-		t.Fatal("\"after\" not received")
+
+	if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+		t.Fatal("\"after\n\" not received (got \"%s\", %s)", after, err)
 	}
+
 	select {
 	case <-time.After(time.Second):
 		t.Fatal("Timeout")
@@ -93,3 +96,16 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
 		// Done.
 	}
 }
+
+func captureLogs() (*bufio.Reader) {
+	// Send childLog to our bufio reader instead of stderr
+	stderrIn, stderrOut := io.Pipe()
+	childLog = log.New(stderrOut, "", 0)
+	statLog = log.New(stderrOut, "crunchstat: ", 0)
+	return bufio.NewReader(stderrIn)
+}
+
+func uncaptureLogs() {
+	childLog = log.New(os.Stderr, "", 0)
+	statLog = log.New(os.Stderr, "crunchstat: ", 0)
+}

commit 61508b804d2ab9fc36308bb74c00e5bd58e09314
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Mar 2 14:42:35 2015 -0500

    5043: Accept long stderr lines from crunch tasks.

diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index e35e98a..4e7e53d 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -35,19 +35,25 @@ type Cgroup struct {
 }
 
 func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
-	defer in.Close()
-
-	// TODO(twp): handle long input records gracefully, if possible
-	// without killing the child task (#4889)
-	//
-	s := bufio.NewScanner(in)
-	for s.Scan() {
-		out <- s.Text()
-	}
-	if s.Err() != nil {
-		out <- fmt.Sprintf("crunchstat: line buffering error: %s", s.Err())
+	reader := bufio.NewReader(in)
+	for {
+		line, err := reader.ReadBytes('\n')
+		if len(line) > 0 {
+			if err == nil {
+				// err == nil IFF line ends in \n
+				line = line[:len(line)-1]
+			}
+			out <- string(line)
+		}
+		if err != nil {
+			if err != io.EOF {
+				out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
+			}
+			break
+		}
 	}
 	done <- true
+	in.Close()
 }
 
 func CopyChanToPipe(in <-chan string, out io.Writer) {
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index e3c3a59..91fe851 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -2,10 +2,13 @@ package main
 
 import (
 	"bufio"
+	"bytes"
 	"io"
+	"math/rand"
 	"os"
 	"regexp"
 	"testing"
+	"time"
 )
 
 func TestReadAllOrWarnFail(t *testing.T) {
@@ -51,8 +54,8 @@ func TestReadAllOrWarnSuccess(t *testing.T) {
 	}
 }
 
-// Test that if CopyPipeToChan reads a line longer than
-// bufio.MaxScanTokenSize, it emits an error to the output channel.
+// Test that CopyPipeToChan works even on lines longer than
+// bufio.MaxScanTokenSize.
 func TestCopyPipeToChanLongLines(t *testing.T) {
 	logChan := make(chan string)
 	control := make(chan bool)
@@ -60,20 +63,33 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
 	pipeIn, pipeOut := io.Pipe()
 	go CopyPipeToChan(pipeIn, logChan, control)
 
+	sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
 	go func() {
-		long_line := make([]byte, bufio.MaxScanTokenSize+1)
-		for i := range long_line {
-			long_line[i] = byte('x')
+		for i := range sentBytes {
+			// Some bytes that aren't newlines:
+			sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
 		}
-		pipeOut.Write(long_line)
+		pipeOut.Write([]byte("before\n"))
+		pipeOut.Write(sentBytes)
+		pipeOut.Write([]byte("\nafter\n"))
+		pipeOut.Close()
 	}()
 
-	// Expect error message from logChan.
-
-	errmsg := <-logChan
-	if matched, err := regexp.MatchString("^crunchstat: line buffering error:.*token too long", errmsg); err != nil || !matched {
-		t.Fatalf("expected CopyPipeToChan error, got %s", errmsg)
+	if before := <-logChan; before != "before" {
+		t.Fatalf("\"before\" not received (got \"%s\")", before)
+	}
+	receivedString := <-logChan
+	receivedBytes := []byte(receivedString)
+	if bytes.Compare(receivedBytes, sentBytes) != 0 {
+		t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+	}
+	if after := <-logChan; after != "after" {
+		t.Fatal("\"after\" not received")
+	}
+	select {
+	case <-time.After(time.Second):
+		t.Fatal("Timeout")
+	case <-control:
+		// Done.
 	}
-
-	<-control
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list