[arvados] created: 2.5.0-213-g711185abc

git repository hosting git at public.arvados.org
Wed Mar 1 20:36:45 UTC 2023


        at  711185abcd86bed2a86e77ca24786d99323168a5 (commit)


commit 711185abcd86bed2a86e77ca24786d99323168a5
Author: Brett Smith <brett.smith at curii.com>
Date:   Wed Mar 1 15:24:10 2023 -0500

    19986: Log max resource usage after a container finishes
    
    Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3607cafaf..3708be0c2 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -734,6 +734,7 @@ func (runner *ContainerRunner) stopHoststat() error {
 		return nil
 	}
 	runner.hoststatReporter.Stop()
+	runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
 	err := runner.hoststatLogger.Close()
 	if err != nil {
 		return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -1152,6 +1153,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
 	if runner.statReporter != nil {
 		runner.statReporter.Stop()
+		runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+			"rss": runner.Container.RuntimeConstraints.RAM,
+		})
 		err = runner.statLogger.Close()
 		if err != nil {
 			runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 46a71e592..701be4517 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -1025,6 +1025,33 @@ func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
 	s.testLogRSSThresholds(c, 734003299, []int{90, 95, 99}, 0)
 }
 
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+	s.runner.cgroupRoot = "testdata/fakestat"
+	s.runner.parentTemp = c.MkDir()
+	s.fullRunHelper(c, `{
+        "command": ["true"],
+        "container_image": "`+arvadostest.DockerImage112PDH+`",
+        "cwd": ".",
+        "environment": {},
+        "mounts": {"/tmp": {"kind": "tmp"} },
+        "output_path": "/tmp",
+        "priority": 1,
+        "runtime_constraints": {"ram": 7340032000},
+        "state": "Locked"
+    }`, nil, func() int { return 0 })
+	logs := s.api.Logs["crunch-run"].String()
+	for _, expected := range []string{
+		`Maximum disk usage was \d+%, \d+/\d+ bytes`,
+		`Maximum container memory cache usage was 73400320 bytes`,
+		`Maximum container memory swap usage was 320 bytes`,
+		`Maximum container memory pgmajfault usage was 20 faults`,
+		`Maximum container memory rss usage was 10%, 734003200/7340032000 bytes`,
+		`Maximum crunch-run memory rss usage was \d+ bytes`,
+	} {
+		c.Check(logs, Matches, logLineStart+expected)
+	}
+}
+
 func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
 	var collection_create, container_update arvadosclient.Dict
 	s.fullRunHelper(c, `{
diff --git a/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat b/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
index fff3333c4..22f0e13fa 100644
--- a/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
+++ b/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
@@ -1 +1,5 @@
 rss 734003200
+pgmajfault 3200
+total_cache 73400320
+total_pgmajfault 20
+total_swap 320
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 952e3975c..4241f5fb5 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -23,6 +23,9 @@ import (
 	"time"
 )
 
+// crunchstat collects all memory statistics, but only reports these.
+var memoryStats = [...]string{"cache", "swap", "pgmajfault", "rss"}
+
 type logPrinter interface {
 	Printf(fmt string, args ...interface{})
 }
@@ -70,6 +73,8 @@ type Reporter struct {
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
 	lastMemSample       memSample
+	maxDiskSpaceSample  diskSpaceSample
+	maxMemSample        map[memoryKey]int64
 
 	reportPIDs   map[string]int
 	reportPIDsMu sync.Mutex
@@ -99,6 +104,15 @@ func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds
 	return
 }
 
+// memoryKey is a key into Reporter.maxMemSample.
+// Initialize it with just statName to get the host/cgroup maximum.
+// Initialize it with all fields to get that process' maximum.
+type memoryKey struct {
+	processID   int
+	processName string
+	statName    string
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
@@ -130,12 +144,68 @@ func (r *Reporter) ReportPID(name string, pid int) {
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
-// Nothing will be logged after Stop returns.
+// Nothing will be logged after Stop returns unless you call a Log* method.
 func (r *Reporter) Stop() {
 	close(r.done)
 	<-r.flushed
 }
 
+func (r *Reporter) reportMemoryMax(logger logPrinter, source, statName string, value, limit int64) {
+	var units string
+	switch statName {
+	case "pgmajfault":
+		units = "faults"
+	default:
+		units = "bytes"
+	}
+	if limit > 0 {
+		percentage := 100 * value / limit
+		logger.Printf("Maximum %s memory %s usage was %d%%, %d/%d %s",
+			source, statName, percentage, value, limit, units)
+	} else {
+		logger.Printf("Maximum %s memory %s usage was %d %s",
+			source, statName, value, units)
+	}
+}
+
+func (r *Reporter) LogMaxima(logger logPrinter, memLimits map[string]int64) {
+	if r.lastCPUSample.hasData {
+		logger.Printf("Total CPU usage was %f user and %f sys on %d CPUs",
+			r.lastCPUSample.user, r.lastCPUSample.sys, r.lastCPUSample.cpus)
+	}
+	for disk, sample := range r.lastDiskIOSample {
+		logger.Printf("Total disk I/O on %s was %d bytes written and %d bytes read",
+			disk, sample.txBytes, sample.rxBytes)
+	}
+	if r.maxDiskSpaceSample.hasData {
+		percentage := 100 * r.maxDiskSpaceSample.used / r.maxDiskSpaceSample.total
+		logger.Printf("Maximum disk usage was %d%%, %d/%d bytes",
+			percentage, r.maxDiskSpaceSample.used, r.maxDiskSpaceSample.total)
+	}
+	for _, statName := range memoryStats {
+		value, ok := r.maxMemSample[memoryKey{statName: "total_" + statName}]
+		if !ok {
+			value, ok = r.maxMemSample[memoryKey{statName: statName}]
+		}
+		if ok {
+			r.reportMemoryMax(logger, "container", statName, value, memLimits[statName])
+		}
+	}
+	for ifname, sample := range r.lastNetSample {
+		logger.Printf("Total network I/O on %s was %d bytes written and %d bytes read",
+			ifname, sample.txBytes, sample.rxBytes)
+	}
+}
+
+func (r *Reporter) LogProcessMemMax(logger logPrinter) {
+	for memKey, value := range r.maxMemSample {
+		if memKey.processName == "" {
+			continue
+		}
+		r.reportMemoryMax(logger, memKey.processName, memKey.statName, value, 0)
+	}
+}
+
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
@@ -293,6 +363,10 @@ func (r *Reporter) getMemSample() {
 			continue
 		}
 		thisSample.memStat[stat] = val
+		maxKey := memoryKey{statName: stat}
+		if val > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = val
+		}
 	}
 	r.lastMemSample = thisSample
 
@@ -325,8 +399,7 @@ func (r *Reporter) getMemSample() {
 
 func (r *Reporter) reportMemSample() {
 	var outstat bytes.Buffer
-	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
-	for _, key := range wantStats {
+	for _, key := range memoryStats {
 		// Use "total_X" stats (entire hierarchy) if enabled,
 		// otherwise just the single cgroup -- see
 		// https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
@@ -399,7 +472,12 @@ func (r *Reporter) doProcmemStats() {
 		if err != nil {
 			continue
 		}
-		procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+		value := rss * r.kernelPageSize
+		procmem += fmt.Sprintf(" %d %s", value, procname)
+		maxKey := memoryKey{pid, procname, "rss"}
+		if value > r.maxMemSample[maxKey] {
+			r.maxMemSample[maxKey] = value
+		}
 	}
 	if procmem != "" {
 		r.Logger.Printf("procmem%s\n", procmem)
@@ -472,6 +550,9 @@ func (r *Reporter) doDiskSpaceStats() {
 		used:       (s.Blocks - s.Bfree) * bs,
 		available:  s.Bavail * bs,
 	}
+	if nextSample.used > r.maxDiskSpaceSample.used {
+		r.maxDiskSpaceSample = nextSample
+	}
 
 	var delta string
 	if r.lastDiskSpaceSample.hasData {
@@ -568,6 +649,7 @@ func (r *Reporter) doAllStats() {
 func (r *Reporter) run() {
 	defer close(r.flushed)
 
+	r.maxMemSample = make(map[memoryKey]int64)
 	r.reportedStatFile = make(map[string]string)
 
 	if !r.waitForCIDFile() || !r.waitForCgroup() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index d98fe1064..88de12f07 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,8 +6,10 @@ package crunchstat
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"os"
+	"path"
 	"regexp"
 	"strconv"
 	"testing"
@@ -45,8 +47,9 @@ var _ = Suite(&suite{
 })
 
 type suite struct {
-	logbuf bytes.Buffer
-	logger *logrus.Logger
+	cgroupRoot string
+	logbuf     bytes.Buffer
+	logger     *logrus.Logger
 }
 
 func (s *suite) SetUpSuite(c *C) {
@@ -54,9 +57,49 @@ func (s *suite) SetUpSuite(c *C) {
 }
 
 func (s *suite) SetUpTest(c *C) {
+	s.cgroupRoot = ""
 	s.logbuf.Reset()
 }
 
+func (s *suite) tempCgroup(c *C, sourceDir string) error {
+	tempDir := c.MkDir()
+	dirents, err := os.ReadDir(sourceDir)
+	if err != nil {
+		return err
+	}
+	for _, dirent := range dirents {
+		srcData, err := os.ReadFile(path.Join(sourceDir, dirent.Name()))
+		if err != nil {
+			return err
+		}
+		destPath := path.Join(tempDir, dirent.Name())
+		err = os.WriteFile(destPath, srcData, 0o600)
+		if err != nil {
+			return err
+		}
+	}
+	s.cgroupRoot = tempDir
+	return nil
+}
+
+func (s *suite) addPidToCgroup(pid int) error {
+	if s.cgroupRoot == "" {
+		return errors.New("cgroup has not been set up for this test")
+	}
+	procsPath := path.Join(s.cgroupRoot, "cgroup.procs")
+	procsFile, err := os.OpenFile(procsPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
+	if err != nil {
+		return err
+	}
+	pidLine := strconv.Itoa(pid) + "\n"
+	_, err = procsFile.Write([]byte(pidLine))
+	if err != nil {
+		procsFile.Close()
+		return err
+	}
+	return procsFile.Close()
+}
+
 func (s *suite) TestReadAllOrWarnFail(c *C) {
 	rep := Reporter{Logger: s.logger}
 
@@ -163,3 +206,56 @@ func (s *suite) TestMultipleRSSThresholdsSomePassed(c *C) {
 func (s *suite) TestMultipleRSSThresholdsAllPassed(c *C) {
 	s.testRSSThresholds(c, []int64{1, 2, 3}, 3)
 }
+
+func (s *suite) TestLogMaxima(c *C) {
+	err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+	c.Assert(err, IsNil)
+	rep := Reporter{
+		CgroupRoot: s.cgroupRoot,
+		Logger:     s.logger,
+		PollPeriod: time.Second * 10,
+		TempDir:    s.cgroupRoot,
+	}
+	rep.Start()
+	rep.Stop()
+	rep.LogMaxima(s.logger, map[string]int64{"rss": GiB})
+	logs := s.logbuf.String()
+	c.Logf("%s", logs)
+
+	expectRSS := fmt.Sprintf(`Maximum container memory rss usage was %d%%, %d/%d bytes`,
+		100*fakeRSS.value/GiB, fakeRSS.value, GiB)
+	for _, expected := range []string{
+		`Maximum disk usage was \d+%, \d+/\d+ bytes`,
+		`Maximum container memory cache usage was 73400320 bytes`,
+		`Maximum container memory swap usage was 320 bytes`,
+		`Maximum container memory pgmajfault usage was 20 faults`,
+		expectRSS,
+	} {
+		pattern := logMsgPrefix + expected + `"`
+		c.Check(logs, Matches, pattern)
+	}
+}
+
+func (s *suite) TestLogProcessMemMax(c *C) {
+	err := s.tempCgroup(c, fakeRSS.cgroupRoot)
+	c.Assert(err, IsNil)
+	pid := os.Getpid()
+	err = s.addPidToCgroup(pid)
+	c.Assert(err, IsNil)
+
+	rep := Reporter{
+		CgroupRoot: s.cgroupRoot,
+		Logger:     s.logger,
+		PollPeriod: time.Second * 10,
+		TempDir:    s.cgroupRoot,
+	}
+	rep.ReportPID("test-run", pid)
+	rep.Start()
+	rep.Stop()
+	rep.LogProcessMemMax(s.logger)
+	logs := s.logbuf.String()
+	c.Logf("%s", logs)
+
+	pattern := logMsgPrefix + `Maximum test-run memory rss usage was \d+ bytes"`
+	c.Check(logs, Matches, pattern)
+}
diff --git a/lib/crunchstat/testdata/fakestat/memory.stat b/lib/crunchstat/testdata/fakestat/memory.stat
index 0540eea23..f1245211d 100644
--- a/lib/crunchstat/testdata/fakestat/memory.stat
+++ b/lib/crunchstat/testdata/fakestat/memory.stat
@@ -1,2 +1,6 @@
 rss 990
 total_rss 786432000
+pgmajfault 3200
+total_cache 73400320
+total_pgmajfault 20
+total_swap 320

commit 0559d0dfc8406eadfc1aa71465179eaea2754f12
Author: Brett Smith <brett.smith at curii.com>
Date:   Wed Mar 1 11:35:31 2023 -0500

    19986: Log when a container uses nearly max RAM
    
    This is meant to help users diagnose when their container likely failed
    because it ran out of memory (OOM).
    
    Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>

diff --git a/.licenseignore b/.licenseignore
index 6ddb5c009..3d24c4ee3 100644
--- a/.licenseignore
+++ b/.licenseignore
@@ -92,4 +92,5 @@ sdk/cwl/tests/wf/hello.txt
 sdk/cwl/tests/wf/indir1/hello2.txt
 sdk/cwl/tests/chipseq/data/Genomes/*
 CITATION.cff
-SECURITY.md
\ No newline at end of file
+SECURITY.md
+*/testdata/fakestat/*
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 79d7f3674..3607cafaf 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -764,12 +764,16 @@ func (runner *ContainerRunner) startCrunchstat() error {
 	}
 	runner.statLogger = NewThrottledLogger(w)
 	runner.statReporter = &crunchstat.Reporter{
-		CID:          runner.executor.CgroupID(),
-		Logger:       log.New(runner.statLogger, "", 0),
 		CgroupParent: runner.expectCgroupParent,
 		CgroupRoot:   runner.cgroupRoot,
-		PollPeriod:   runner.statInterval,
-		TempDir:      runner.parentTemp,
+		CID:          runner.executor.CgroupID(),
+		Logger:       log.New(runner.statLogger, "", 0),
+		MemThresholds: map[string][]crunchstat.Threshold{
+			"rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+		},
+		PollPeriod:      runner.statInterval,
+		TempDir:         runner.parentTemp,
+		ThresholdLogger: runner.CrunchLog,
 	}
 	runner.statReporter.Start()
 	return nil
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 4d127f19c..46a71e592 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -20,6 +20,7 @@ import (
 	"os/exec"
 	"regexp"
 	"runtime/pprof"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -43,6 +44,8 @@ func TestCrunchExec(t *testing.T) {
 	TestingT(t)
 }
 
+const logLineStart = `(?m)(.*\n)*\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z `
+
 var _ = Suite(&TestSuite{})
 
 type TestSuite struct {
@@ -985,6 +988,43 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
 	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
 }
 
+func (s *TestSuite) testLogRSSThresholds(c *C, ram int, expected []int, notExpected int) {
+	s.runner.cgroupRoot = "testdata/fakestat"
+	s.fullRunHelper(c, `{
+		"command": ["true"],
+		"container_image": "`+arvadostest.DockerImage112PDH+`",
+		"cwd": ".",
+		"environment": {},
+		"mounts": {"/tmp": {"kind": "tmp"} },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {"ram": `+strconv.Itoa(ram)+`},
+		"state": "Locked"
+	}`, nil, func() int { return 0 })
+	logs := s.api.Logs["crunch-run"].String()
+	pattern := logLineStart + `Container using over %d%% of memory \(rss 734003200/%d bytes\)`
+	var threshold int
+	for _, threshold = range expected {
+		c.Check(logs, Matches, fmt.Sprintf(pattern, threshold, ram))
+	}
+	if notExpected > threshold {
+		c.Check(logs, Not(Matches), fmt.Sprintf(pattern, notExpected, ram))
+	}
+}
+
+func (s *TestSuite) TestLogNoRSSThresholds(c *C) {
+	s.testLogRSSThresholds(c, 7340032000, []int{}, 90)
+}
+
+func (s *TestSuite) TestLogSomeRSSThresholds(c *C) {
+	onePercentRSS := 7340032
+	s.testLogRSSThresholds(c, 102*onePercentRSS, []int{90, 95}, 99)
+}
+
+func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
+	s.testLogRSSThresholds(c, 734003299, []int{90, 95, 99}, 0)
+}
+
 func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
 	var collection_create, container_update arvadosclient.Dict
 	s.fullRunHelper(c, `{
diff --git a/lib/crunchrun/testdata/fakestat/cgroup.procs b/lib/crunchrun/testdata/fakestat/cgroup.procs
new file mode 100644
index 000000000..e69de29bb
diff --git a/lib/crunchrun/testdata/fakestat/cgroupid/cgroup.procs b/lib/crunchrun/testdata/fakestat/cgroupid/cgroup.procs
new file mode 100644
index 000000000..e69de29bb
diff --git a/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat b/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
new file mode 100644
index 000000000..fff3333c4
--- /dev/null
+++ b/lib/crunchrun/testdata/fakestat/cgroupid/memory.stat
@@ -0,0 +1 @@
+rss 734003200
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 5d2059f7e..952e3975c 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -23,6 +23,10 @@ import (
 	"time"
 )
 
+type logPrinter interface {
+	Printf(fmt string, args ...interface{})
+}
+
 // A Reporter gathers statistics for a cgroup and writes them to a
 // log.Logger.
 type Reporter struct {
@@ -49,9 +53,15 @@ type Reporter struct {
 	TempDir string
 
 	// Where to write statistics. Must not be nil.
-	Logger interface {
-		Printf(fmt string, args ...interface{})
-	}
+	Logger logPrinter
+
+	// When stats cross thresholds configured in the fields below,
+	// they are reported to this logger.
+	ThresholdLogger logPrinter
+
+	// MemThresholds maps memory stat names to slices of thresholds.
+	// When the corresponding stat exceeds a threshold, that will be logged.
+	MemThresholds map[string][]Threshold
 
 	kernelPageSize      int64
 	reportedStatFile    map[string]string
@@ -68,6 +78,27 @@ type Reporter struct {
 	flushed chan struct{} // closed when we have made our last report
 }
 
+type Threshold struct {
+	percentage int64
+	threshold  int64
+	total      int64
+}
+
+func NewThresholdFromPercentage(total int64, percentage int64) Threshold {
+	return Threshold{
+		percentage: percentage,
+		threshold:  total * percentage / 100,
+		total:      total,
+	}
+}
+
+func NewThresholdsFromPercentages(total int64, percentages []int64) (thresholds []Threshold) {
+	for _, percentage := range percentages {
+		thresholds = append(thresholds, NewThresholdFromPercentage(total, percentage))
+	}
+	return
+}
+
 // Start starts monitoring in a new goroutine, and returns
 // immediately.
 //
@@ -264,6 +295,32 @@ func (r *Reporter) getMemSample() {
 		thisSample.memStat[stat] = val
 	}
 	r.lastMemSample = thisSample
+
+	if r.ThresholdLogger != nil {
+		for statName, thresholds := range r.MemThresholds {
+			statValue, ok := thisSample.memStat["total_"+statName]
+			if !ok {
+				statValue, ok = thisSample.memStat[statName]
+				if !ok {
+					continue
+				}
+			}
+			var index int
+			var statThreshold Threshold
+			for index, statThreshold = range thresholds {
+				if statValue < statThreshold.threshold {
+					break
+				} else if statThreshold.percentage > 0 {
+					r.ThresholdLogger.Printf("Container using over %d%% of memory (%s %d/%d bytes)",
+						statThreshold.percentage, statName, statValue, statThreshold.total)
+				} else {
+					r.ThresholdLogger.Printf("Container using over %d of memory (%s %s bytes)",
+						statThreshold.threshold, statName, statValue)
+				}
+			}
+			r.MemThresholds[statName] = thresholds[index:]
+		}
+	}
 }
 
 func (r *Reporter) reportMemSample() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index b4498a135..d98fe1064 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,6 +6,7 @@ package crunchstat
 
 import (
 	"bytes"
+	"fmt"
 	"os"
 	"regexp"
 	"strconv"
@@ -16,6 +17,25 @@ import (
 	. "gopkg.in/check.v1"
 )
 
+const logMsgPrefix = `(?m)(.*\n)*.* msg="`
+const GiB = int64(1024 * 1024 * 1024)
+
+type fakeStat struct {
+	cgroupRoot string
+	statName   string
+	unit       string
+	value      int64
+}
+
+var fakeRSS = fakeStat{
+	cgroupRoot: "testdata/fakestat",
+	statName:   "mem rss",
+	unit:       "bytes",
+	// Note this is the value of total_rss, not rss, because that's what should
+	// always be reported for thresholds and maxima.
+	value: 750 * 1024 * 1024,
+}
+
 func Test(t *testing.T) {
 	TestingT(t)
 }
@@ -90,3 +110,56 @@ func (s *suite) TestReportPIDs(c *C) {
 	}
 	c.Logf("%s", s.logbuf.String())
 }
+
+func (s *suite) testRSSThresholds(c *C, rssPercentages []int64, alertCount int) {
+	c.Assert(alertCount <= len(rssPercentages), Equals, true)
+	rep := Reporter{
+		CgroupRoot: fakeRSS.cgroupRoot,
+		Logger:     s.logger,
+		MemThresholds: map[string][]Threshold{
+			"rss": NewThresholdsFromPercentages(GiB, rssPercentages),
+		},
+		PollPeriod:      time.Second * 10,
+		ThresholdLogger: s.logger,
+	}
+	rep.Start()
+	rep.Stop()
+	logs := s.logbuf.String()
+	c.Logf("%s", logs)
+
+	for index, expectPercentage := range rssPercentages[:alertCount] {
+		var logCheck Checker
+		if index < alertCount {
+			logCheck = Matches
+		} else {
+			logCheck = Not(Matches)
+		}
+		pattern := fmt.Sprintf(`%sContainer using over %d%% of memory \(rss %d/%d bytes\)"`,
+			logMsgPrefix, expectPercentage, fakeRSS.value, GiB)
+		c.Check(logs, logCheck, pattern)
+	}
+}
+
+func (s *suite) TestZeroRSSThresholds(c *C) {
+	s.testRSSThresholds(c, []int64{}, 0)
+}
+
+func (s *suite) TestOneRSSThresholdPassed(c *C) {
+	s.testRSSThresholds(c, []int64{55}, 1)
+}
+
+func (s *suite) TestOneRSSThresholdNotPassed(c *C) {
+	s.testRSSThresholds(c, []int64{85}, 0)
+}
+
+func (s *suite) TestMultipleRSSThresholdsNonePassed(c *C) {
+	s.testRSSThresholds(c, []int64{95, 97, 99}, 0)
+}
+
+func (s *suite) TestMultipleRSSThresholdsSomePassed(c *C) {
+	s.testRSSThresholds(c, []int64{60, 70, 80, 90}, 2)
+}
+
+func (s *suite) TestMultipleRSSThresholdsAllPassed(c *C) {
+	s.testRSSThresholds(c, []int64{1, 2, 3}, 3)
+}
diff --git a/lib/crunchstat/testdata/fakestat/cgroup.procs b/lib/crunchstat/testdata/fakestat/cgroup.procs
new file mode 100644
index 000000000..e69de29bb
diff --git a/lib/crunchstat/testdata/fakestat/memory.stat b/lib/crunchstat/testdata/fakestat/memory.stat
new file mode 100644
index 000000000..0540eea23
--- /dev/null
+++ b/lib/crunchstat/testdata/fakestat/memory.stat
@@ -0,0 +1,2 @@
+rss 990
+total_rss 786432000

commit 4101dd717291bb449b5683a38d9df18f2fabf4eb
Author: Brett Smith <brett.smith at curii.com>
Date:   Fri Feb 10 14:47:19 2023 -0500

    19986: DRY up logger setup in crunchstat tests
    
    Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>

diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index 5e8e93de6..b4498a135 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,7 +6,6 @@ package crunchstat
 
 import (
 	"bytes"
-	"log"
 	"os"
 	"regexp"
 	"strconv"
@@ -21,13 +20,25 @@ func Test(t *testing.T) {
 	TestingT(t)
 }
 
-var _ = Suite(&suite{})
+var _ = Suite(&suite{
+	logger: logrus.New(),
+})
 
-type suite struct{}
+type suite struct {
+	logbuf bytes.Buffer
+	logger *logrus.Logger
+}
+
+func (s *suite) SetUpSuite(c *C) {
+	s.logger.Out = &s.logbuf
+}
+
+func (s *suite) SetUpTest(c *C) {
+	s.logbuf.Reset()
+}
 
 func (s *suite) TestReadAllOrWarnFail(c *C) {
-	var logger bytes.Buffer
-	rep := Reporter{Logger: log.New(&logger, "", 0)}
+	rep := Reporter{Logger: s.logger}
 
 	// The special file /proc/self/mem can be opened for
 	// reading, but reading from byte 0 returns an error.
@@ -36,12 +47,11 @@ func (s *suite) TestReadAllOrWarnFail(c *C) {
 	defer f.Close()
 	_, err = rep.readAllOrWarn(f)
 	c.Check(err, NotNil)
-	c.Check(logger.String(), Matches, "^warning: read /proc/self/mem: .*\n")
+	c.Check(s.logbuf.String(), Matches, ".* msg=\"warning: read /proc/self/mem: .*\n")
 }
 
 func (s *suite) TestReadAllOrWarnSuccess(c *C) {
-	var logbuf bytes.Buffer
-	rep := Reporter{Logger: log.New(&logbuf, "", 0)}
+	rep := Reporter{Logger: s.logger}
 
 	f, err := os.Open("./crunchstat_test.go")
 	c.Assert(err, IsNil)
@@ -49,15 +59,12 @@ func (s *suite) TestReadAllOrWarnSuccess(c *C) {
 	data, err := rep.readAllOrWarn(f)
 	c.Check(err, IsNil)
 	c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
-	c.Check(logbuf.String(), Equals, "")
+	c.Check(s.logbuf.String(), Equals, "")
 }
 
 func (s *suite) TestReportPIDs(c *C) {
-	var logbuf bytes.Buffer
-	logger := logrus.New()
-	logger.Out = &logbuf
 	r := Reporter{
-		Logger:     logger,
+		Logger:     s.logger,
 		CgroupRoot: "/sys/fs/cgroup",
 		PollPeriod: time.Second,
 	}
@@ -70,7 +77,7 @@ func (s *suite) TestReportPIDs(c *C) {
 			c.Error("timed out")
 			break
 		}
-		if m := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`).FindSubmatch(logbuf.Bytes()); len(m) > 0 {
+		if m := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`).FindSubmatch(s.logbuf.Bytes()); len(m) > 0 {
 			size, err := strconv.ParseInt(string(m[1]), 10, 64)
 			c.Check(err, IsNil)
 			// Expect >1 MiB and <100 MiB -- otherwise we
@@ -81,5 +88,5 @@ func (s *suite) TestReportPIDs(c *C) {
 			break
 		}
 	}
-	c.Logf("%s", logbuf.String())
+	c.Logf("%s", s.logbuf.String())
 }

commit 1b92f8cec11d04b5f9464c6ad100f579660f70b9
Author: Brett Smith <brett.smith at curii.com>
Date:   Fri Feb 10 10:28:07 2023 -0500

    19986: Separate collection of cgroup memory stats
    
    This is scaffolding to help us report promptly when a container is
    approaching OOM. This commit does not change any public interface or
    reporting.
    
    Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 3a473cab8..5d2059f7e 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -59,6 +59,7 @@ type Reporter struct {
 	lastDiskIOSample    map[string]ioSample
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
+	lastMemSample       memSample
 
 	reportPIDs   map[string]int
 	reportPIDsMu sync.Mutex
@@ -246,7 +247,7 @@ type memSample struct {
 	memStat    map[string]int64
 }
 
-func (r *Reporter) doMemoryStats() {
+func (r *Reporter) getMemSample() {
 	c, err := r.openStatFile("memory", "memory.stat", true)
 	if err != nil {
 		return
@@ -254,7 +255,6 @@ func (r *Reporter) doMemoryStats() {
 	defer c.Close()
 	b := bufio.NewScanner(c)
 	thisSample := memSample{time.Now(), make(map[string]int64)}
-	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
 	for b.Scan() {
 		var stat string
 		var val int64
@@ -263,19 +263,26 @@ func (r *Reporter) doMemoryStats() {
 		}
 		thisSample.memStat[stat] = val
 	}
+	r.lastMemSample = thisSample
+}
+
+func (r *Reporter) reportMemSample() {
 	var outstat bytes.Buffer
+	wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
 	for _, key := range wantStats {
 		// Use "total_X" stats (entire hierarchy) if enabled,
 		// otherwise just the single cgroup -- see
 		// https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt
-		if val, ok := thisSample.memStat["total_"+key]; ok {
+		if val, ok := r.lastMemSample.memStat["total_"+key]; ok {
 			fmt.Fprintf(&outstat, " %d %s", val, key)
-		} else if val, ok := thisSample.memStat[key]; ok {
+		} else if val, ok := r.lastMemSample.memStat[key]; ok {
 			fmt.Fprintf(&outstat, " %d %s", val, key)
 		}
 	}
 	r.Logger.Printf("mem%s\n", outstat.String())
+}
 
+func (r *Reporter) doProcmemStats() {
 	if r.kernelPageSize == 0 {
 		// assign "don't try again" value in case we give up
 		// and return without assigning the real value
@@ -490,6 +497,15 @@ func (r *Reporter) doCPUStats() {
 	r.lastCPUSample = nextSample
 }
 
+func (r *Reporter) doAllStats() {
+	r.reportMemSample()
+	r.doProcmemStats()
+	r.doCPUStats()
+	r.doBlkIOStats()
+	r.doNetworkStats()
+	r.doDiskSpaceStats()
+}
+
 // Report stats periodically until we learn (via r.done) that someone
 // called Stop.
 func (r *Reporter) run() {
@@ -512,17 +528,19 @@ func (r *Reporter) run() {
 		r.Logger.Printf("notice: monitoring temp dir %s\n", r.TempDir)
 	}
 
-	ticker := time.NewTicker(r.PollPeriod)
+	r.getMemSample()
+	r.doAllStats()
+
+	memTicker := time.NewTicker(time.Second)
+	mainTicker := time.NewTicker(r.PollPeriod)
 	for {
-		r.doMemoryStats()
-		r.doCPUStats()
-		r.doBlkIOStats()
-		r.doNetworkStats()
-		r.doDiskSpaceStats()
 		select {
 		case <-r.done:
 			return
-		case <-ticker.C:
+		case <-memTicker.C:
+			r.getMemSample()
+		case <-mainTicker.C:
+			r.doAllStats()
 		}
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list