[arvados] created: 2.1.0-2976-ge55bd0d3b
git repository hosting
git at public.arvados.org
Tue Oct 25 14:23:47 UTC 2022
at e55bd0d3b54494061d54853c4b613ad680ffb6a9 (commit)
commit e55bd0d3b54494061d54853c4b613ad680ffb6a9
Author: Tom Clegg <tom at curii.com>
Date: Tue Oct 25 10:22:53 2022 -0400
19563: Clean up tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index f5e2f8662..922aa369b 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -5,9 +5,7 @@
package crunchstat
import (
- "bufio"
"bytes"
- "io"
"log"
"os"
"regexp"
@@ -26,56 +24,31 @@ var _ = Suite(&suite{})
type suite struct{}
-func bufLogger() (*log.Logger, *bufio.Reader) {
- r, w := io.Pipe()
- logger := log.New(w, "", 0)
- return logger, bufio.NewReader(r)
-}
-
func (s *suite) TestReadAllOrWarnFail(c *C) {
- logger, rcv := bufLogger()
- rep := Reporter{Logger: logger}
+ var logger bytes.Buffer
+ rep := Reporter{Logger: log.New(&logger, "", 0)}
- done := make(chan bool)
- var msg []byte
- var err error
- go func() {
- msg, err = rcv.ReadBytes('\n')
- close(done)
- }()
- {
- // The special file /proc/self/mem can be opened for
- // reading, but reading from byte 0 returns an error.
- f, err := os.Open("/proc/self/mem")
- if err != nil {
- c.Fatalf("Opening /proc/self/mem: %s", err)
- }
- if x, err := rep.readAllOrWarn(f); err == nil {
- c.Fatalf("Expected error, got %v", x)
- }
- }
- <-done
- if err != nil {
- c.Fatal(err)
- } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
- c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
- }
+ // The special file /proc/self/mem can be opened for
+ // reading, but reading from byte 0 returns an error.
+ f, err := os.Open("/proc/self/mem")
+ c.Assert(err, IsNil)
+ defer f.Close()
+ _, err = rep.readAllOrWarn(f)
+ c.Check(err, NotNil)
+ c.Check(logger.String(), Matches, "^warning: read /proc/self/mem: .*\n")
}
func (s *suite) TestReadAllOrWarnSuccess(c *C) {
- rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+ var logbuf bytes.Buffer
+ rep := Reporter{Logger: log.New(&logbuf, "", 0)}
f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- c.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
+ c.Assert(err, IsNil)
+ defer f.Close()
data, err := rep.readAllOrWarn(f)
- if err != nil {
- c.Fatalf("got error %s", err)
- }
- if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
- c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
- }
+ c.Check(err, IsNil)
+ c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
+ c.Check(logbuf.String(), Equals, "")
}
func (s *suite) TestReportPIDs(c *C) {
@@ -96,7 +69,7 @@ func (s *suite) TestReportPIDs(c *C) {
c.Error("timed out")
break
}
- if regexp.MustCompile(`(!?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
+ if regexp.MustCompile(`(?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
break
}
}
commit ca69f0bebc31124d9b61cec4b790d45a94bff379
Author: Tom Clegg <tom at curii.com>
Date: Mon Oct 24 16:45:45 2022 -0400
19563: Log memory usage of arv-mount, crunch-run, and keepstore.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ee9115d8d..55790f727 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -142,6 +142,7 @@ type ContainerRunner struct {
parentTemp string
costStartTime time.Time
+ keepstore *exec.Cmd
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
@@ -660,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
if err != nil {
return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
+ if runner.hoststatReporter != nil && runner.ArvMount != nil {
+ runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+ }
for _, p := range collectionPaths {
_, err = os.Stat(p)
@@ -733,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
return nil
}
@@ -1569,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
if err != nil {
return
}
+ if runner.keepstore != nil {
+ runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+ }
// set up FUSE mount and binds
bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 1
}
+ cr.keepstore = keepstore
if keepstore == nil {
// Log explanation (if any) for why we're not running
// a local keepstore.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 10cd7cfce..443d2202c 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -13,10 +13,11 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"os"
+ "sort"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
@@ -47,7 +48,9 @@ type Reporter struct {
TempDir string
// Where to write statistics. Must not be nil.
- Logger *log.Logger
+ Logger interface {
+ Printf(fmt string, args ...interface{})
+ }
reportedStatFile map[string]string
lastNetSample map[string]ioSample
@@ -55,6 +58,9 @@ type Reporter struct {
lastCPUSample cpuSample
lastDiskSpaceSample diskSpaceSample
+ reportPIDs map[string]int
+ reportPIDsMu sync.Mutex
+
done chan struct{} // closed when we should stop reporting
flushed chan struct{} // closed when we have made our last report
}
@@ -76,6 +82,17 @@ func (r *Reporter) Start() {
go r.run()
}
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ if r.reportPIDs == nil {
+ r.reportPIDs = map[string]int{name: pid}
+ } else {
+ r.reportPIDs[name] = pid
+ }
+}
+
// Stop reporting. Do not call more than once, or before calling
// Start.
//
@@ -256,6 +273,45 @@ func (r *Reporter) doMemoryStats() {
}
}
r.Logger.Printf("mem%s\n", outstat.String())
+
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ procnames := make([]string, 0, len(r.reportPIDs))
+ for name := range r.reportPIDs {
+ procnames = append(procnames, name)
+ }
+ sort.Strings(procnames)
+ procmem := ""
+ for _, procname := range procnames {
+ pid := r.reportPIDs[procname]
+ buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+ if err != nil {
+ continue
+ }
+ // If the executable name contains a ')' char,
+ // /proc/$pid/stat will look like '1234 (exec name)) S
+ // 123 ...' -- the last ')' is the end of the 2nd
+ // field.
+ paren := bytes.LastIndexByte(buf, ')')
+ if paren < 0 {
+ continue
+ }
+ fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+ if len(fields) < 24 {
+ continue
+ }
+ // rss is the 24th field in .../stat, and fields[0]
+ // here is the last char ')' of the 2nd field, so
+ // rss is fields[22]
+ rss, err := strconv.Atoi(string(fields[22]))
+ if err != nil {
+ continue
+ }
+ procmem += fmt.Sprintf(" %d %s", rss, procname)
+ }
+ if procmem != "" {
+ r.Logger.Printf("procmem%s\n", procmem)
+ }
}
func (r *Reporter) doNetworkStats() {
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index c27e39241..f5e2f8662 100644
--- a/lib/crunchstat/crunchstat_test.go
+++ b/lib/crunchstat/crunchstat_test.go
@@ -6,20 +6,33 @@ package crunchstat
import (
"bufio"
+ "bytes"
"io"
"log"
"os"
"regexp"
"testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ . "gopkg.in/check.v1"
)
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+var _ = Suite(&suite{})
+
+type suite struct{}
+
func bufLogger() (*log.Logger, *bufio.Reader) {
r, w := io.Pipe()
logger := log.New(w, "", 0)
return logger, bufio.NewReader(r)
}
-func TestReadAllOrWarnFail(t *testing.T) {
+func (s *suite) TestReadAllOrWarnFail(c *C) {
logger, rcv := bufLogger()
rep := Reporter{Logger: logger}
@@ -35,32 +48,57 @@ func TestReadAllOrWarnFail(t *testing.T) {
// reading, but reading from byte 0 returns an error.
f, err := os.Open("/proc/self/mem")
if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
+ c.Fatalf("Opening /proc/self/mem: %s", err)
}
if x, err := rep.readAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
+ c.Fatalf("Expected error, got %v", x)
}
}
<-done
if err != nil {
- t.Fatal(err)
+ c.Fatal(err)
} else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+ c.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
}
}
-func TestReadAllOrWarnSuccess(t *testing.T) {
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
f, err := os.Open("./crunchstat_test.go")
if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ c.Fatalf("Opening ./crunchstat_test.go: %s", err)
}
data, err := rep.readAllOrWarn(f)
if err != nil {
- t.Fatalf("got error %s", err)
+ c.Fatalf("got error %s", err)
}
if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ c.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ }
+}
+
+func (s *suite) TestReportPIDs(c *C) {
+ var logbuf bytes.Buffer
+ logger := logrus.New()
+ logger.Out = &logbuf
+ r := Reporter{
+ Logger: logger,
+ CgroupRoot: "/sys/fs/cgroup",
+ PollPeriod: time.Second,
+ }
+ r.Start()
+ r.ReportPID("init", 1)
+ r.ReportPID("test_process", os.Getpid())
+ r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+ for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+ if time.Now().After(deadline) {
+ c.Error("timed out")
+ break
+ }
+ if regexp.MustCompile(`(!?ms).*procmem \d+ init \d+ test_process.*`).MatchString(logbuf.String()) {
+ break
+ }
}
+ c.Logf("%s", logbuf.String())
}
diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go
index 6383eae54..d28bee0f5 100644
--- a/services/crunchstat/crunchstat.go
+++ b/services/crunchstat/crunchstat.go
@@ -28,6 +28,10 @@ var (
version = "dev"
)
+type logger interface {
+ Printf(string, ...interface{})
+}
+
func main() {
reporter := crunchstat.Reporter{
Logger: log.New(os.Stderr, "crunchstat: ", 0),
@@ -55,9 +59,11 @@ func main() {
reporter.Logger.Printf("crunchstat %s started", version)
if reporter.CgroupRoot == "" {
- reporter.Logger.Fatal("error: must provide -cgroup-root")
+ reporter.Logger.Printf("error: must provide -cgroup-root")
+ os.Exit(2)
} else if signalOnDeadPPID < 0 {
- reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ os.Exit(2)
}
reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
@@ -76,17 +82,19 @@ func main() {
if status, ok := err.Sys().(syscall.WaitStatus); ok {
os.Exit(status.ExitStatus())
} else {
- reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+ reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+ os.Exit(1)
}
} else if err != nil {
- reporter.Logger.Fatalln("error in cmd.Wait:", err)
+ reporter.Logger.Printf("error running command: %v", err)
+ os.Exit(1)
}
}
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
cmd := exec.Command(argv[0], argv[1:]...)
- logger.Println("Running", argv)
+ logger.Printf("Running %v", argv)
// Child process will use our stdin and stdout pipes
// (we close our copies below)
@@ -100,7 +108,7 @@ func runCommand(argv []string, logger *log.Logger) error {
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Println("notice: caught signal:", catch)
+ logger.Printf("notice: caught signal: %v", catch)
}(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
@@ -113,24 +121,30 @@ func runCommand(argv []string, logger *log.Logger) error {
// Funnel stderr through our channel
stderrPipe, err := cmd.StderrPipe()
if err != nil {
- logger.Fatalln("error in StderrPipe:", err)
+ logger.Printf("error in StderrPipe: %v", err)
+ return err
}
// Run subprocess
if err := cmd.Start(); err != nil {
- logger.Fatalln("error in cmd.Start:", err)
+ logger.Printf("error in cmd.Start: %v", err)
+ return err
}
// Close stdin/stdout in this (parent) process
os.Stdin.Close()
os.Stdout.Close()
- copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ if err != nil {
+ cmd.Process.Kill()
+ return err
+ }
return cmd.Wait()
}
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
ticker := time.NewTicker(intvl)
for range ticker.C {
ppid := os.Getppid()
@@ -152,7 +166,7 @@ func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.C
}
}
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
reader := bufio.NewReaderSize(in, MaxLogLine)
var prefix string
for {
@@ -160,13 +174,13 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
if err == io.EOF {
break
} else if err != nil {
- logger.Fatal("error reading child stderr:", err)
+ return fmt.Errorf("error reading child stderr: %w", err)
}
var suffix string
if isPrefix {
suffix = "[...]"
}
- logger.Print(prefix, string(line), suffix)
+ logger.Printf("%s%s%s", prefix, string(line), suffix)
// Set up prefix for following line
if isPrefix {
prefix = "[...]"
@@ -174,5 +188,5 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
prefix = ""
}
}
- in.Close()
+ return in.Close()
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list