[arvados] created: 2.1.0-2975-g0a0d255e9

git repository hosting git at public.arvados.org
Mon Oct 24 19:16:35 UTC 2022


        at  0a0d255e92c2e5f8b0e534d2c0cde66568c9e8c6 (commit)


commit 0a0d255e92c2e5f8b0e534d2c0cde66568c9e8c6
Author: Tom Clegg <tom at curii.com>
Date:   Mon Oct 24 15:11:26 2022 -0400

    19563: Log memory usage of arv-mount, crunch-run, and keepstore.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ee9115d8d..2b152f732 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -142,6 +142,7 @@ type ContainerRunner struct {
 	parentTemp    string
 	costStartTime time.Time
 
+	keepstore        *exec.Cmd
 	keepstoreLogger  io.WriteCloser
 	keepstoreLogbuf  *bufThenWrite
 	statLogger       io.WriteCloser
@@ -660,6 +661,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	if err != nil {
 		return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
 	}
+	runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
 
 	for _, p := range collectionPaths {
 		_, err = os.Stat(p)
@@ -733,6 +735,7 @@ func (runner *ContainerRunner) startHoststat() error {
 		PollPeriod: runner.statInterval,
 	}
 	runner.hoststatReporter.Start()
+	runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
 	return nil
 }
 
@@ -1569,6 +1572,9 @@ func (runner *ContainerRunner) Run() (err error) {
 	if err != nil {
 		return
 	}
+	if runner.keepstore != nil {
+		runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+	}
 
 	// set up FUSE mount and binds
 	bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1859,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	cr.keepstore = keepstore
 	if keepstore == nil {
 		// Log explanation (if any) for why we're not running
 		// a local keepstore.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 10cd7cfce..443d2202c 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -13,10 +13,11 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 )
@@ -47,7 +48,9 @@ type Reporter struct {
 	TempDir string
 
 	// Where to write statistics. Must not be nil.
-	Logger *log.Logger
+	Logger interface {
+		Printf(fmt string, args ...interface{})
+	}
 
 	reportedStatFile    map[string]string
 	lastNetSample       map[string]ioSample
@@ -55,6 +58,9 @@ type Reporter struct {
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
 
+	reportPIDs   map[string]int
+	reportPIDsMu sync.Mutex
+
 	done    chan struct{} // closed when we should stop reporting
 	flushed chan struct{} // closed when we have made our last report
 }
@@ -76,6 +82,17 @@ func (r *Reporter) Start() {
 	go r.run()
 }
 
+// ReportPID registers pid to have its stats reported under the given
+// name, replacing any pid previously registered with that name.
+func (r *Reporter) ReportPID(name string, pid int) {
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	if r.reportPIDs == nil {
+		r.reportPIDs = make(map[string]int)
+	}
+	r.reportPIDs[name] = pid
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
@@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() {
 		}
 	}
 	r.Logger.Printf("mem%s\n", outstat.String())
+
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	procnames := make([]string, 0, len(r.reportPIDs))
+	for name := range r.reportPIDs {
+		procnames = append(procnames, name)
+	}
+	sort.Strings(procnames)
+	procmem := ""
+	for _, procname := range procnames {
+		pid := r.reportPIDs[procname]
+		buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+		if err != nil {
+			continue
+		}
+		// If the executable name contains a ')' char,
+		// /proc/$pid/stat will look like '1234 (exec name)) S
+		// 123 ...' -- the last ')' is the end of the 2nd
+		// field.
+		paren := bytes.LastIndexByte(buf, ')')
+		if paren < 0 {
+			continue
+		}
+		fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+		if len(fields) < 24 {
+			continue
+		}
+		// rss is the 24th field in .../stat, and fields[0]
+		// here is the last char ')' of the 2nd field, so
+		// rss is fields[22]
+		rss, err := strconv.Atoi(string(fields[22]))
+		if err != nil {
+			continue
+		}
+		procmem += fmt.Sprintf(" %d %s", rss, procname)
+	}
+	if procmem != "" {
+		r.Logger.Printf("procmem%s\n", procmem)
+	}
 }
 
 func (r *Reporter) doNetworkStats() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index c27e39241..f5e2f8662 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,20 +6,33 @@ package crunchstat
 
 import (
 	"bufio"
+	"bytes"
 	"io"
 	"log"
 	"os"
 	"regexp"
 	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	. "gopkg.in/check.v1"
 )
 
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&suite{})
+
+type suite struct{}
+
 func bufLogger() (*log.Logger, *bufio.Reader) {
 	r, w := io.Pipe()
 	logger := log.New(w, "", 0)
 	return logger, bufio.NewReader(r)
 }
 
-func TestReadAllOrWarnFail(t *testing.T) {
+func (s *suite) TestReadAllOrWarnFail(c *C) {
 	logger, rcv := bufLogger()
 	rep := Reporter{Logger: logger}
 
@@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) {
 		// reading, but reading from byte 0 returns an error.
 		f, err := os.Open("/proc/self/mem")
 		if err != nil {
-			t.Fatalf("Opening /proc/self/mem: %s", err)
+			c.Fatalf("Opening /proc/self/mem: %s", err)
 		}
 		if x, err := rep.readAllOrWarn(f); err == nil {
-			t.Fatalf("Expected error, got %v", x)
+			c.Fatalf("Expected error, got %v", x)
 		}
 	}
 	<-done
 	if err != nil {
-		t.Fatal(err)
+		c.Fatal(err)
 	} else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
-		t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+		c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
 	}
 }
 
-func TestReadAllOrWarnSuccess(t *testing.T) {
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
 	rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
 
 	f, err := os.Open("./crunchstat_test.go")
 	if err != nil {
-		t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+		c.Fatalf("Opening ./crunchstat_test.go: %s", err)
 	}
 	data, err := rep.readAllOrWarn(f)
 	if err != nil {
-		t.Fatalf("got error %s", err)
+		c.Fatalf("got error %s", err)
 	}
 	if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
-		t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+		c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+	}
+}
+
+// TestReportPIDs waits for a procmem log line listing pid 1 and this test process.
+func (s *suite) TestReportPIDs(c *C) {
+	var logbuf bytes.Buffer
+	logger := logrus.New()
+	logger.Out = &logbuf
+	r := Reporter{
+		Logger:     logger,
+		CgroupRoot: "/sys/fs/cgroup",
+		PollPeriod: time.Second,
+	}
+	r.Start()
+	r.ReportPID("init", 1)
+	r.ReportPID("test_process", os.Getpid())
+	r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+	// (?ms) is a flag group: multi-line mode, and "." also matches newlines.
+	procmemRE := regexp.MustCompile(`(?ms).*procmem \d+ init \d+ test_process.*`)
+	for deadline := time.Now().Add(10 * time.Second); !procmemRE.MatchString(logbuf.String()); time.Sleep(time.Millisecond) {
+		if time.Now().After(deadline) {
+			c.Error("timed out")
+			break
+		}
 	}
+	c.Logf("%s", logbuf.String())
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list