[ARVADOS] created: 6d909db25783bd19a152f25e47262de70cd8719f

Git user git at public.curoverse.com
Thu Jun 30 09:12:44 EDT 2016


        at  6d909db25783bd19a152f25e47262de70cd8719f (commit)


commit 6d909db25783bd19a152f25e47262de70cd8719f
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Jun 30 09:11:49 2016 -0400

    8016: Reduce logging noise by waiting for cgroup files to appear before polling.

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 8d7621c..3ba3ce6 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -70,7 +70,7 @@ func (r *Reporter) Stop() {
 	close(r.done)
 }
 
-func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
+func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
 		r.Logger.Print(err)
@@ -79,14 +79,20 @@ func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
 }
 
 // Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
+// cgroup, and return an io.ReadCloser. If no stats file is available,
 // return nil.
 //
+// Log the file that was opened, if it isn't the same file opened on
+// the last openStatFile for this stat.
+//
+// Log "not available" if no file is found and either this stat has
+// been available in the past, or verbose==true.
+//
 // TODO: Instead of trying all options, choose a process in the
 // container, and read /proc/PID/cgroup to determine the appropriate
 // cgroup root for the given statgroup. (This will avoid falling back
 // to host-level stats during container setup and teardown.)
-func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error) {
+func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
 	var paths []string
 	if r.CID != "" {
 		// Collect container's stats
@@ -112,16 +118,16 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
 			path = ""
 		}
 	}
-	if pathWas, ok := r.reportedStatFile[stat]; !ok || pathWas != path {
+	if pathWas := r.reportedStatFile[stat]; pathWas != path {
 		// Log whenever we start using a new/different cgroup
 		// stat file for a given statistic. This typically
 		// happens 1 to 3 times per statistic, depending on
 		// whether we happen to collect stats [a] before any
 		// processes have been created in the container and
 		// [b] after all contained processes have exited.
-		if path == "" {
+		if path == "" && verbose {
 			r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
-		} else if ok {
+		} else if pathWas != "" {
 			r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
 		} else {
 			r.Logger.Printf("notice: reading stats from %s\n", path)
@@ -132,7 +138,7 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
 }
 
 func (r *Reporter) getContainerNetStats() (io.Reader, error) {
-	procsFile, err := r.openStatFile("cpuacct", "cgroup.procs")
+	procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
 	if err != nil {
 		return nil, err
 	}
@@ -158,7 +164,7 @@ type IoSample struct {
 }
 
 func (r *Reporter) DoBlkIoStats() {
-	c, err := r.openStatFile("blkio", "blkio.io_service_bytes")
+	c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
 	if err != nil {
 		return
 	}
@@ -207,7 +213,7 @@ type MemSample struct {
 }
 
 func (r *Reporter) DoMemoryStats() {
-	c, err := r.openStatFile("memory", "memory.stat")
+	c, err := r.openStatFile("memory", "memory.stat", true)
 	if err != nil {
 		return
 	}
@@ -287,7 +293,7 @@ type CpuSample struct {
 // Return the number of CPUs available in the container. Return 0 if
 // we can't figure out the real number of CPUs.
 func (r *Reporter) GetCpuCount() int64 {
-	cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus")
+	cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
 	if err != nil {
 		return 0
 	}
@@ -308,7 +314,7 @@ func (r *Reporter) GetCpuCount() int64 {
 }
 
 func (r *Reporter) DoCpuStats() {
-	statFile, err := r.openStatFile("cpuacct", "cpuacct.stat")
+	statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
 	if err != nil {
 		return
 	}
@@ -337,13 +343,15 @@ func (r *Reporter) DoCpuStats() {
 	r.lastCPUSample = nextSample
 }
 
-// Report stats periodically until someone closes or sends to r.done.
+// Report stats periodically until r.done indicates someone called
+// Stop.
 func (r *Reporter) run() {
-	if !r.waitForCIDFile() {
+	r.reportedStatFile = make(map[string]string)
+
+	if !r.waitForCIDFile() || !r.waitForCgroup() {
 		return
 	}
 
-	r.reportedStatFile = make(map[string]string)
 	r.lastNetSample = make(map[string]IoSample)
 	r.lastDiskSample = make(map[string]IoSample)
 
@@ -362,7 +370,7 @@ func (r *Reporter) run() {
 }
 
 // If CID is empty, wait for it to appear in CIDFile. Return true if
-// we get it before someone calls Stop().
+// we get it before r.done indicates someone called Stop.
 func (r *Reporter) waitForCIDFile() bool {
 	if r.CID != "" {
 		return true
@@ -384,3 +392,28 @@ func (r *Reporter) waitForCIDFile() bool {
 		}
 	}
 }
+
+// Wait for the cgroup stats files to appear in cgroup_root. Return
+// true if they appear before r.done indicates someone called Stop. If
+// they don't appear within one poll interval, log a warning and keep
+// waiting.
+func (r *Reporter) waitForCgroup() bool {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	warningTimer := time.After(r.Poll)
+	for {
+		c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
+		if err == nil {
+			c.Close()
+			return true
+		}
+		select {
+		case <-ticker.C:
+		case <-warningTimer:
+			r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.Poll)
+		case <-r.done:
+			r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+			return false
+		}
+	}
+}

commit 096c81d3b4d364705b8ef7489b634bcee05d8b1d
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jun 29 20:13:09 2016 -0400

    8016: Split crunchstat into a module and a command line tool.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 30a80f5..2e8641a 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -714,6 +714,7 @@ gostuff=(
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
+    lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
     services/keep-web
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
new file mode 100644
index 0000000..8d7621c
--- /dev/null
+++ b/lib/crunchstat/crunchstat.go
@@ -0,0 +1,386 @@
+package crunchstat
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// This magically allows us to look up user_hz via _SC_CLK_TCK:
+
+/*
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// A Reporter gathers statistics for a cgroup and writes them to a
+// log.Logger.
+type Reporter struct {
+	// CID of the container to monitor. If empty, read the CID
+	// from CIDFile.
+	CID string
+	// Where cgroup special files live on this system
+	CgroupRoot   string
+	CgroupParent string
+	// Path to a file we can read CID from. If CIDFile is empty or
+	// nonexistent, wait for it to appear.
+	CIDFile string
+
+	// Interval between samples
+	Poll time.Duration
+
+	// Where to write statistics.
+	Logger *log.Logger
+
+	reportedStatFile map[string]string
+	lastNetSample    map[string]IoSample
+	lastDiskSample   map[string]IoSample
+	lastCPUSample    CpuSample
+
+	done chan struct{}
+}
+
+// Wait (if necessary) for the CID to appear in CIDFile, then start
+// reporting statistics.
+//
+// Start should not be called more than once on a Reporter.
+//
+// Public data fields should not be changed after calling Start.
+func (r *Reporter) Start() {
+	r.done = make(chan struct{})
+	go r.run()
+}
+
+// Stop reporting statistics. Do not call more than once, or before
+// calling Start.
+//
+// Nothing will be logged after Stop returns.
+func (r *Reporter) Stop() {
+	close(r.done)
+}
+
+func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
+	content, err := ioutil.ReadAll(in)
+	if err != nil {
+		r.Logger.Print(err)
+	}
+	return content, err
+}
+
+// Open the cgroup stats file in /sys/fs corresponding to the target
+// cgroup, and return an *os.File. If no stats file is available,
+// return nil.
+//
+// TODO: Instead of trying all options, choose a process in the
+// container, and read /proc/PID/cgroup to determine the appropriate
+// cgroup root for the given statgroup. (This will avoid falling back
+// to host-level stats during container setup and teardown.)
+func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error) {
+	var paths []string
+	if r.CID != "" {
+		// Collect container's stats
+		paths = []string{
+			fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
+			fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
+		}
+	} else {
+		// Collect this host's stats
+		paths = []string{
+			fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
+			fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
+		}
+	}
+	var path string
+	var file *os.File
+	var err error
+	for _, path = range paths {
+		file, err = os.Open(path)
+		if err == nil {
+			break
+		} else {
+			path = ""
+		}
+	}
+	if pathWas, ok := r.reportedStatFile[stat]; !ok || pathWas != path {
+		// Log whenever we start using a new/different cgroup
+		// stat file for a given statistic. This typically
+		// happens 1 to 3 times per statistic, depending on
+		// whether we happen to collect stats [a] before any
+		// processes have been created in the container and
+		// [b] after all contained processes have exited.
+		if path == "" {
+			r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
+		} else if ok {
+			r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
+		} else {
+			r.Logger.Printf("notice: reading stats from %s\n", path)
+		}
+		r.reportedStatFile[stat] = path
+	}
+	return file, err
+}
+
+func (r *Reporter) getContainerNetStats() (io.Reader, error) {
+	procsFile, err := r.openStatFile("cpuacct", "cgroup.procs")
+	if err != nil {
+		return nil, err
+	}
+	defer procsFile.Close()
+	reader := bufio.NewScanner(procsFile)
+	for reader.Scan() {
+		taskPid := reader.Text()
+		statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
+		stats, err := ioutil.ReadFile(statsFilename)
+		if err != nil {
+			r.Logger.Print(err)
+			continue
+		}
+		return strings.NewReader(string(stats)), nil
+	}
+	return nil, errors.New("Could not read stats for any proc in container")
+}
+
+type IoSample struct {
+	sampleTime time.Time
+	txBytes    int64
+	rxBytes    int64
+}
+
+func (r *Reporter) DoBlkIoStats() {
+	c, err := r.openStatFile("blkio", "blkio.io_service_bytes")
+	if err != nil {
+		return
+	}
+	defer c.Close()
+	b := bufio.NewScanner(c)
+	var sampleTime = time.Now()
+	newSamples := make(map[string]IoSample)
+	for b.Scan() {
+		var device, op string
+		var val int64
+		if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
+			continue
+		}
+		var thisSample IoSample
+		var ok bool
+		if thisSample, ok = newSamples[device]; !ok {
+			thisSample = IoSample{sampleTime, -1, -1}
+		}
+		switch op {
+		case "Read":
+			thisSample.rxBytes = val
+		case "Write":
+			thisSample.txBytes = val
+		}
+		newSamples[device] = thisSample
+	}
+	for dev, sample := range newSamples {
+		if sample.txBytes < 0 || sample.rxBytes < 0 {
+			continue
+		}
+		delta := ""
+		if prev, ok := r.lastDiskSample[dev]; ok {
+			delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
+				sample.sampleTime.Sub(prev.sampleTime).Seconds(),
+				sample.txBytes-prev.txBytes,
+				sample.rxBytes-prev.rxBytes)
+		}
+		r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
+		r.lastDiskSample[dev] = sample
+	}
+}
+
+type MemSample struct {
+	sampleTime time.Time
+	memStat    map[string]int64
+}
+
+func (r *Reporter) DoMemoryStats() {
+	c, err := r.openStatFile("memory", "memory.stat")
+	if err != nil {
+		return
+	}
+	defer c.Close()
+	b := bufio.NewScanner(c)
+	thisSample := MemSample{time.Now(), make(map[string]int64)}
+	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
+	for b.Scan() {
+		var stat string
+		var val int64
+		if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
+			continue
+		}
+		thisSample.memStat[stat] = val
+	}
+	var outstat bytes.Buffer
+	for _, key := range wantStats {
+		if val, ok := thisSample.memStat[key]; ok {
+			outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+		}
+	}
+	r.Logger.Printf("mem%s\n", outstat.String())
+}
+
+func (r *Reporter) DoNetworkStats() {
+	sampleTime := time.Now()
+	stats, err := r.getContainerNetStats()
+	if err != nil {
+		return
+	}
+
+	scanner := bufio.NewScanner(stats)
+	for scanner.Scan() {
+		var ifName string
+		var rx, tx int64
+		words := strings.Fields(scanner.Text())
+		if len(words) != 17 {
+			// Skip lines with wrong format
+			continue
+		}
+		ifName = strings.TrimRight(words[0], ":")
+		if ifName == "lo" || ifName == "" {
+			// Skip loopback interface and lines with wrong format
+			continue
+		}
+		if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
+			continue
+		}
+		if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
+			continue
+		}
+		nextSample := IoSample{}
+		nextSample.sampleTime = sampleTime
+		nextSample.txBytes = tx
+		nextSample.rxBytes = rx
+		var delta string
+		if prev, ok := r.lastNetSample[ifName]; ok {
+			interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+			delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
+				interval,
+				tx-prev.txBytes,
+				rx-prev.rxBytes)
+		}
+		r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
+		r.lastNetSample[ifName] = nextSample
+	}
+}
+
+type CpuSample struct {
+	hasData    bool // to distinguish the zero value from real data
+	sampleTime time.Time
+	user       float64
+	sys        float64
+	cpus       int64
+}
+
+// Return the number of CPUs available in the container. Return 0 if
+// we can't figure out the real number of CPUs.
+func (r *Reporter) GetCpuCount() int64 {
+	cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus")
+	if err != nil {
+		return 0
+	}
+	defer cpusetFile.Close()
+	b, err := r.readAllOrWarn(cpusetFile)
+	sp := strings.Split(string(b), ",")
+	cpus := int64(0)
+	for _, v := range sp {
+		var min, max int64
+		n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+		if n == 2 {
+			cpus += (max - min) + 1
+		} else {
+			cpus += 1
+		}
+	}
+	return cpus
+}
+
+func (r *Reporter) DoCpuStats() {
+	statFile, err := r.openStatFile("cpuacct", "cpuacct.stat")
+	if err != nil {
+		return
+	}
+	defer statFile.Close()
+	b, err := r.readAllOrWarn(statFile)
+	if err != nil {
+		return
+	}
+
+	nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()}
+	var userTicks, sysTicks int64
+	fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
+	user_hz := float64(C.sysconf(C._SC_CLK_TCK))
+	nextSample.user = float64(userTicks) / user_hz
+	nextSample.sys = float64(sysTicks) / user_hz
+
+	delta := ""
+	if r.lastCPUSample.hasData {
+		delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
+			nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
+			nextSample.user-r.lastCPUSample.user,
+			nextSample.sys-r.lastCPUSample.sys)
+	}
+	r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
+		nextSample.user, nextSample.sys, nextSample.cpus, delta)
+	r.lastCPUSample = nextSample
+}
+
+// Report stats periodically until someone closes or sends to r.done.
+func (r *Reporter) run() {
+	if !r.waitForCIDFile() {
+		return
+	}
+
+	r.reportedStatFile = make(map[string]string)
+	r.lastNetSample = make(map[string]IoSample)
+	r.lastDiskSample = make(map[string]IoSample)
+
+	ticker := time.NewTicker(r.Poll)
+	for {
+		r.DoMemoryStats()
+		r.DoCpuStats()
+		r.DoBlkIoStats()
+		r.DoNetworkStats()
+		select {
+		case <-r.done:
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// If CID is empty, wait for it to appear in CIDFile. Return true if
+// we get it before someone calls Stop().
+func (r *Reporter) waitForCIDFile() bool {
+	if r.CID != "" {
+		return true
+	}
+
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		cid, err := ioutil.ReadFile(r.CIDFile)
+		if err == nil && len(cid) > 0 {
+			r.CID = string(cid)
+			return true
+		}
+		select {
+		case <-ticker.C:
+		case <-r.done:
+			r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+			return false
+		}
+	}
+}
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
new file mode 100644
index 0000000..864a3e3
--- /dev/null
+++ b/lib/crunchstat/crunchstat_test.go
@@ -0,0 +1,59 @@
+package crunchstat
+
+import (
+	"bufio"
+	"io"
+	"log"
+	"os"
+	"regexp"
+	"testing"
+)
+
+func bufLogger() (*log.Logger, *bufio.Reader) {
+	r, w := io.Pipe()
+	logger := log.New(w, "", 0)
+	return logger, bufio.NewReader(r)
+}
+
+func TestReadAllOrWarnFail(t *testing.T) {
+	logger, rcv := bufLogger()
+	rep := Reporter{Logger: logger}
+
+	var msg []byte
+	var err error
+	go func() {
+		msg, err = rcv.ReadBytes('\n')
+	}()
+	{
+		// The special file /proc/self/mem can be opened for
+		// reading, but reading from byte 0 returns an error.
+		f, err := os.Open("/proc/self/mem")
+		if err != nil {
+			t.Fatalf("Opening /proc/self/mem: %s", err)
+		}
+		if x, err := rep.readAllOrWarn(f); err == nil {
+			t.Fatalf("Expected error, got %v", x)
+		}
+	}
+	if err != nil {
+		t.Fatal(err)
+	} else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+		t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+	}
+}
+
+func TestReadAllOrWarnSuccess(t *testing.T) {
+	rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+
+	f, err := os.Open("./crunchstat_test.go")
+	if err != nil {
+		t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+	}
+	data, err := rep.readAllOrWarn(f)
+	if err != nil {
+		t.Fatalf("got error %s", err)
+	}
+	if matched, err := regexp.MatchString("^package crunchstat\n", string(data)); err != nil || !matched {
+		t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+	}
+}
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 6bce325..8c05069 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -2,485 +2,122 @@ package main
 
 import (
 	"bufio"
-	"bytes"
-	"errors"
 	"flag"
-	"fmt"
 	"io"
-	"io/ioutil"
 	"log"
 	"os"
 	"os/exec"
 	"os/signal"
-	"strconv"
-	"strings"
 	"syscall"
 	"time"
-)
 
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
+	"git.curoverse.com/arvados.git/lib/crunchstat"
+)
 
-// The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
+const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
 
-type Cgroup struct {
-	root   string
-	parent string
-	cid    string
-}
+func main() {
+	reporter := crunchstat.Reporter{
+		Logger: log.New(os.Stderr, "crunchstat: ", 0),
+	}
 
-var childLog = log.New(os.Stderr, "", 0)
-var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+	flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
+	flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
+	flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
+	pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
 
-const (
-	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+	flag.Parse()
 
-func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
-	reader := bufio.NewReaderSize(in, MaxLogLine)
-	var prefix string
-	for {
-		line, isPrefix, err := reader.ReadLine()
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			statLog.Fatal("error reading child stderr:", err)
-		}
-		var suffix string
-		if isPrefix {
-			suffix = "[...]"
-		}
-		childLog.Print(prefix, string(line), suffix)
-		// Set up prefix for following line
-		if isPrefix {
-			prefix = "[...]"
-		} else {
-			prefix = ""
-		}
+	if reporter.CgroupRoot == "" {
+		reporter.Logger.Fatal("error: must provide -cgroup-root")
 	}
-	done <- true
-	in.Close()
-}
+	reporter.Poll = time.Duration(*pollMsec) * time.Millisecond
 
-func ReadAllOrWarn(in *os.File) ([]byte, error) {
-	content, err := ioutil.ReadAll(in)
-	if err != nil {
-		statLog.Printf("error reading %s: %s\n", in.Name(), err)
-	}
-	return content, err
-}
+	reporter.Start()
+	err := runCommand(flag.Args(), reporter.Logger)
+	reporter.Stop()
 
-var reportedStatFile = map[string]string{}
+	if err, ok := err.(*exec.ExitError); ok {
+		// The program has exited with an exit code != 0
 
-// Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
-// return nil.
-//
-// TODO: Instead of trying all options, choose a process in the
-// container, and read /proc/PID/cgroup to determine the appropriate
-// cgroup root for the given statgroup. (This will avoid falling back
-// to host-level stats during container setup and teardown.)
-func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
-	var paths []string
-	if cgroup.cid != "" {
-		// Collect container's stats
-		paths = []string{
-			fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
-			fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
-		}
-	} else {
-		// Collect this host's stats
-		paths = []string{
-			fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
-			fmt.Sprintf("%s/%s", cgroup.root, stat),
-		}
-	}
-	var path string
-	var file *os.File
-	var err error
-	for _, path = range paths {
-		file, err = os.Open(path)
-		if err == nil {
-			break
+		// This works on both Unix and Windows. Although
+		// package syscall is generally platform dependent,
+		// WaitStatus is defined for both Unix and Windows and
+		// in both cases has an ExitStatus() method with the
+		// same signature.
+		if status, ok := err.Sys().(syscall.WaitStatus); ok {
+			os.Exit(status.ExitStatus())
 		} else {
-			path = ""
+			reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
 		}
+	} else if err != nil {
+		reporter.Logger.Fatalln("error in cmd.Wait:", err)
 	}
-	if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
-		// Log whenever we start using a new/different cgroup
-		// stat file for a given statistic. This typically
-		// happens 1 to 3 times per statistic, depending on
-		// whether we happen to collect stats [a] before any
-		// processes have been created in the container and
-		// [b] after all contained processes have exited.
-		if path == "" {
-			statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
-		} else if ok {
-			statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path)
-		} else {
-			statLog.Printf("notice: reading stats from %s\n", path)
-		}
-		reportedStatFile[stat] = path
-	}
-	return file, err
 }
 
-func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
-	procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
-	if err != nil {
-		return nil, err
-	}
-	defer procsFile.Close()
-	reader := bufio.NewScanner(procsFile)
-	for reader.Scan() {
-		taskPid := reader.Text()
-		statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
-		stats, err := ioutil.ReadFile(statsFilename)
-		if err != nil {
-			statLog.Printf("error reading %s: %s\n", statsFilename, err)
-			continue
-		}
-		return strings.NewReader(string(stats)), nil
-	}
-	return nil, errors.New("Could not read stats for any proc in container")
-}
+func runCommand(argv []string, logger *log.Logger) error {
+	cmd := exec.Command(argv[0], argv[1:]...)
 
-type IoSample struct {
-	sampleTime time.Time
-	txBytes    int64
-	rxBytes    int64
-}
+	logger.Println("Running", argv)
 
-func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
-	c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
-	if err != nil {
-		return
-	}
-	defer c.Close()
-	b := bufio.NewScanner(c)
-	var sampleTime = time.Now()
-	newSamples := make(map[string]IoSample)
-	for b.Scan() {
-		var device, op string
-		var val int64
-		if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
-			continue
-		}
-		var thisSample IoSample
-		var ok bool
-		if thisSample, ok = newSamples[device]; !ok {
-			thisSample = IoSample{sampleTime, -1, -1}
-		}
-		switch op {
-		case "Read":
-			thisSample.rxBytes = val
-		case "Write":
-			thisSample.txBytes = val
-		}
-		newSamples[device] = thisSample
-	}
-	for dev, sample := range newSamples {
-		if sample.txBytes < 0 || sample.rxBytes < 0 {
-			continue
-		}
-		delta := ""
-		if prev, ok := lastSample[dev]; ok {
-			delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
-				sample.sampleTime.Sub(prev.sampleTime).Seconds(),
-				sample.txBytes-prev.txBytes,
-				sample.rxBytes-prev.rxBytes)
-		}
-		statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
-		lastSample[dev] = sample
-	}
-}
-
-type MemSample struct {
-	sampleTime time.Time
-	memStat    map[string]int64
-}
+	// Child process will use our stdin and stdout pipes
+	// (we close our copies below)
+	cmd.Stdin = os.Stdin
+	cmd.Stdout = os.Stdout
 
-func DoMemoryStats(cgroup Cgroup) {
-	c, err := OpenStatFile(cgroup, "memory", "memory.stat")
-	if err != nil {
-		return
-	}
-	defer c.Close()
-	b := bufio.NewScanner(c)
-	thisSample := MemSample{time.Now(), make(map[string]int64)}
-	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
-	for b.Scan() {
-		var stat string
-		var val int64
-		if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
-			continue
-		}
-		thisSample.memStat[stat] = val
-	}
-	var outstat bytes.Buffer
-	for _, key := range wantStats {
-		if val, ok := thisSample.memStat[key]; ok {
-			outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+	// Forward SIGINT and SIGTERM to child process
+	sigChan := make(chan os.Signal, 1)
+	go func(sig <-chan os.Signal) {
+		catch := <-sig
+		if cmd.Process != nil {
+			cmd.Process.Signal(catch)
 		}
-	}
-	statLog.Printf("mem%s\n", outstat.String())
-}
+		logger.Println("notice: caught signal:", catch)
+	}(sigChan)
+	signal.Notify(sigChan, syscall.SIGTERM)
+	signal.Notify(sigChan, syscall.SIGINT)
 
-func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
-	sampleTime := time.Now()
-	stats, err := GetContainerNetStats(cgroup)
+	// Funnel stderr through our channel
+	stderr_pipe, err := cmd.StderrPipe()
 	if err != nil {
-		return
+		logger.Fatalln("error in StderrPipe:", err)
 	}
 
-	scanner := bufio.NewScanner(stats)
-	for scanner.Scan() {
-		var ifName string
-		var rx, tx int64
-		words := strings.Fields(scanner.Text())
-		if len(words) != 17 {
-			// Skip lines with wrong format
-			continue
-		}
-		ifName = strings.TrimRight(words[0], ":")
-		if ifName == "lo" || ifName == "" {
-			// Skip loopback interface and lines with wrong format
-			continue
-		}
-		if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
-			continue
-		}
-		if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
-			continue
-		}
-		nextSample := IoSample{}
-		nextSample.sampleTime = sampleTime
-		nextSample.txBytes = tx
-		nextSample.rxBytes = rx
-		var delta string
-		if prev, ok := lastSample[ifName]; ok {
-			interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
-			delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
-				interval,
-				tx-prev.txBytes,
-				rx-prev.rxBytes)
-		}
-		statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
-		lastSample[ifName] = nextSample
+	// Run subprocess
+	if err := cmd.Start(); err != nil {
+		logger.Fatalln("error in cmd.Start:", err)
 	}
-}
 
-type CpuSample struct {
-	hasData    bool // to distinguish the zero value from real data
-	sampleTime time.Time
-	user       float64
-	sys        float64
-	cpus       int64
-}
+	// Close stdin/stdout in this (parent) process
+	os.Stdin.Close()
+	os.Stdout.Close()
 
-// Return the number of CPUs available in the container. Return 0 if
-// we can't figure out the real number of CPUs.
-func GetCpuCount(cgroup Cgroup) int64 {
-	cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
-	if err != nil {
-		return 0
-	}
-	defer cpusetFile.Close()
-	b, err := ReadAllOrWarn(cpusetFile)
-	sp := strings.Split(string(b), ",")
-	cpus := int64(0)
-	for _, v := range sp {
-		var min, max int64
-		n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
-		if n == 2 {
-			cpus += (max - min) + 1
-		} else {
-			cpus += 1
-		}
-	}
-	return cpus
-}
+	copyPipeToChildLog(stderr_pipe, log.New(os.Stderr, "", 0))
 
-func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
-	statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
-	if err != nil {
-		return
-	}
-	defer statFile.Close()
-	b, err := ReadAllOrWarn(statFile)
-	if err != nil {
-		return
-	}
-
-	nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
-	var userTicks, sysTicks int64
-	fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
-	user_hz := float64(C.sysconf(C._SC_CLK_TCK))
-	nextSample.user = float64(userTicks) / user_hz
-	nextSample.sys = float64(sysTicks) / user_hz
-
-	delta := ""
-	if lastSample.hasData {
-		delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
-			nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
-			nextSample.user-lastSample.user,
-			nextSample.sys-lastSample.sys)
-	}
-	statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
-		nextSample.user, nextSample.sys, nextSample.cpus, delta)
-	*lastSample = nextSample
+	return cmd.Wait()
 }
 
-func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
-	var lastNetSample = map[string]IoSample{}
-	var lastDiskSample = map[string]IoSample{}
-	var lastCpuSample = CpuSample{}
-
-	poll_chan := make(chan bool, 1)
-	go func() {
-		// Send periodic poll events.
-		poll_chan <- true
-		for {
-			time.Sleep(time.Duration(poll) * time.Millisecond)
-			poll_chan <- true
-		}
-	}()
+func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+	reader := bufio.NewReaderSize(in, MaxLogLine)
+	var prefix string
 	for {
-		select {
-		case <-stop_poll_chan:
-			return
-		case <-poll_chan:
-			// Emit stats, then select again.
-		}
-		DoMemoryStats(cgroup)
-		DoCpuStats(cgroup, &lastCpuSample)
-		DoBlkIoStats(cgroup, lastDiskSample)
-		DoNetworkStats(cgroup, lastNetSample)
-	}
-}
-
-func run(logger *log.Logger) error {
-
-	var (
-		cgroup_root    string
-		cgroup_parent  string
-		cgroup_cidfile string
-		wait           int64
-		poll           int64
-	)
-
-	flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
-	flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
-	flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
-	flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
-	flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
-
-	flag.Parse()
-
-	if cgroup_root == "" {
-		statLog.Fatal("error: must provide -cgroup-root")
-	}
-
-	finish_chan := make(chan bool)
-	defer close(finish_chan)
-
-	var cmd *exec.Cmd
-
-	if len(flag.Args()) > 0 {
-		// Set up subprocess
-		cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
-
-		childLog.Println("Running", flag.Args())
-
-		// Child process will use our stdin and stdout pipes
-		// (we close our copies below)
-		cmd.Stdin = os.Stdin
-		cmd.Stdout = os.Stdout
-
-		// Forward SIGINT and SIGTERM to inner process
-		sigChan := make(chan os.Signal, 1)
-		go func(sig <-chan os.Signal) {
-			catch := <-sig
-			if cmd.Process != nil {
-				cmd.Process.Signal(catch)
-			}
-			statLog.Println("notice: caught signal:", catch)
-		}(sigChan)
-		signal.Notify(sigChan, syscall.SIGTERM)
-		signal.Notify(sigChan, syscall.SIGINT)
-
-		// Funnel stderr through our channel
-		stderr_pipe, err := cmd.StderrPipe()
-		if err != nil {
-			statLog.Fatalln("error in StderrPipe:", err)
-		}
-		go CopyPipeToChildLog(stderr_pipe, finish_chan)
-
-		// Run subprocess
-		if err := cmd.Start(); err != nil {
-			statLog.Fatalln("error in cmd.Start:", err)
-		}
-
-		// Close stdin/stdout in this (parent) process
-		os.Stdin.Close()
-		os.Stdout.Close()
-	}
-
-	// Read the cid file
-	var container_id string
-	if cgroup_cidfile != "" {
-		// wait up to 'wait' seconds for the cid file to appear
-		ok := false
-		var i time.Duration
-		for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
-			cid, err := ioutil.ReadFile(cgroup_cidfile)
-			if err == nil && len(cid) > 0 {
-				ok = true
-				container_id = string(cid)
-				break
-			}
-			time.Sleep(100 * time.Millisecond)
+		line, isPrefix, err := reader.ReadLine()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			logger.Fatal("error reading child stderr:", err)
 		}
-		if !ok {
-			statLog.Println("error reading cid file:", cgroup_cidfile)
+		var suffix string
+		if isPrefix {
+			suffix = "[...]"
 		}
-	}
-
-	stop_poll_chan := make(chan bool, 1)
-	cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
-	go PollCgroupStats(cgroup, poll, stop_poll_chan)
-
-	// When the child exits, tell the polling goroutine to stop.
-	defer func() { stop_poll_chan <- true }()
-
-	// Wait for CopyPipeToChan to consume child's stderr pipe
-	<-finish_chan
-
-	return cmd.Wait()
-}
-
-func main() {
-	logger := log.New(os.Stderr, "crunchstat: ", 0)
-	if err := run(logger); err != nil {
-		if exiterr, ok := err.(*exec.ExitError); ok {
-			// The program has exited with an exit code != 0
-
-			// This works on both Unix and
-			// Windows. Although package syscall is
-			// generally platform dependent, WaitStatus is
-			// defined for both Unix and Windows and in
-			// both cases has an ExitStatus() method with
-			// the same signature.
-			if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
-				os.Exit(status.ExitStatus())
-			}
+		logger.Print(prefix, string(line), suffix)
+		// Set up prefix for following line
+		if isPrefix {
+			prefix = "[...]"
 		} else {
-			statLog.Fatalln("error in cmd.Wait:", err)
+			prefix = ""
 		}
 	}
+	in.Close()
 }
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index 69f31af..63967d5 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -6,56 +6,21 @@ import (
 	"io"
 	"log"
 	"math/rand"
-	"os"
-	"regexp"
 	"testing"
 	"time"
 )
 
-func TestReadAllOrWarnFail(t *testing.T) {
-	rcv := captureLogs()
-	defer uncaptureLogs()
-	go func() {
-		// The special file /proc/self/mem can be opened for
-		// reading, but reading from byte 0 returns an error.
-		f, err := os.Open("/proc/self/mem")
-		if err != nil {
-			t.Fatalf("Opening /proc/self/mem: %s", err)
-		}
-		if x, err := ReadAllOrWarn(f); err == nil {
-			t.Fatalf("Expected error, got %v", x)
-		}
-	}()
-	if msg, err := rcv.ReadBytes('\n'); err != nil {
-		t.Fatal(err)
-	} else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
-		t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
-	}
-}
-
-func TestReadAllOrWarnSuccess(t *testing.T) {
-	f, err := os.Open("./crunchstat_test.go")
-	if err != nil {
-		t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-	}
-	data, err := ReadAllOrWarn(f)
-	if err != nil {
-		t.Fatalf("got error %s", err)
-	}
-	if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
-		t.Fatalf("data failed regexp: %s", err)
-	}
-}
-
 // Test that CopyPipeToChildLog works even on lines longer than
 // bufio.MaxScanTokenSize.
 func TestCopyPipeToChildLogLongLines(t *testing.T) {
-	rcv := captureLogs()
-	defer uncaptureLogs()
+	logger, logBuf := bufLogger()
 
-	control := make(chan bool)
 	pipeIn, pipeOut := io.Pipe()
-	go CopyPipeToChildLog(pipeIn, control)
+	copied := make(chan bool)
+	go func() {
+		copyPipeToChildLog(pipeIn, logger)
+		close(copied)
+	}()
 
 	sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
 	go func() {
@@ -72,14 +37,14 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
 		pipeOut.Close()
 	}()
 
-	if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+	if before, err := logBuf.ReadBytes('\n'); err != nil || string(before) != "before\n" {
 		t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
 	}
 
 	var receivedBytes []byte
 	done := false
 	for !done {
-		line, err := rcv.ReadBytes('\n')
+		line, err := logBuf.ReadBytes('\n')
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -100,27 +65,20 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
 		t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
 	}
 
-	if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+	if after, err := logBuf.ReadBytes('\n'); err != nil || string(after) != "after\n" {
 		t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
 	}
 
 	select {
 	case <-time.After(time.Second):
 		t.Fatal("Timeout")
-	case <-control:
+	case <-copied:
 		// Done.
 	}
 }
 
-func captureLogs() *bufio.Reader {
-	// Send childLog to our bufio reader instead of stderr
-	stderrIn, stderrOut := io.Pipe()
-	childLog = log.New(stderrOut, "", 0)
-	statLog = log.New(stderrOut, "crunchstat: ", 0)
-	return bufio.NewReader(stderrIn)
-}
-
-func uncaptureLogs() {
-	childLog = log.New(os.Stderr, "", 0)
-	statLog = log.New(os.Stderr, "crunchstat: ", 0)
+func bufLogger() (*log.Logger, *bufio.Reader) {
+	r, w := io.Pipe()
+	logger := log.New(w, "", 0)
+	return logger, bufio.NewReader(r)
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list