[ARVADOS] updated: c9e64770f294108a4f1c82bd892d740573e21a50
Git user
git at public.curoverse.com
Thu Jun 30 17:42:27 EDT 2016
Summary of changes:
build/run-build-packages.sh | 2 +-
lib/crunchstat/crunchstat.go | 14 ++---
sdk/cwl/setup.py | 2 +-
services/crunch-run/crunchrun.go | 35 +++++++++++
services/crunch-run/crunchrun_test.go | 110 ++++++++++++++++++++--------------
5 files changed, 110 insertions(+), 53 deletions(-)
discards b5bbdf7032e8c2e31a6afeae00e388d7216db63d (commit)
discards fc8699fde83da5983d98d41a38533b33aec94382 (commit)
discards d541c980b22b7414147317f7a05aa67170425a9c (commit)
discards 1467ece28d0f09b59296dcd9a2ce848b3a505c20 (commit)
discards 6d909db25783bd19a152f25e47262de70cd8719f (commit)
discards 096c81d3b4d364705b8ef7489b634bcee05d8b1d (commit)
via c9e64770f294108a4f1c82bd892d740573e21a50 (commit)
via 29d5ca4976ef491f04ed88a286656b2a94453c06 (commit)
via 8e39b792a728d460d9ed439f664d54aac5432168 (commit)
via 0d6519e0eaf803e2bba346a7f1c4cf94e1ea0829 (commit)
via f14646c95f3852d958e2fcaa053ecf71e341d033 (commit)
via 1ebc07e853ac6fb44b5a8966d7381e771dd68898 (commit)
via 37f07a3f9f202f66a88eb70dcd0002713889a9b0 (commit)
via db4eb863a36acbacde64f7d356065b3b9cbfb342 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (b5bbdf7032e8c2e31a6afeae00e388d7216db63d)
\
N -- N -- N (c9e64770f294108a4f1c82bd892d740573e21a50)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c9e64770f294108a4f1c82bd892d740573e21a50
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 17:42:03 2016 -0400
8016: Report container resource usage in "crunchstat" log.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 7da1beb..b064c42 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/lib/crunchstat"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -91,6 +92,12 @@ type ContainerRunner struct {
SigChan chan os.Signal
ArvMountExit chan error
finalState string
+
+ statLogger io.WriteCloser
+ statReporter *crunchstat.Reporter
+ statInterval time.Duration
+ cgroupRoot string
+ cgroupParent string
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -366,6 +373,14 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
}
+ if runner.statReporter != nil {
+ runner.statReporter.Stop()
+ closeerr = runner.statLogger.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
+ }
+ }
+
runner.loggingDone <- true
close(runner.loggingDone)
return
@@ -373,6 +388,18 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
}
}
+func (runner *ContainerRunner) StartCrunchstat() {
+ runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+ runner.statReporter = &crunchstat.Reporter{
+ CID: runner.ContainerID,
+ Logger: log.New(runner.statLogger, "", 0),
+ CgroupParent: runner.cgroupParent,
+ CgroupRoot: runner.cgroupRoot,
+ Poll: runner.statInterval,
+ }
+ runner.statReporter.Start()
+}
+
// AttachLogs connects the docker container stdout and stderr logs to the
// Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
@@ -752,6 +779,8 @@ func (runner *ContainerRunner) Run() (err error) {
return
}
+ runner.StartCrunchstat()
+
if runner.IsCancelled() {
return
}
@@ -792,6 +821,9 @@ func NewContainerRunner(api IArvadosClient,
}
func main() {
+ statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "resource usage statistics reporting period")
+ cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+ cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
flag.Parse()
containerId := flag.Arg(0)
@@ -816,6 +848,9 @@ func main() {
}
cr := NewContainerRunner(api, kc, docker, containerId)
+ cr.statInterval = *statInterval
+ cr.cgroupRoot = *cgroupRoot
+ cr.cgroupParent = *cgroupParent
err = cr.Run()
if err != nil {
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 9880230..d95ff08 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -14,7 +14,6 @@ import (
. "gopkg.in/check.v1"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"sort"
@@ -139,7 +138,7 @@ func (client *ArvTestClient) Create(resourceType string,
client.Mutex.Lock()
defer client.Mutex.Unlock()
- client.Calls += 1
+ client.Calls++
client.Content = append(client.Content, parameters)
if resourceType == "logs" {
@@ -192,7 +191,7 @@ func (client *ArvTestClient) Get(resourceType string, uuid string, parameters ar
func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
client.Mutex.Lock()
defer client.Mutex.Unlock()
- client.Calls += 1
+ client.Calls++
client.Content = append(client.Content, parameters)
if resourceType == "containers" {
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
@@ -206,7 +205,7 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
// parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
// "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
// no call matches, it returns nil.
-func (client *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
+func (client *ArvTestClient) CalledWith(jpath string, expect interface{}) arvadosclient.Dict {
call:
for _, content := range client.Content {
var v interface{} = content
@@ -217,7 +216,7 @@ call:
v = dict[k]
}
}
- if v, ok := v.(string); ok && v == expect {
+ if v == expect {
return content
}
}
@@ -518,6 +517,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
@@ -553,14 +553,45 @@ func (s *TestSuite) TestFullRunHello(c *C) {
t.finish <- dockerclient.WaitResult{}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
}
+func (s *TestSuite) TestCrunchstat(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["sleep", "1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`, func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+ // We didn't actually start a container, so crunchstat didn't
+ // find accounting files and therefore didn't log any stats.
+ // It should have logged a "can't find accounting files"
+ // message after one poll interval, though, so we can confirm
+ // it's alive:
+ c.Assert(api.Logs["crunchstat"], NotNil)
+ c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
+
+ // The "files never appeared" log assures us that we called
+ // (*crunchstat.Reporter)Stop(), and that we set it up with
+ // the correct container ID "abcde":
+ c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for abcde\n`)
+}
+
func (s *TestSuite) TestFullRunStderr(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
@@ -578,10 +609,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
- c.Assert(api.Calls, Equals, 8)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["log"], NotNil)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+ final := api.CalledWith("container.state", "Complete")
+ c.Assert(final, NotNil)
+ c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+ c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
@@ -603,12 +634,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
- log.Print(api.Logs["stdout"].String())
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Log(api.Logs["stdout"])
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
}
@@ -628,10 +656,8 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
}
@@ -682,9 +708,8 @@ func (s *TestSuite) TestCancel(c *C) {
}
}
- c.Assert(api.Calls, Equals, 6)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], IsNil)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
+ c.Check(api.CalledWith("container.log", nil), NotNil)
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
}
@@ -705,10 +730,8 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
}
@@ -787,16 +810,16 @@ func (s *TestSuite) TestSetupMounts(c *C) {
}
func (s *TestSuite) TestStdout(c *C) {
- helperRecord := `{`
- helperRecord += `"command": ["/bin/sh", "-c", "echo $FROBIZ"],`
- helperRecord += `"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",`
- helperRecord += `"cwd": "/bin",`
- helperRecord += `"environment": {"FROBIZ": "bilbo"},`
- helperRecord += `"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },`
- helperRecord += `"output_path": "/tmp",`
- helperRecord += `"priority": 1,`
- helperRecord += `"runtime_constraints": {}`
- helperRecord += `}`
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
api, _ := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
@@ -804,10 +827,9 @@ func (s *TestSuite) TestStdout(c *C) {
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Assert(api.Calls, Equals, 6)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
- c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), Not(IsNil))
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
}
// Used by the TestStdoutWithWrongPath*()
commit 29d5ca4976ef491f04ed88a286656b2a94453c06
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 11:05:37 2016 -0400
8016: Fix some error checking in ThrottledLogger.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 9b19bf0..db9d101 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -46,8 +46,9 @@ func RFC3339Timestamp(t time.Time) string {
return t.Format(RFC3339NanoFixed)
}
-// Write to the internal buffer. Prepend a timestamp to each line of the input
-// data.
+// Write prepends a timestamp to each line of the input data and
+// appends to the internal buffer. Each line is also logged to
+// tl.Immediate, if tl.Immediate is not nil.
func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
tl.Mutex.Lock()
if tl.buf == nil {
@@ -57,13 +58,20 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
- for sc.Scan() {
- _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+ for err == nil && sc.Scan() {
+ out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
if tl.Immediate != nil {
- tl.Immediate.Printf("%s %s\n", now, sc.Text())
+ tl.Immediate.Print(out[:len(out)-1])
}
+ _, err = io.WriteString(tl.buf, out)
}
- return len(p), err
+ if err == nil {
+ err = sc.Err()
+ if err == nil {
+ n = len(p)
+ }
+ }
+ return
}
// Periodically check the current buffer; if not empty, send it on the
commit 8e39b792a728d460d9ed439f664d54aac5432168
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 10:41:07 2016 -0400
8016: Fix comment grammar and weird variable name.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 8d6cce8..9b19bf0 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -157,17 +157,18 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
// (b) batches log messages and only calls the underlying Writer at most once
// per second.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
- alw := &ThrottledLogger{}
- alw.flusherDone = make(chan bool)
- alw.writer = writer
- alw.Logger = log.New(alw, "", 0)
- alw.Timestamper = RFC3339Timestamp
- go alw.flusher()
- return alw
+ tl := &ThrottledLogger{}
+ tl.flusherDone = make(chan bool)
+ tl.writer = writer
+ tl.Logger = log.New(tl, "", 0)
+ tl.Timestamper = RFC3339Timestamp
+ go tl.flusher()
+ return tl
}
-// ArvLogWriter implements a writer that writes to each of a WriteCloser
-// (typically CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter is an io.WriteCloser that processes each write by
+// writing it through to another io.WriteCloser (typically a
+// CollectionFileWriter) and creating an Arvados log entry.
type ArvLogWriter struct {
ArvClient IArvadosClient
UUID string
commit 0d6519e0eaf803e2bba346a7f1c4cf94e1ea0829
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 10:26:07 2016 -0400
8016: Fix timestamp format (add missing nanoseconds), and use it in tests.
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 20928db..8d6cce8 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -38,13 +38,12 @@ type ThrottledLogger struct {
Immediate *log.Logger
}
-// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
-// because the RFC3339Nano format isn't fixed width.
-const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
-func RFC3339Timestamp(now time.Time) string {
- return now.Format(RFC3339Fixed)
+// RFC3339Timestamp formats t as RFC3339NanoFixed.
+func RFC3339Timestamp(t time.Time) string {
+ return t.Format(RFC3339NanoFixed)
}
// Write to the internal buffer. Prepend a timestamp to each line of the input
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index bb3123a..ceb8ca8 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -16,7 +16,11 @@ type TestTimestamper struct {
func (this *TestTimestamper) Timestamp(t time.Time) string {
this.count += 1
- return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+ t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count), t.Location())
+ if err != nil {
+ panic(err)
+ }
+ return RFC3339Timestamp(t)
}
// Gocheck boilerplate
commit f14646c95f3852d958e2fcaa053ecf71e341d033
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 09:48:44 2016 -0400
8016: Fix up docs and trim API.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 3ba3ce6..0a6d3bc 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -1,3 +1,5 @@
+// Package crunchstat reports resource usage (CPU, memory, disk,
+// network) for a cgroup.
package crunchstat
import (
@@ -14,7 +16,7 @@ import (
"time"
)
-// This magically allows us to look up user_hz via _SC_CLK_TCK:
+// This magically allows us to look up userHz via _SC_CLK_TCK:
/*
#include <unistd.h>
@@ -28,42 +30,53 @@ import "C"
// log.Logger.
type Reporter struct {
// CID of the container to monitor. If empty, read the CID
- // from CIDFile.
+ // from CIDFile (first waiting until a non-empty file appears
+ // at CIDFile). If CIDFile is also empty, report host
+ // statistics.
CID string
- // Where cgroup special files live on this system
+
+ // Path to a file we can read CID from.
+ CIDFile string
+
+ // Where cgroup accounting files live on this system, e.g.,
+ // "/sys/fs/cgroup".
CgroupRoot string
+
+ // Parent cgroup, e.g., "docker".
CgroupParent string
- // Path to a file we can read CID from. If CIDFile is empty or
- // nonexistent, wait for it to appear.
- CIDFile string
- // Interval between samples
+ // Interval between samples. Must be positive.
Poll time.Duration
- // Where to write statistics.
+ // Where to write statistics. Must not be nil.
Logger *log.Logger
reportedStatFile map[string]string
- lastNetSample map[string]IoSample
- lastDiskSample map[string]IoSample
- lastCPUSample CpuSample
+ lastNetSample map[string]ioSample
+ lastDiskSample map[string]ioSample
+ lastCPUSample cpuSample
done chan struct{}
}
-// Wait (if necessary) for the CID to appear in CIDFile, then start
-// reporting statistics.
+// Start starts monitoring in a new goroutine, and returns
+// immediately.
+//
+// The monitoring goroutine waits for a non-empty CIDFile to appear
+// (unless CID is non-empty). Then it waits for the accounting files
+// to appear for the monitored container. Then it collects and reports
+// statistics until Stop is called.
//
-// Start should not be called more than once on a Reporter.
+// Callers should not call Start more than once.
//
-// Public data fields should not be changed after calling Start.
+// Callers should not modify public data fields after calling Start.
func (r *Reporter) Start() {
r.done = make(chan struct{})
go r.run()
}
-// Stop reporting statistics. Do not call more than once, or before
-// calling Start.
+// Stop reporting. Do not call more than once, or before calling
+// Start.
//
// Nothing will be logged after Stop returns.
func (r *Reporter) Stop() {
@@ -157,13 +170,13 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
return nil, errors.New("Could not read stats for any proc in container")
}
-type IoSample struct {
+type ioSample struct {
sampleTime time.Time
txBytes int64
rxBytes int64
}
-func (r *Reporter) DoBlkIoStats() {
+func (r *Reporter) doBlkIOStats() {
c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
if err != nil {
return
@@ -171,17 +184,17 @@ func (r *Reporter) DoBlkIoStats() {
defer c.Close()
b := bufio.NewScanner(c)
var sampleTime = time.Now()
- newSamples := make(map[string]IoSample)
+ newSamples := make(map[string]ioSample)
for b.Scan() {
var device, op string
var val int64
if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
continue
}
- var thisSample IoSample
+ var thisSample ioSample
var ok bool
if thisSample, ok = newSamples[device]; !ok {
- thisSample = IoSample{sampleTime, -1, -1}
+ thisSample = ioSample{sampleTime, -1, -1}
}
switch op {
case "Read":
@@ -207,19 +220,19 @@ func (r *Reporter) DoBlkIoStats() {
}
}
-type MemSample struct {
+type memSample struct {
sampleTime time.Time
memStat map[string]int64
}
-func (r *Reporter) DoMemoryStats() {
+func (r *Reporter) doMemoryStats() {
c, err := r.openStatFile("memory", "memory.stat", true)
if err != nil {
return
}
defer c.Close()
b := bufio.NewScanner(c)
- thisSample := MemSample{time.Now(), make(map[string]int64)}
+ thisSample := memSample{time.Now(), make(map[string]int64)}
wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
for b.Scan() {
var stat string
@@ -238,7 +251,7 @@ func (r *Reporter) DoMemoryStats() {
r.Logger.Printf("mem%s\n", outstat.String())
}
-func (r *Reporter) DoNetworkStats() {
+func (r *Reporter) doNetworkStats() {
sampleTime := time.Now()
stats, err := r.getContainerNetStats()
if err != nil {
@@ -265,7 +278,7 @@ func (r *Reporter) DoNetworkStats() {
if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
continue
}
- nextSample := IoSample{}
+ nextSample := ioSample{}
nextSample.sampleTime = sampleTime
nextSample.txBytes = tx
nextSample.rxBytes = rx
@@ -282,7 +295,7 @@ func (r *Reporter) DoNetworkStats() {
}
}
-type CpuSample struct {
+type cpuSample struct {
hasData bool // to distinguish the zero value from real data
sampleTime time.Time
user float64
@@ -292,7 +305,7 @@ type CpuSample struct {
// Return the number of CPUs available in the container. Return 0 if
// we can't figure out the real number of CPUs.
-func (r *Reporter) GetCpuCount() int64 {
+func (r *Reporter) getCPUCount() int64 {
cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
if err != nil {
return 0
@@ -307,13 +320,13 @@ func (r *Reporter) GetCpuCount() int64 {
if n == 2 {
cpus += (max - min) + 1
} else {
- cpus += 1
+ cpus++
}
}
return cpus
}
-func (r *Reporter) DoCpuStats() {
+func (r *Reporter) doCPUStats() {
statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
if err != nil {
return
@@ -324,12 +337,12 @@ func (r *Reporter) DoCpuStats() {
return
}
- nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()}
+ nextSample := cpuSample{true, time.Now(), 0, 0, r.getCPUCount()}
var userTicks, sysTicks int64
fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
- user_hz := float64(C.sysconf(C._SC_CLK_TCK))
- nextSample.user = float64(userTicks) / user_hz
- nextSample.sys = float64(sysTicks) / user_hz
+ userHz := float64(C.sysconf(C._SC_CLK_TCK))
+ nextSample.user = float64(userTicks) / userHz
+ nextSample.sys = float64(sysTicks) / userHz
delta := ""
if r.lastCPUSample.hasData {
@@ -352,15 +365,15 @@ func (r *Reporter) run() {
return
}
- r.lastNetSample = make(map[string]IoSample)
- r.lastDiskSample = make(map[string]IoSample)
+ r.lastNetSample = make(map[string]ioSample)
+ r.lastDiskSample = make(map[string]ioSample)
ticker := time.NewTicker(r.Poll)
for {
- r.DoMemoryStats()
- r.DoCpuStats()
- r.DoBlkIoStats()
- r.DoNetworkStats()
+ r.doMemoryStats()
+ r.doCPUStats()
+ r.doBlkIOStats()
+ r.doNetworkStats()
select {
case <-r.done:
return
@@ -372,7 +385,7 @@ func (r *Reporter) run() {
// If CID is empty, wait for it to appear in CIDFile. Return true if
// we get it before r.done indicates someone called Stop.
func (r *Reporter) waitForCIDFile() bool {
- if r.CID != "" {
+ if r.CID != "" || r.CIDFile == "" {
return true
}
commit 1ebc07e853ac6fb44b5a8966d7381e771dd68898
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Jun 30 09:11:49 2016 -0400
8016: Reduce logging noise by waiting for cgroup files to appear before polling.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 8d7621c..3ba3ce6 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -70,7 +70,7 @@ func (r *Reporter) Stop() {
close(r.done)
}
-func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
+func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
r.Logger.Print(err)
@@ -79,14 +79,20 @@ func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
}
// Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
+// cgroup, and return an io.ReadCloser. If no stats file is available,
// return nil.
//
+// Log the file that was opened, if it isn't the same file opened on
+// the last openStatFile for this stat.
+//
+// Log "not available" if no file is found and either this stat has
+// been available in the past, or verbose==true.
+//
// TODO: Instead of trying all options, choose a process in the
// container, and read /proc/PID/cgroup to determine the appropriate
// cgroup root for the given statgroup. (This will avoid falling back
// to host-level stats during container setup and teardown.)
-func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error) {
+func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
var paths []string
if r.CID != "" {
// Collect container's stats
@@ -112,16 +118,16 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
path = ""
}
}
- if pathWas, ok := r.reportedStatFile[stat]; !ok || pathWas != path {
+ if pathWas := r.reportedStatFile[stat]; pathWas != path {
// Log whenever we start using a new/different cgroup
// stat file for a given statistic. This typically
// happens 1 to 3 times per statistic, depending on
// whether we happen to collect stats [a] before any
// processes have been created in the container and
// [b] after all contained processes have exited.
- if path == "" {
+ if path == "" && verbose {
r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
- } else if ok {
+ } else if pathWas != "" {
r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
} else {
r.Logger.Printf("notice: reading stats from %s\n", path)
@@ -132,7 +138,7 @@ func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error)
}
func (r *Reporter) getContainerNetStats() (io.Reader, error) {
- procsFile, err := r.openStatFile("cpuacct", "cgroup.procs")
+ procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
if err != nil {
return nil, err
}
@@ -158,7 +164,7 @@ type IoSample struct {
}
func (r *Reporter) DoBlkIoStats() {
- c, err := r.openStatFile("blkio", "blkio.io_service_bytes")
+ c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
if err != nil {
return
}
@@ -207,7 +213,7 @@ type MemSample struct {
}
func (r *Reporter) DoMemoryStats() {
- c, err := r.openStatFile("memory", "memory.stat")
+ c, err := r.openStatFile("memory", "memory.stat", true)
if err != nil {
return
}
@@ -287,7 +293,7 @@ type CpuSample struct {
// Return the number of CPUs available in the container. Return 0 if
// we can't figure out the real number of CPUs.
func (r *Reporter) GetCpuCount() int64 {
- cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus")
+ cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
if err != nil {
return 0
}
@@ -308,7 +314,7 @@ func (r *Reporter) GetCpuCount() int64 {
}
func (r *Reporter) DoCpuStats() {
- statFile, err := r.openStatFile("cpuacct", "cpuacct.stat")
+ statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
if err != nil {
return
}
@@ -337,13 +343,15 @@ func (r *Reporter) DoCpuStats() {
r.lastCPUSample = nextSample
}
-// Report stats periodically until someone closes or sends to r.done.
+// Report stats periodically until r.done indicates someone called
+// Stop.
func (r *Reporter) run() {
- if !r.waitForCIDFile() {
+ r.reportedStatFile = make(map[string]string)
+
+ if !r.waitForCIDFile() || !r.waitForCgroup() {
return
}
- r.reportedStatFile = make(map[string]string)
r.lastNetSample = make(map[string]IoSample)
r.lastDiskSample = make(map[string]IoSample)
@@ -362,7 +370,7 @@ func (r *Reporter) run() {
}
// If CID is empty, wait for it to appear in CIDFile. Return true if
-// we get it before someone calls Stop().
+// we get it before r.done indicates someone called Stop.
func (r *Reporter) waitForCIDFile() bool {
if r.CID != "" {
return true
@@ -384,3 +392,28 @@ func (r *Reporter) waitForCIDFile() bool {
}
}
}
+
+// Wait for the cgroup stats files to appear in cgroup_root. Return
+// true if they appear before r.done indicates someone called Stop. If
+// they don't appear within one poll interval, log a warning and keep
+// waiting.
+func (r *Reporter) waitForCgroup() bool {
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ warningTimer := time.After(r.Poll)
+ for {
+ c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
+ if err == nil {
+ c.Close()
+ return true
+ }
+ select {
+ case <-ticker.C:
+ case <-warningTimer:
+ r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.Poll)
+ case <-r.done:
+ r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+ return false
+ }
+ }
+}
commit 37f07a3f9f202f66a88eb70dcd0002713889a9b0
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Jun 29 20:13:09 2016 -0400
8016: Split crunchstat into a module and a commmand line tool.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 30a80f5..2e8641a 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -714,6 +714,7 @@ gostuff=(
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
+ lib/crunchstat
services/arv-git-httpd
services/crunchstat
services/keep-web
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
new file mode 100644
index 0000000..8d7621c
--- /dev/null
+++ b/lib/crunchstat/crunchstat.go
@@ -0,0 +1,386 @@
+package crunchstat
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// This magically allows us to look up user_hz via _SC_CLK_TCK:
+
+/*
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// A Reporter gathers statistics for a cgroup and writes them to a
+// log.Logger.
+type Reporter struct {
+ // CID of the container to monitor. If empty, read the CID
+ // from CIDFile.
+ CID string
+ // Where cgroup special files live on this system
+ CgroupRoot string
+ CgroupParent string
+ // Path to a file we can read CID from. If CIDFile is empty or
+ // nonexistent, wait for it to appear.
+ CIDFile string
+
+ // Interval between samples
+ Poll time.Duration
+
+ // Where to write statistics.
+ Logger *log.Logger
+
+ reportedStatFile map[string]string
+ lastNetSample map[string]IoSample
+ lastDiskSample map[string]IoSample
+ lastCPUSample CpuSample
+
+ done chan struct{}
+}
+
+// Wait (if necessary) for the CID to appear in CIDFile, then start
+// reporting statistics.
+//
+// Start should not be called more than once on a Reporter.
+//
+// Public data fields should not be changed after calling Start.
+func (r *Reporter) Start() {
+ r.done = make(chan struct{})
+ go r.run()
+}
+
+// Stop reporting statistics. Do not call more than once, or before
+// calling Start.
+//
+// Nothing will be logged after Stop returns.
+func (r *Reporter) Stop() {
+ close(r.done)
+}
+
+func (r *Reporter) readAllOrWarn(in *os.File) ([]byte, error) {
+ content, err := ioutil.ReadAll(in)
+ if err != nil {
+ r.Logger.Print(err)
+ }
+ return content, err
+}
+
+// Open the cgroup stats file in /sys/fs corresponding to the target
+// cgroup, and return an *os.File. If no stats file is available,
+// return nil.
+//
+// TODO: Instead of trying all options, choose a process in the
+// container, and read /proc/PID/cgroup to determine the appropriate
+// cgroup root for the given statgroup. (This will avoid falling back
+// to host-level stats during container setup and teardown.)
+func (r *Reporter) openStatFile(statgroup string, stat string) (*os.File, error) {
+ var paths []string
+ if r.CID != "" {
+ // Collect container's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
+ fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
+ }
+ } else {
+ // Collect this host's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
+ fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
+ }
+ }
+ var path string
+ var file *os.File
+ var err error
+ for _, path = range paths {
+ file, err = os.Open(path)
+ if err == nil {
+ break
+ } else {
+ path = ""
+ }
+ }
+ if pathWas, ok := r.reportedStatFile[stat]; !ok || pathWas != path {
+ // Log whenever we start using a new/different cgroup
+ // stat file for a given statistic. This typically
+ // happens 1 to 3 times per statistic, depending on
+ // whether we happen to collect stats [a] before any
+ // processes have been created in the container and
+ // [b] after all contained processes have exited.
+ if path == "" {
+ r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
+ } else if ok {
+ r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
+ } else {
+ r.Logger.Printf("notice: reading stats from %s\n", path)
+ }
+ r.reportedStatFile[stat] = path
+ }
+ return file, err
+}
+
+func (r *Reporter) getContainerNetStats() (io.Reader, error) {
+ procsFile, err := r.openStatFile("cpuacct", "cgroup.procs")
+ if err != nil {
+ return nil, err
+ }
+ defer procsFile.Close()
+ reader := bufio.NewScanner(procsFile)
+ for reader.Scan() {
+ taskPid := reader.Text()
+ statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
+ stats, err := ioutil.ReadFile(statsFilename)
+ if err != nil {
+ r.Logger.Print(err)
+ continue
+ }
+ return strings.NewReader(string(stats)), nil
+ }
+ return nil, errors.New("Could not read stats for any proc in container")
+}
+
+type IoSample struct {
+ sampleTime time.Time
+ txBytes int64
+ rxBytes int64
+}
+
+func (r *Reporter) DoBlkIoStats() {
+ c, err := r.openStatFile("blkio", "blkio.io_service_bytes")
+ if err != nil {
+ return
+ }
+ defer c.Close()
+ b := bufio.NewScanner(c)
+ var sampleTime = time.Now()
+ newSamples := make(map[string]IoSample)
+ for b.Scan() {
+ var device, op string
+ var val int64
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
+ continue
+ }
+ var thisSample IoSample
+ var ok bool
+ if thisSample, ok = newSamples[device]; !ok {
+ thisSample = IoSample{sampleTime, -1, -1}
+ }
+ switch op {
+ case "Read":
+ thisSample.rxBytes = val
+ case "Write":
+ thisSample.txBytes = val
+ }
+ newSamples[device] = thisSample
+ }
+ for dev, sample := range newSamples {
+ if sample.txBytes < 0 || sample.rxBytes < 0 {
+ continue
+ }
+ delta := ""
+ if prev, ok := r.lastDiskSample[dev]; ok {
+ delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
+ sample.sampleTime.Sub(prev.sampleTime).Seconds(),
+ sample.txBytes-prev.txBytes,
+ sample.rxBytes-prev.rxBytes)
+ }
+ r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
+ r.lastDiskSample[dev] = sample
+ }
+}
+
+type MemSample struct {
+ sampleTime time.Time
+ memStat map[string]int64
+}
+
+func (r *Reporter) DoMemoryStats() {
+ c, err := r.openStatFile("memory", "memory.stat")
+ if err != nil {
+ return
+ }
+ defer c.Close()
+ b := bufio.NewScanner(c)
+ thisSample := MemSample{time.Now(), make(map[string]int64)}
+ wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
+ for b.Scan() {
+ var stat string
+ var val int64
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
+ continue
+ }
+ thisSample.memStat[stat] = val
+ }
+ var outstat bytes.Buffer
+ for _, key := range wantStats {
+ if val, ok := thisSample.memStat[key]; ok {
+ outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+ }
+ }
+ r.Logger.Printf("mem%s\n", outstat.String())
+}
+
+func (r *Reporter) DoNetworkStats() {
+ sampleTime := time.Now()
+ stats, err := r.getContainerNetStats()
+ if err != nil {
+ return
+ }
+
+ scanner := bufio.NewScanner(stats)
+ for scanner.Scan() {
+ var ifName string
+ var rx, tx int64
+ words := strings.Fields(scanner.Text())
+ if len(words) != 17 {
+ // Skip lines with wrong format
+ continue
+ }
+ ifName = strings.TrimRight(words[0], ":")
+ if ifName == "lo" || ifName == "" {
+ // Skip loopback interface and lines with wrong format
+ continue
+ }
+ if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
+ continue
+ }
+ if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
+ continue
+ }
+ nextSample := IoSample{}
+ nextSample.sampleTime = sampleTime
+ nextSample.txBytes = tx
+ nextSample.rxBytes = rx
+ var delta string
+ if prev, ok := r.lastNetSample[ifName]; ok {
+ interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+ delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
+ interval,
+ tx-prev.txBytes,
+ rx-prev.rxBytes)
+ }
+ r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
+ r.lastNetSample[ifName] = nextSample
+ }
+}
+
+type CpuSample struct {
+ hasData bool // to distinguish the zero value from real data
+ sampleTime time.Time
+ user float64
+ sys float64
+ cpus int64
+}
+
+// Return the number of CPUs available in the container. Return 0 if
+// we can't figure out the real number of CPUs.
+func (r *Reporter) GetCpuCount() int64 {
+ cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus")
+ if err != nil {
+ return 0
+ }
+ defer cpusetFile.Close()
+ b, err := r.readAllOrWarn(cpusetFile)
+ sp := strings.Split(string(b), ",")
+ cpus := int64(0)
+ for _, v := range sp {
+ var min, max int64
+ n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+ if n == 2 {
+ cpus += (max - min) + 1
+ } else {
+ cpus += 1
+ }
+ }
+ return cpus
+}
+
+func (r *Reporter) DoCpuStats() {
+ statFile, err := r.openStatFile("cpuacct", "cpuacct.stat")
+ if err != nil {
+ return
+ }
+ defer statFile.Close()
+ b, err := r.readAllOrWarn(statFile)
+ if err != nil {
+ return
+ }
+
+ nextSample := CpuSample{true, time.Now(), 0, 0, r.GetCpuCount()}
+ var userTicks, sysTicks int64
+ fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
+ user_hz := float64(C.sysconf(C._SC_CLK_TCK))
+ nextSample.user = float64(userTicks) / user_hz
+ nextSample.sys = float64(sysTicks) / user_hz
+
+ delta := ""
+ if r.lastCPUSample.hasData {
+ delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
+ nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
+ nextSample.user-r.lastCPUSample.user,
+ nextSample.sys-r.lastCPUSample.sys)
+ }
+ r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
+ nextSample.user, nextSample.sys, nextSample.cpus, delta)
+ r.lastCPUSample = nextSample
+}
+
+// Report stats periodically until someone closes or sends to r.done.
+func (r *Reporter) run() {
+ if !r.waitForCIDFile() {
+ return
+ }
+
+ r.reportedStatFile = make(map[string]string)
+ r.lastNetSample = make(map[string]IoSample)
+ r.lastDiskSample = make(map[string]IoSample)
+
+ ticker := time.NewTicker(r.Poll)
+ for {
+ r.DoMemoryStats()
+ r.DoCpuStats()
+ r.DoBlkIoStats()
+ r.DoNetworkStats()
+ select {
+ case <-r.done:
+ return
+ case <-ticker.C:
+ }
+ }
+}
+
+// If CID is empty, wait for it to appear in CIDFile. Return true if
+// we get it before someone calls Stop().
+func (r *Reporter) waitForCIDFile() bool {
+ if r.CID != "" {
+ return true
+ }
+
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ cid, err := ioutil.ReadFile(r.CIDFile)
+ if err == nil && len(cid) > 0 {
+ r.CID = string(cid)
+ return true
+ }
+ select {
+ case <-ticker.C:
+ case <-r.done:
+ r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+ return false
+ }
+ }
+}
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
new file mode 100644
index 0000000..864a3e3
--- /dev/null
+++ b/lib/crunchstat/crunchstat_test.go
@@ -0,0 +1,59 @@
+package crunchstat
+
+import (
+ "bufio"
+ "io"
+ "log"
+ "os"
+ "regexp"
+ "testing"
+)
+
+func bufLogger() (*log.Logger, *bufio.Reader) {
+ r, w := io.Pipe()
+ logger := log.New(w, "", 0)
+ return logger, bufio.NewReader(r)
+}
+
+func TestReadAllOrWarnFail(t *testing.T) {
+ logger, rcv := bufLogger()
+ rep := Reporter{Logger: logger}
+
+ var msg []byte
+ var err error
+ go func() {
+ msg, err = rcv.ReadBytes('\n')
+ }()
+ {
+ // The special file /proc/self/mem can be opened for
+ // reading, but reading from byte 0 returns an error.
+ f, err := os.Open("/proc/self/mem")
+ if err != nil {
+ t.Fatalf("Opening /proc/self/mem: %s", err)
+ }
+ if x, err := rep.readAllOrWarn(f); err == nil {
+ t.Fatalf("Expected error, got %v", x)
+ }
+ }
+ if err != nil {
+ t.Fatal(err)
+ } else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+ t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+ }
+}
+
+func TestReadAllOrWarnSuccess(t *testing.T) {
+ rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+
+ f, err := os.Open("./crunchstat_test.go")
+ if err != nil {
+ t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ }
+ data, err := rep.readAllOrWarn(f)
+ if err != nil {
+ t.Fatalf("got error %s", err)
+ }
+ if matched, err := regexp.MatchString("^package crunchstat\n", string(data)); err != nil || !matched {
+ t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ }
+}
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 6bce325..8c05069 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -2,485 +2,122 @@ package main
import (
"bufio"
- "bytes"
- "errors"
"flag"
- "fmt"
"io"
- "io/ioutil"
"log"
"os"
"os/exec"
"os/signal"
- "strconv"
- "strings"
"syscall"
"time"
-)
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
+ "git.curoverse.com/arvados.git/lib/crunchstat"
+)
-// The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
+const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-type Cgroup struct {
- root string
- parent string
- cid string
-}
+func main() {
+ reporter := crunchstat.Reporter{
+ Logger: log.New(os.Stderr, "crunchstat: ", 0),
+ }
-var childLog = log.New(os.Stderr, "", 0)
-var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+ flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
+ flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
+ flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
+ pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
-const (
- MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+ flag.Parse()
-func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
- reader := bufio.NewReaderSize(in, MaxLogLine)
- var prefix string
- for {
- line, isPrefix, err := reader.ReadLine()
- if err == io.EOF {
- break
- } else if err != nil {
- statLog.Fatal("error reading child stderr:", err)
- }
- var suffix string
- if isPrefix {
- suffix = "[...]"
- }
- childLog.Print(prefix, string(line), suffix)
- // Set up prefix for following line
- if isPrefix {
- prefix = "[...]"
- } else {
- prefix = ""
- }
+ if reporter.CgroupRoot == "" {
+ reporter.Logger.Fatal("error: must provide -cgroup-root")
}
- done <- true
- in.Close()
-}
+ reporter.Poll = time.Duration(*pollMsec) * time.Millisecond
-func ReadAllOrWarn(in *os.File) ([]byte, error) {
- content, err := ioutil.ReadAll(in)
- if err != nil {
- statLog.Printf("error reading %s: %s\n", in.Name(), err)
- }
- return content, err
-}
+ reporter.Start()
+ err := runCommand(flag.Args(), reporter.Logger)
+ reporter.Stop()
-var reportedStatFile = map[string]string{}
+ if err, ok := err.(*exec.ExitError); ok {
+ // The program has exited with an exit code != 0
-// Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
-// return nil.
-//
-// TODO: Instead of trying all options, choose a process in the
-// container, and read /proc/PID/cgroup to determine the appropriate
-// cgroup root for the given statgroup. (This will avoid falling back
-// to host-level stats during container setup and teardown.)
-func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
- var paths []string
- if cgroup.cid != "" {
- // Collect container's stats
- paths = []string{
- fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
- fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
- }
- } else {
- // Collect this host's stats
- paths = []string{
- fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
- fmt.Sprintf("%s/%s", cgroup.root, stat),
- }
- }
- var path string
- var file *os.File
- var err error
- for _, path = range paths {
- file, err = os.Open(path)
- if err == nil {
- break
+ // This works on both Unix and Windows. Although
+ // package syscall is generally platform dependent,
+ // WaitStatus is defined for both Unix and Windows and
+ // in both cases has an ExitStatus() method with the
+ // same signature.
+ if status, ok := err.Sys().(syscall.WaitStatus); ok {
+ os.Exit(status.ExitStatus())
} else {
- path = ""
+ reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
}
+ } else if err != nil {
+ reporter.Logger.Fatalln("error in cmd.Wait:", err)
}
- if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
- // Log whenever we start using a new/different cgroup
- // stat file for a given statistic. This typically
- // happens 1 to 3 times per statistic, depending on
- // whether we happen to collect stats [a] before any
- // processes have been created in the container and
- // [b] after all contained processes have exited.
- if path == "" {
- statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
- } else if ok {
- statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path)
- } else {
- statLog.Printf("notice: reading stats from %s\n", path)
- }
- reportedStatFile[stat] = path
- }
- return file, err
}
-func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
- procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
- if err != nil {
- return nil, err
- }
- defer procsFile.Close()
- reader := bufio.NewScanner(procsFile)
- for reader.Scan() {
- taskPid := reader.Text()
- statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
- stats, err := ioutil.ReadFile(statsFilename)
- if err != nil {
- statLog.Printf("error reading %s: %s\n", statsFilename, err)
- continue
- }
- return strings.NewReader(string(stats)), nil
- }
- return nil, errors.New("Could not read stats for any proc in container")
-}
+func runCommand(argv []string, logger *log.Logger) error {
+ cmd := exec.Command(argv[0], argv[1:]...)
-type IoSample struct {
- sampleTime time.Time
- txBytes int64
- rxBytes int64
-}
+ logger.Println("Running", argv)
-func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
- c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
- if err != nil {
- return
- }
- defer c.Close()
- b := bufio.NewScanner(c)
- var sampleTime = time.Now()
- newSamples := make(map[string]IoSample)
- for b.Scan() {
- var device, op string
- var val int64
- if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
- continue
- }
- var thisSample IoSample
- var ok bool
- if thisSample, ok = newSamples[device]; !ok {
- thisSample = IoSample{sampleTime, -1, -1}
- }
- switch op {
- case "Read":
- thisSample.rxBytes = val
- case "Write":
- thisSample.txBytes = val
- }
- newSamples[device] = thisSample
- }
- for dev, sample := range newSamples {
- if sample.txBytes < 0 || sample.rxBytes < 0 {
- continue
- }
- delta := ""
- if prev, ok := lastSample[dev]; ok {
- delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
- sample.sampleTime.Sub(prev.sampleTime).Seconds(),
- sample.txBytes-prev.txBytes,
- sample.rxBytes-prev.rxBytes)
- }
- statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
- lastSample[dev] = sample
- }
-}
-
-type MemSample struct {
- sampleTime time.Time
- memStat map[string]int64
-}
+ // Child process will use our stdin and stdout pipes
+ // (we close our copies below)
+ cmd.Stdin = os.Stdin
+ cmd.Stdout = os.Stdout
-func DoMemoryStats(cgroup Cgroup) {
- c, err := OpenStatFile(cgroup, "memory", "memory.stat")
- if err != nil {
- return
- }
- defer c.Close()
- b := bufio.NewScanner(c)
- thisSample := MemSample{time.Now(), make(map[string]int64)}
- wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
- for b.Scan() {
- var stat string
- var val int64
- if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
- continue
- }
- thisSample.memStat[stat] = val
- }
- var outstat bytes.Buffer
- for _, key := range wantStats {
- if val, ok := thisSample.memStat[key]; ok {
- outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+ // Forward SIGINT and SIGTERM to child process
+ sigChan := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ catch := <-sig
+ if cmd.Process != nil {
+ cmd.Process.Signal(catch)
}
- }
- statLog.Printf("mem%s\n", outstat.String())
-}
+ logger.Println("notice: caught signal:", catch)
+ }(sigChan)
+ signal.Notify(sigChan, syscall.SIGTERM)
+ signal.Notify(sigChan, syscall.SIGINT)
-func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
- sampleTime := time.Now()
- stats, err := GetContainerNetStats(cgroup)
+ // Funnel stderr through our channel
+ stderr_pipe, err := cmd.StderrPipe()
if err != nil {
- return
+ logger.Fatalln("error in StderrPipe:", err)
}
- scanner := bufio.NewScanner(stats)
- for scanner.Scan() {
- var ifName string
- var rx, tx int64
- words := strings.Fields(scanner.Text())
- if len(words) != 17 {
- // Skip lines with wrong format
- continue
- }
- ifName = strings.TrimRight(words[0], ":")
- if ifName == "lo" || ifName == "" {
- // Skip loopback interface and lines with wrong format
- continue
- }
- if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
- continue
- }
- if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
- continue
- }
- nextSample := IoSample{}
- nextSample.sampleTime = sampleTime
- nextSample.txBytes = tx
- nextSample.rxBytes = rx
- var delta string
- if prev, ok := lastSample[ifName]; ok {
- interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
- delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
- interval,
- tx-prev.txBytes,
- rx-prev.rxBytes)
- }
- statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
- lastSample[ifName] = nextSample
+ // Run subprocess
+ if err := cmd.Start(); err != nil {
+ logger.Fatalln("error in cmd.Start:", err)
}
-}
-type CpuSample struct {
- hasData bool // to distinguish the zero value from real data
- sampleTime time.Time
- user float64
- sys float64
- cpus int64
-}
+ // Close stdin/stdout in this (parent) process
+ os.Stdin.Close()
+ os.Stdout.Close()
-// Return the number of CPUs available in the container. Return 0 if
-// we can't figure out the real number of CPUs.
-func GetCpuCount(cgroup Cgroup) int64 {
- cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
- if err != nil {
- return 0
- }
- defer cpusetFile.Close()
- b, err := ReadAllOrWarn(cpusetFile)
- sp := strings.Split(string(b), ",")
- cpus := int64(0)
- for _, v := range sp {
- var min, max int64
- n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
- if n == 2 {
- cpus += (max - min) + 1
- } else {
- cpus += 1
- }
- }
- return cpus
-}
+ copyPipeToChildLog(stderr_pipe, log.New(os.Stderr, "", 0))
-func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
- statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
- if err != nil {
- return
- }
- defer statFile.Close()
- b, err := ReadAllOrWarn(statFile)
- if err != nil {
- return
- }
-
- nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
- var userTicks, sysTicks int64
- fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
- user_hz := float64(C.sysconf(C._SC_CLK_TCK))
- nextSample.user = float64(userTicks) / user_hz
- nextSample.sys = float64(sysTicks) / user_hz
-
- delta := ""
- if lastSample.hasData {
- delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
- nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
- nextSample.user-lastSample.user,
- nextSample.sys-lastSample.sys)
- }
- statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
- nextSample.user, nextSample.sys, nextSample.cpus, delta)
- *lastSample = nextSample
+ return cmd.Wait()
}
-func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
- var lastNetSample = map[string]IoSample{}
- var lastDiskSample = map[string]IoSample{}
- var lastCpuSample = CpuSample{}
-
- poll_chan := make(chan bool, 1)
- go func() {
- // Send periodic poll events.
- poll_chan <- true
- for {
- time.Sleep(time.Duration(poll) * time.Millisecond)
- poll_chan <- true
- }
- }()
+func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+ reader := bufio.NewReaderSize(in, MaxLogLine)
+ var prefix string
for {
- select {
- case <-stop_poll_chan:
- return
- case <-poll_chan:
- // Emit stats, then select again.
- }
- DoMemoryStats(cgroup)
- DoCpuStats(cgroup, &lastCpuSample)
- DoBlkIoStats(cgroup, lastDiskSample)
- DoNetworkStats(cgroup, lastNetSample)
- }
-}
-
-func run(logger *log.Logger) error {
-
- var (
- cgroup_root string
- cgroup_parent string
- cgroup_cidfile string
- wait int64
- poll int64
- )
-
- flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
- flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
- flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
- flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
- flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
-
- flag.Parse()
-
- if cgroup_root == "" {
- statLog.Fatal("error: must provide -cgroup-root")
- }
-
- finish_chan := make(chan bool)
- defer close(finish_chan)
-
- var cmd *exec.Cmd
-
- if len(flag.Args()) > 0 {
- // Set up subprocess
- cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
-
- childLog.Println("Running", flag.Args())
-
- // Child process will use our stdin and stdout pipes
- // (we close our copies below)
- cmd.Stdin = os.Stdin
- cmd.Stdout = os.Stdout
-
- // Forward SIGINT and SIGTERM to inner process
- sigChan := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- catch := <-sig
- if cmd.Process != nil {
- cmd.Process.Signal(catch)
- }
- statLog.Println("notice: caught signal:", catch)
- }(sigChan)
- signal.Notify(sigChan, syscall.SIGTERM)
- signal.Notify(sigChan, syscall.SIGINT)
-
- // Funnel stderr through our channel
- stderr_pipe, err := cmd.StderrPipe()
- if err != nil {
- statLog.Fatalln("error in StderrPipe:", err)
- }
- go CopyPipeToChildLog(stderr_pipe, finish_chan)
-
- // Run subprocess
- if err := cmd.Start(); err != nil {
- statLog.Fatalln("error in cmd.Start:", err)
- }
-
- // Close stdin/stdout in this (parent) process
- os.Stdin.Close()
- os.Stdout.Close()
- }
-
- // Read the cid file
- var container_id string
- if cgroup_cidfile != "" {
- // wait up to 'wait' seconds for the cid file to appear
- ok := false
- var i time.Duration
- for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
- cid, err := ioutil.ReadFile(cgroup_cidfile)
- if err == nil && len(cid) > 0 {
- ok = true
- container_id = string(cid)
- break
- }
- time.Sleep(100 * time.Millisecond)
+ line, isPrefix, err := reader.ReadLine()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ logger.Fatal("error reading child stderr:", err)
}
- if !ok {
- statLog.Println("error reading cid file:", cgroup_cidfile)
+ var suffix string
+ if isPrefix {
+ suffix = "[...]"
}
- }
-
- stop_poll_chan := make(chan bool, 1)
- cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
- go PollCgroupStats(cgroup, poll, stop_poll_chan)
-
- // When the child exits, tell the polling goroutine to stop.
- defer func() { stop_poll_chan <- true }()
-
- // Wait for CopyPipeToChan to consume child's stderr pipe
- <-finish_chan
-
- return cmd.Wait()
-}
-
-func main() {
- logger := log.New(os.Stderr, "crunchstat: ", 0)
- if err := run(logger); err != nil {
- if exiterr, ok := err.(*exec.ExitError); ok {
- // The program has exited with an exit code != 0
-
- // This works on both Unix and
- // Windows. Although package syscall is
- // generally platform dependent, WaitStatus is
- // defined for both Unix and Windows and in
- // both cases has an ExitStatus() method with
- // the same signature.
- if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
- os.Exit(status.ExitStatus())
- }
+ logger.Print(prefix, string(line), suffix)
+ // Set up prefix for following line
+ if isPrefix {
+ prefix = "[...]"
} else {
- statLog.Fatalln("error in cmd.Wait:", err)
+ prefix = ""
}
}
+ in.Close()
}
diff --git a/services/crunchstat/crunchstat_test.go b/services/crunchstat/crunchstat_test.go
index 69f31af..63967d5 100644
--- a/services/crunchstat/crunchstat_test.go
+++ b/services/crunchstat/crunchstat_test.go
@@ -6,56 +6,21 @@ import (
"io"
"log"
"math/rand"
- "os"
- "regexp"
"testing"
"time"
)
-func TestReadAllOrWarnFail(t *testing.T) {
- rcv := captureLogs()
- defer uncaptureLogs()
- go func() {
- // The special file /proc/self/mem can be opened for
- // reading, but reading from byte 0 returns an error.
- f, err := os.Open("/proc/self/mem")
- if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
- }
- if x, err := ReadAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
- }
- }()
- if msg, err := rcv.ReadBytes('\n'); err != nil {
- t.Fatal(err)
- } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
- }
-}
-
-func TestReadAllOrWarnSuccess(t *testing.T) {
- f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
- data, err := ReadAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
- }
- if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: %s", err)
- }
-}
-
// Test that CopyPipeToChildLog works even on lines longer than
// bufio.MaxScanTokenSize.
func TestCopyPipeToChildLogLongLines(t *testing.T) {
- rcv := captureLogs()
- defer uncaptureLogs()
+ logger, logBuf := bufLogger()
- control := make(chan bool)
pipeIn, pipeOut := io.Pipe()
- go CopyPipeToChildLog(pipeIn, control)
+ copied := make(chan bool)
+ go func() {
+ copyPipeToChildLog(pipeIn, logger)
+ close(copied)
+ }()
sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
go func() {
@@ -72,14 +37,14 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
pipeOut.Close()
}()
- if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+ if before, err := logBuf.ReadBytes('\n'); err != nil || string(before) != "before\n" {
t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
}
var receivedBytes []byte
done := false
for !done {
- line, err := rcv.ReadBytes('\n')
+ line, err := logBuf.ReadBytes('\n')
if err != nil {
t.Fatal(err)
}
@@ -100,27 +65,20 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
}
- if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+ if after, err := logBuf.ReadBytes('\n'); err != nil || string(after) != "after\n" {
t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
}
select {
case <-time.After(time.Second):
t.Fatal("Timeout")
- case <-control:
+ case <-copied:
// Done.
}
}
-func captureLogs() *bufio.Reader {
- // Send childLog to our bufio reader instead of stderr
- stderrIn, stderrOut := io.Pipe()
- childLog = log.New(stderrOut, "", 0)
- statLog = log.New(stderrOut, "crunchstat: ", 0)
- return bufio.NewReader(stderrIn)
-}
-
-func uncaptureLogs() {
- childLog = log.New(os.Stderr, "", 0)
- statLog = log.New(os.Stderr, "crunchstat: ", 0)
+func bufLogger() (*log.Logger, *bufio.Reader) {
+ r, w := io.Pipe()
+ logger := log.New(w, "", 0)
+ return logger, bufio.NewReader(r)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list