[arvados] created: 2.1.0-2975-g0a0d255e9
git repository hosting
git at public.arvados.org
Mon Oct 24 19:16:35 UTC 2022
at 0a0d255e92c2e5f8b0e534d2c0cde66568c9e8c6 (commit)
commit 0a0d255e92c2e5f8b0e534d2c0cde66568c9e8c6
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 24 15:11:26 2022 -0400
19563: Log memory usage of arv-mount, crunch-run, and keepstore.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ee9115d8d..2b152f732 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -142,6 +142,7 @@ type ContainerRunner struct {
parentTemp string
costStartTime time.Time
+ keepstore *exec.Cmd
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
@@ -660,6 +661,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
if err != nil {
return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
+ runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
for _, p := range collectionPaths {
_, err = os.Stat(p)
@@ -733,6 +735,7 @@ func (runner *ContainerRunner) startHoststat() error {
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
return nil
}
@@ -1569,6 +1572,9 @@ func (runner *ContainerRunner) Run() (err error) {
if err != nil {
return
}
+ if runner.keepstore != nil {
+ runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+ }
// set up FUSE mount and binds
bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1859,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 1
}
+ cr.keepstore = keepstore
if keepstore == nil {
// Log explanation (if any) for why we're not running
// a local keepstore.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 10cd7cfce..443d2202c 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -13,10 +13,11 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"os"
+ "sort"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
@@ -47,7 +48,9 @@ type Reporter struct {
TempDir string
// Where to write statistics. Must not be nil.
- Logger *log.Logger
+ Logger interface {
+ Printf(fmt string, args ...interface{})
+ }
reportedStatFile map[string]string
lastNetSample map[string]ioSample
@@ -55,6 +58,9 @@ type Reporter struct {
lastCPUSample cpuSample
lastDiskSpaceSample diskSpaceSample
+ reportPIDs map[string]int
+ reportPIDsMu sync.Mutex
+
done chan struct{} // closed when we should stop reporting
flushed chan struct{} // closed when we have made our last report
}
@@ -76,6 +82,17 @@ func (r *Reporter) Start() {
go r.run()
}
+// ReportPID registers pid under the given name so that it is
+// included in the per-process memory report. Registering a name
+// that is already present replaces the pid stored for it.
+func (r *Reporter) ReportPID(name string, pid int) {
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	if r.reportPIDs == nil {
+		// Allocate lazily: the map is nil until the first registration.
+		r.reportPIDs = make(map[string]int)
+	}
+	r.reportPIDs[name] = pid
+}
+
// Stop reporting. Do not call more than once, or before calling
// Start.
//
@@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() {
}
}
r.Logger.Printf("mem%s\n", outstat.String())
+
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ procnames := make([]string, 0, len(r.reportPIDs))
+ for name := range r.reportPIDs {
+ procnames = append(procnames, name)
+ }
+ sort.Strings(procnames)
+ procmem := ""
+ for _, procname := range procnames {
+ pid := r.reportPIDs[procname]
+ buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+ if err != nil {
+ continue
+ }
+ // If the executable name contains a ')' char,
+ // /proc/$pid/stat will look like '1234 (exec name)) S
+ // 123 ...' -- the last ')' is the end of the 2nd
+ // field.
+ paren := bytes.LastIndexByte(buf, ')')
+ if paren < 0 {
+ continue
+ }
+ fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+ if len(fields) < 24 {
+ continue
+ }
+ // rss is the 24th field in .../stat, and fields[0]
+ // here is the last char ')' of the 2nd field, so
+ // rss is fields[22]
+ rss, err := strconv.Atoi(string(fields[22]))
+ if err != nil {
+ continue
+ }
+ procmem += fmt.Sprintf(" %d %s", rss, procname)
+ }
+ if procmem != "" {
+ r.Logger.Printf("procmem%s\n", procmem)
+ }
}
func (r *Reporter) doNetworkStats() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index c27e39241..f5e2f8662 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,20 +6,33 @@ package crunchstat
import (
"bufio"
+ "bytes"
"io"
"log"
"os"
"regexp"
"testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ . "gopkg.in/check.v1"
)
+// Test is the "go test" entry point for this package: it hands
+// control to check.v1, which runs the suites registered via Suite().
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+var _ = Suite(&suite{})
+
+type suite struct{}
+
// bufLogger returns a *log.Logger (empty prefix, no flags) whose
// output is written to the write end of an io.Pipe, together with a
// bufio.Reader wrapping the read end so a test can consume what was
// logged.
func bufLogger() (*log.Logger, *bufio.Reader) {
r, w := io.Pipe()
logger := log.New(w, "", 0)
return logger, bufio.NewReader(r)
}
-func TestReadAllOrWarnFail(t *testing.T) {
+func (s *suite) TestReadAllOrWarnFail(c *C) {
logger, rcv := bufLogger()
rep := Reporter{Logger: logger}
@@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) {
// reading, but reading from byte 0 returns an error.
f, err := os.Open("/proc/self/mem")
if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
+ c.Fatalf("Opening /proc/self/mem: %s", err)
}
if x, err := rep.readAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
+ c.Fatalf("Expected error, got %v", x)
}
}
<-done
if err != nil {
- t.Fatal(err)
+ c.Fatal(err)
} else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+ c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
}
}
-func TestReadAllOrWarnSuccess(t *testing.T) {
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
f, err := os.Open("./crunchstat_test.go")
if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ c.Fatalf("Opening ./crunchstat_test.go: %s", err)
}
data, err := rep.readAllOrWarn(f)
if err != nil {
- t.Fatalf("got error %s", err)
+ c.Fatalf("got error %s", err)
}
if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ }
+}
+
+// TestReportPIDs starts a Reporter, registers three PIDs, and polls
+// the log output (for up to 10 seconds) until a "procmem" entry
+// appears that lists "init" immediately followed by "test_process".
+// Because entries are emitted in sorted name order, a match also
+// shows that the "nonexistent" entry was omitted.
+//
+// NOTE(review): assumes PID 12345 is not in use on the test host.
+// NOTE(review): logbuf is read here while the Reporter's goroutine
+// may be writing to it -- confirm this is acceptable under -race.
+func (s *suite) TestReportPIDs(c *C) {
+	var logbuf bytes.Buffer
+	logger := logrus.New()
+	logger.Out = &logbuf
+	r := Reporter{
+		Logger:     logger,
+		CgroupRoot: "/sys/fs/cgroup",
+		PollPeriod: time.Second,
+	}
+	r.Start()
+	r.ReportPID("init", 1)
+	r.ReportPID("test_process", os.Getpid())
+	r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+	// (?ms) sets the multi-line and dot-matches-newline flags so the
+	// entry is found anywhere in the accumulated log. Compiled once,
+	// outside the polling loop.
+	procmemRe := regexp.MustCompile(`(?ms).*procmem \d+ init \d+ test_process.*`)
+	for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+		if time.Now().After(deadline) {
+			c.Error("timed out")
+			break
+		}
+		if procmemRe.MatchString(logbuf.String()) {
+			break
+		}
 	}
+	c.Logf("%s", logbuf.String())
 }
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list