[arvados] created: 2.1.0-2976-ge55bd0d3b

git repository hosting git at public.arvados.org
Tue Oct 25 14:23:47 UTC 2022


        at  e55bd0d3b54494061d54853c4b613ad680ffb6a9 (commit)


commit e55bd0d3b54494061d54853c4b613ad680ffb6a9
Author: Tom Clegg <tom at curii.com>
Date:   Tue Oct 25 10:22:53 2022 -0400

    19563: Clean up tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index f5e2f8662..922aa369b 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -5,9 +5,7 @@
 package crunchstat
 
 import (
-	"bufio"
 	"bytes"
-	"io"
 	"log"
 	"os"
 	"regexp"
@@ -26,56 +24,31 @@ var _ = Suite(&suite{})
 
 type suite struct{}
 
-func bufLogger() (*log.Logger, *bufio.Reader) {
-	r, w := io.Pipe()
-	logger := log.New(w, "", 0)
-	return logger, bufio.NewReader(r)
-}
-
 func (s *suite) TestReadAllOrWarnFail(c *C) {
-	logger, rcv := bufLogger()
-	rep := Reporter{Logger: logger}
+	var logger bytes.Buffer
+	rep := Reporter{Logger: log.New(&logger, "", 0)}
 
-	done := make(chan bool)
-	var msg []byte
-	var err error
-	go func() {
-		msg, err = rcv.ReadBytes('\n')
-		close(done)
-	}()
-	{
-		// The special file /proc/self/mem can be opened for
-		// reading, but reading from byte 0 returns an error.
-		f, err := os.Open("/proc/self/mem")
-		if err != nil {
-			c.Fatalf("Opening /proc/self/mem: %s", err)
-		}
-		if x, err := rep.readAllOrWarn(f); err == nil {
-			c.Fatalf("Expected error, got %v", x)
-		}
-	}
-	<-done
-	if err != nil {
-		c.Fatal(err)
-	} else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
-		c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
-	}
+	// The special file /proc/self/mem can be opened for
+	// reading, but reading from byte 0 returns an error.
+	f, err := os.Open("/proc/self/mem")
+	c.Assert(err, IsNil)
+	defer f.Close()
+	_, err = rep.readAllOrWarn(f)
+	c.Check(err, NotNil)
+	c.Check(logger.String(), Matches, "^warning: read /proc/self/mem: .*\n")
 }
 
 func (s *suite) TestReadAllOrWarnSuccess(c *C) {
-	rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+	var logbuf bytes.Buffer
+	rep := Reporter{Logger: log.New(&logbuf, "", 0)}
 
 	f, err := os.Open("./crunchstat_test.go")
-	if err != nil {
-		c.Fatalf("Opening ./crunchstat_test.go: %s", err)
-	}
+	c.Assert(err, IsNil)
+	defer f.Close()
 	data, err := rep.readAllOrWarn(f)
-	if err != nil {
-		c.Fatalf("got error %s", err)
-	}
-	if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
-		c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
-	}
+	c.Check(err, IsNil)
+	c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
+	c.Check(logbuf.String(), Equals, "")
 }
 
 func (s *suite) TestReportPIDs(c *C) {
@@ -96,7 +69,7 @@ func (s *suite) TestReportPIDs(c *C) {
 			c.Error("timed out")
 			break
 		}
-		if regexp.MustCompile(`(!?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
+		if regexp.MustCompile(`(?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
 			break
 		}
 	}

commit ca69f0bebc31124d9b61cec4b790d45a94bff379
Author: Tom Clegg <tom at curii.com>
Date:   Mon Oct 24 16:45:45 2022 -0400

    19563: Log memory usage of arv-mount, crunch-run, and keepstore.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ee9115d8d..55790f727 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -142,6 +142,7 @@ type ContainerRunner struct {
 	parentTemp    string
 	costStartTime time.Time
 
+	keepstore        *exec.Cmd
 	keepstoreLogger  io.WriteCloser
 	keepstoreLogbuf  *bufThenWrite
 	statLogger       io.WriteCloser
@@ -660,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	if err != nil {
 		return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
 	}
+	if runner.hoststatReporter != nil && runner.ArvMount != nil {
+		runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+	}
 
 	for _, p := range collectionPaths {
 		_, err = os.Stat(p)
@@ -733,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
 		PollPeriod: runner.statInterval,
 	}
 	runner.hoststatReporter.Start()
+	runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
 	return nil
 }
 
@@ -1569,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
 	if err != nil {
 		return
 	}
+	if runner.keepstore != nil {
+		runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+	}
 
 	// set up FUSE mount and binds
 	bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	cr.keepstore = keepstore
 	if keepstore == nil {
 		// Log explanation (if any) for why we're not running
 		// a local keepstore.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 10cd7cfce..443d2202c 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -13,10 +13,11 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 )
@@ -47,7 +48,9 @@ type Reporter struct {
 	TempDir string
 
 	// Where to write statistics. Must not be nil.
-	Logger *log.Logger
+	Logger interface {
+		Printf(fmt string, args ...interface{})
+	}
 
 	reportedStatFile    map[string]string
 	lastNetSample       map[string]ioSample
@@ -55,6 +58,9 @@ type Reporter struct {
 	lastCPUSample       cpuSample
 	lastDiskSpaceSample diskSpaceSample
 
+	reportPIDs   map[string]int
+	reportPIDsMu sync.Mutex
+
 	done    chan struct{} // closed when we should stop reporting
 	flushed chan struct{} // closed when we have made our last report
 }
@@ -76,6 +82,17 @@ func (r *Reporter) Start() {
 	go r.run()
 }
 
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	if r.reportPIDs == nil {
+		r.reportPIDs = map[string]int{name: pid}
+	} else {
+		r.reportPIDs[name] = pid
+	}
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
@@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() {
 		}
 	}
 	r.Logger.Printf("mem%s\n", outstat.String())
+
+	r.reportPIDsMu.Lock()
+	defer r.reportPIDsMu.Unlock()
+	procnames := make([]string, 0, len(r.reportPIDs))
+	for name := range r.reportPIDs {
+		procnames = append(procnames, name)
+	}
+	sort.Strings(procnames)
+	procmem := ""
+	for _, procname := range procnames {
+		pid := r.reportPIDs[procname]
+		buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+		if err != nil {
+			continue
+		}
+		// If the executable name contains a ')' char,
+		// /proc/$pid/stat will look like '1234 (exec name)) S
+		// 123 ...' -- the last ')' is the end of the 2nd
+		// field.
+		paren := bytes.LastIndexByte(buf, ')')
+		if paren < 0 {
+			continue
+		}
+		fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+		if len(fields) < 24 {
+			continue
+		}
+		// rss is the 24th field in .../stat, and fields[0]
+		// here is the last char ')' of the 2nd field, so
+		// rss is fields[22]
+		rss, err := strconv.Atoi(string(fields[22]))
+		if err != nil {
+			continue
+		}
+		procmem += fmt.Sprintf(" %d %s", rss, procname)
+	}
+	if procmem != "" {
+		r.Logger.Printf("procmem%s\n", procmem)
+	}
 }
 
 func (r *Reporter) doNetworkStats() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index c27e39241..f5e2f8662 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,20 +6,33 @@ package crunchstat
 
 import (
 	"bufio"
+	"bytes"
 	"io"
 	"log"
 	"os"
 	"regexp"
 	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	. "gopkg.in/check.v1"
 )
 
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&suite{})
+
+type suite struct{}
+
 func bufLogger() (*log.Logger, *bufio.Reader) {
 	r, w := io.Pipe()
 	logger := log.New(w, "", 0)
 	return logger, bufio.NewReader(r)
 }
 
-func TestReadAllOrWarnFail(t *testing.T) {
+func (s *suite) TestReadAllOrWarnFail(c *C) {
 	logger, rcv := bufLogger()
 	rep := Reporter{Logger: logger}
 
@@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) {
 		// reading, but reading from byte 0 returns an error.
 		f, err := os.Open("/proc/self/mem")
 		if err != nil {
-			t.Fatalf("Opening /proc/self/mem: %s", err)
+			c.Fatalf("Opening /proc/self/mem: %s", err)
 		}
 		if x, err := rep.readAllOrWarn(f); err == nil {
-			t.Fatalf("Expected error, got %v", x)
+			c.Fatalf("Expected error, got %v", x)
 		}
 	}
 	<-done
 	if err != nil {
-		t.Fatal(err)
+		c.Fatal(err)
 	} else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
-		t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+		c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
 	}
 }
 
-func TestReadAllOrWarnSuccess(t *testing.T) {
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
 	rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
 
 	f, err := os.Open("./crunchstat_test.go")
 	if err != nil {
-		t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+		c.Fatalf("Opening ./crunchstat_test.go: %s", err)
 	}
 	data, err := rep.readAllOrWarn(f)
 	if err != nil {
-		t.Fatalf("got error %s", err)
+		c.Fatalf("got error %s", err)
 	}
 	if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
-		t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+		c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+	}
+}
+
+func (s *suite) TestReportPIDs(c *C) {
+	var logbuf bytes.Buffer
+	logger := logrus.New()
+	logger.Out = &logbuf
+	r := Reporter{
+		Logger:     logger,
+		CgroupRoot: "/sys/fs/cgroup",
+		PollPeriod: time.Second,
+	}
+	r.Start()
+	r.ReportPID("init", 1)
+	r.ReportPID("test_process", os.Getpid())
+	r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+	for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+		if time.Now().After(deadline) {
+			c.Error("timed out")
+			break
+		}
+		if regexp.MustCompile(`(!?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
+			break
+		}
 	}
+	c.Logf("%s", logbuf.String())
 }
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 6383eae54..d28bee0f5 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -28,6 +28,10 @@ var (
 	version               = "dev"
 )
 
+type logger interface {
+	Printf(string, ...interface{})
+}
+
 func main() {
 	reporter := crunchstat.Reporter{
 		Logger: log.New(os.Stderr, "crunchstat: ", 0),
@@ -55,9 +59,11 @@ func main() {
 	reporter.Logger.Printf("crunchstat %s started", version)
 
 	if reporter.CgroupRoot == "" {
-		reporter.Logger.Fatal("error: must provide -cgroup-root")
+		reporter.Logger.Printf("error: must provide -cgroup-root")
+		os.Exit(2)
 	} else if signalOnDeadPPID < 0 {
-		reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+		reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+		os.Exit(2)
 	}
 	reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
 
@@ -76,17 +82,19 @@ func main() {
 		if status, ok := err.Sys().(syscall.WaitStatus); ok {
 			os.Exit(status.ExitStatus())
 		} else {
-			reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+			reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+			os.Exit(1)
 		}
 	} else if err != nil {
-		reporter.Logger.Fatalln("error in cmd.Wait:", err)
+		reporter.Logger.Printf("error running command: %v", err)
+		os.Exit(1)
 	}
 }
 
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
 	cmd := exec.Command(argv[0], argv[1:]...)
 
-	logger.Println("Running", argv)
+	logger.Printf("Running %v", argv)
 
 	// Child process will use our stdin and stdout pipes
 	// (we close our copies below)
@@ -100,7 +108,7 @@ func runCommand(argv []string, logger *log.Logger) error {
 		if cmd.Process != nil {
 			cmd.Process.Signal(catch)
 		}
-		logger.Println("notice: caught signal:", catch)
+		logger.Printf("notice: caught signal: %v", catch)
 	}(sigChan)
 	signal.Notify(sigChan, syscall.SIGTERM)
 	signal.Notify(sigChan, syscall.SIGINT)
@@ -113,24 +121,30 @@ func runCommand(argv []string, logger *log.Logger) error {
 	// Funnel stderr through our channel
 	stderrPipe, err := cmd.StderrPipe()
 	if err != nil {
-		logger.Fatalln("error in StderrPipe:", err)
+		logger.Printf("error in StderrPipe: %v", err)
+		return err
 	}
 
 	// Run subprocess
 	if err := cmd.Start(); err != nil {
-		logger.Fatalln("error in cmd.Start:", err)
+		logger.Printf("error in cmd.Start: %v", err)
+		return err
 	}
 
 	// Close stdin/stdout in this (parent) process
 	os.Stdin.Close()
 	os.Stdout.Close()
 
-	copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+	err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+	if err != nil {
+		cmd.Process.Kill()
+		return err
+	}
 
 	return cmd.Wait()
 }
 
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
 	ticker := time.NewTicker(intvl)
 	for range ticker.C {
 		ppid := os.Getppid()
@@ -152,7 +166,7 @@ func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.C
 	}
 }
 
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
 	reader := bufio.NewReaderSize(in, MaxLogLine)
 	var prefix string
 	for {
@@ -160,13 +174,13 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
 		if err == io.EOF {
 			break
 		} else if err != nil {
-			logger.Fatal("error reading child stderr:", err)
+			return fmt.Errorf("error reading child stderr: %w", err)
 		}
 		var suffix string
 		if isPrefix {
 			suffix = "[...]"
 		}
-		logger.Print(prefix, string(line), suffix)
+		logger.Printf("%s%s%s", prefix, string(line), suffix)
 		// Set up prefix for following line
 		if isPrefix {
 			prefix = "[...]"
@@ -174,5 +188,5 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
 			prefix = ""
 		}
 	}
-	in.Close()
+	return in.Close()
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list