Subject: [arvados] created: 2.7.0-6514-gdf033ed92b
From: git repository hosting <git at public.arvados.org>
Date: Wed May 1 13:47:11 UTC 2024


        at  df033ed92b9991c396004855a56b6cdb32350307 (commit)


commit df033ed92b9991c396004855a56b6cdb32350307
Author: Tom Clegg <tom at curii.com>
Date:   Wed May 1 09:39:29 2024 -0400

    21611: Remove log-to-logs-table code.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
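
Summary of the change (from the diff below): the ThrottledLogger/ArvLogWriter
machinery that mirrored container logs to the API server's logs table is
removed, and each log file is now written directly into the log collection via
the new openLogFile helper. The crunch-run log itself is assembled from the
small prefixer/logWriter helpers added to lib/crunchrun/logging.go. A minimal
sketch of that composition, for orientation only (exampleCrunchLog is an
illustrative name, not part of the commit):

package crunchrun

import (
	"io"
	"os"
)

// exampleCrunchLog is a hypothetical helper mirroring how
// NewContainerRunner now builds cr.CrunchLog: every line goes to the
// crunch-run.txt file in the log collection and to stderr (prefixed
// with the container UUID), with an RFC3339 timestamp prepended to
// each line.
func exampleCrunchLog(f io.WriteCloser, containerUUID string) *logWriter {
	dst := io.MultiWriter(f, newStringPrefixer(os.Stderr, containerUUID+" "))
	return newLogWriter(newTimestamper(dst))
}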

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 607d569e23..ad2adaa3e2 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -81,9 +81,6 @@ type IKeepClient interface {
 	SetStorageClasses(sc []string)
 }
 
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
@@ -125,8 +122,7 @@ type ContainerRunner struct {
 	Container     arvados.Container
 	token         string
 	ExitCode      *int
-	NewLogWriter  NewLogWriter
-	CrunchLog     *ThrottledLogger
+	CrunchLog     *logWriter
 	logUUID       string
 	logMtx        sync.Mutex
 	LogCollection arvados.CollectionFileSystem
@@ -168,7 +164,7 @@ type ContainerRunner struct {
 	enableNetwork     string // one of "default" or "always"
 	networkMode       string // "none", "host", or "" -- passed through to executor
 	brokenNodeHook    string // script to run if node appears to be broken
-	arvMountLog       *ThrottledLogger
+	arvMountLog       io.WriteCloser
 
 	containerWatchdogInterval time.Duration
 
@@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
 	}
 	c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-	w, err := runner.NewLogWriter("arv-mount")
+	runner.arvMountLog, err = runner.openLogFile("arv-mount")
 	if err != nil {
 		return nil, err
 	}
-	runner.arvMountLog = NewThrottledLogger(w)
 	scanner := logScanner{
 		Patterns: []string{
 			"Keep write error",
@@ -735,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error {
 }
 
 func (runner *ContainerRunner) startHoststat() error {
-	w, err := runner.NewLogWriter("hoststat")
+	var err error
+	runner.hoststatLogger, err = runner.openLogFile("hoststat")
 	if err != nil {
 		return err
 	}
-	runner.hoststatLogger = NewThrottledLogger(w)
 	runner.hoststatReporter = &crunchstat.Reporter{
-		Logger: log.New(runner.hoststatLogger, "", 0),
+		Logger: newLogWriter(runner.hoststatLogger),
 		// Our own cgroup is the "host" cgroup, in the sense
 		// that it accounts for resource usage outside the
 		// container. It doesn't count _all_ resource usage on
@@ -759,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error {
 }
 
 func (runner *ContainerRunner) startCrunchstat() error {
-	w, err := runner.NewLogWriter("crunchstat")
+	var err error
+	runner.statLogger, err = runner.openLogFile("crunchstat")
 	if err != nil {
 		return err
 	}
-	runner.statLogger = NewThrottledLogger(w)
 	runner.statReporter = &crunchstat.Reporter{
 		Pid:    runner.executor.Pid,
 		FS:     runner.crunchstatFakeFS,
-		Logger: log.New(runner.statLogger, "", 0),
+		Logger: newLogWriter(runner.statLogger),
 		MemThresholds: map[string][]crunchstat.Threshold{
 			"rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
 		},
@@ -790,7 +785,7 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-	w, err := runner.NewLogWriter("node-info")
+	w, err := runner.openLogFile("node-info")
 	if err != nil {
 		return
 	}
@@ -875,13 +870,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
 	if err != nil {
 		return false, err
 	}
-	w := &ArvLogWriter{
-		ArvClient:     runner.DispatcherArvClient,
-		UUID:          runner.Container.UUID,
-		loggingStream: label,
-		writeCloser:   writer,
-	}
-
 	reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
 	if err != nil {
 		return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -901,12 +889,12 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
 		return false, nil
 	}
 	// Re-encode it using indentation to improve readability
-	enc := json.NewEncoder(w)
+	enc := json.NewEncoder(writer)
 	enc.SetIndent("", "    ")
 	if err = enc.Encode(items[0]); err != nil {
 		return false, fmt.Errorf("error logging %s record: %v", label, err)
 	}
-	err = w.Close()
+	err = writer.Close()
 	if err != nil {
 		return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
 	}
@@ -974,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 			return err
 		}
 		stdout = f
-	} else if w, err := runner.NewLogWriter("stdout"); err != nil {
+	} else if w, err := runner.openLogFile("stdout"); err != nil {
 		return err
 	} else {
-		stdout = NewThrottledLogger(w)
+		stdout = w
 	}
 
 	if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -986,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 			return err
 		}
 		stderr = f
-	} else if w, err := runner.NewLogWriter("stderr"); err != nil {
+	} else if w, err := runner.openLogFile("stderr"); err != nil {
 		return err
 	} else {
-		stderr = NewThrottledLogger(w)
+		stderr = w
 	}
 
 	env := runner.Container.Environment
@@ -1441,19 +1429,11 @@ func (runner *ContainerRunner) CommitLogs() error {
 		if runner.arvMountLog != nil {
 			runner.arvMountLog.Close()
 		}
-		runner.CrunchLog.Close()
-
-		// Closing CrunchLog above allows them to be committed to Keep at this
-		// point, but re-open crunch log with ArvClient in case there are any
-		// other further errors (such as failing to write the log to Keep!)
-		// while shutting down
-		runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-			ArvClient:     runner.DispatcherArvClient,
-			UUID:          runner.Container.UUID,
-			loggingStream: "crunch-run",
-			writeCloser:   nil,
-		})
-		runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+		// From now on just log to stderr, in case there are
+		// any other further errors (such as failing to write
+		// the log to Keep!)  while shutting down
+		runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
 	}()
 
 	if runner.keepstoreLogger != nil {
@@ -1613,18 +1593,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 	return runner.cCancelled
 }
 
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
-	writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
-	if err != nil {
-		return nil, err
-	}
-	return &ArvLogWriter{
-		ArvClient:     runner.DispatcherArvClient,
-		UUID:          runner.Container.UUID,
-		loggingStream: name,
-		writeCloser:   writer,
-	}, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+	return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
 }
 
 // Run the full container lifecycle.
@@ -1654,9 +1624,7 @@ func (runner *ContainerRunner) Run() (err error) {
 
 	defer func() {
 		runner.CleanupDirs()
-
 		runner.CrunchLog.Printf("crunch-run finished")
-		runner.CrunchLog.Close()
 	}()
 
 	err = runner.fetchContainerRecord()
@@ -1850,7 +1818,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 		DispatcherArvClient:  dispatcherArvClient,
 		DispatcherKeepClient: dispatcherKeepClient,
 	}
-	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.RunArvMount = cr.ArvMountCmd
 	cr.MkTempDir = ioutil.TempDir
 	cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1873,14 +1840,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 		return nil, err
 	}
 	cr.Container.UUID = containerUUID
-	w, err := cr.NewLogWriter("crunch-run")
+	f, err := cr.openLogFile("crunch-run")
 	if err != nil {
 		return nil, err
 	}
-	cr.CrunchLog = NewThrottledLogger(w)
-	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+	cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
 
-	loadLogThrottleParams(dispatcherArvClient)
 	go cr.updateLogs()
 
 	return cr, nil
@@ -2025,12 +1990,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		keepstoreLogbuf.SetWriter(io.Discard)
 	} else {
 		cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
-		logwriter, err := cr.NewLogWriter("keepstore")
+		cr.keepstoreLogger, err = cr.openLogFile("keepstore")
 		if err != nil {
 			log.Print(err)
 			return 1
 		}
-		cr.keepstoreLogger = NewThrottledLogger(logwriter)
 
 		var writer io.WriteCloser = cr.keepstoreLogger
 		if logWhat == "errors" {
@@ -2056,13 +2020,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
 	default:
 		cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
-		cr.CrunchLog.Close()
 		return 1
 	}
 	if err != nil {
 		cr.CrunchLog.Printf("%s: %v", containerUUID, err)
 		cr.checkBrokenNode(err)
-		cr.CrunchLog.Close()
 		return 1
 	}
 	defer cr.executor.Close()
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 5cb982e1bb..58ae1c190c 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -14,7 +14,6 @@ import (
 	"io"
 	"io/fs"
 	"io/ioutil"
-	"log"
 	"math/rand"
 	"net/http"
 	"net/http/httptest"
@@ -121,7 +120,6 @@ type ArvTestClient struct {
 	Content []arvadosclient.Dict
 	arvados.Container
 	secretMounts []byte
-	Logs         map[string]*bytes.Buffer
 	sync.Mutex
 	WasSetRunning bool
 	callraw       bool
@@ -207,14 +205,7 @@ func (client *ArvTestClient) Create(resourceType string,
 	client.Content = append(client.Content, parameters)
 
 	if resourceType == "logs" {
-		et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
-		if client.Logs == nil {
-			client.Logs = make(map[string]*bytes.Buffer)
-		}
-		if client.Logs[et] == nil {
-			client.Logs[et] = &bytes.Buffer{}
-		}
-		client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
+		panic("logs.create called")
 	}
 
 	if resourceType == "collections" && output != nil {
@@ -607,29 +598,6 @@ func (*KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
 	return ErrorReader{}, nil
 }
 
-type ClosableBuffer struct {
-	bytes.Buffer
-}
-
-func (*ClosableBuffer) Close() error {
-	return nil
-}
-
-type TestLogs struct {
-	Stdout ClosableBuffer
-	Stderr ClosableBuffer
-}
-
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
-	if logstr == "stdout" {
-		return &tl.Stdout, nil
-	}
-	if logstr == "stderr" {
-		return &tl.Stderr, nil
-	}
-	return nil, errors.New("???")
-}
-
 func dockerLog(fd byte, msg string) []byte {
 	by := []byte(msg)
 	header := make([]byte, 8+len(by))
@@ -645,8 +613,6 @@ func (s *TestSuite) TestRunContainer(c *C) {
 		return 0
 	}
 
-	var logs TestLogs
-	s.runner.NewLogWriter = logs.NewTestLoggingWriter
 	s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
 	s.runner.Container.Command = []string{"./hw"}
 	s.runner.Container.OutputStorageClasses = []string{"default"}
@@ -663,8 +629,8 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	err = s.runner.WaitFinish()
 	c.Assert(err, IsNil)
 
-	c.Check(logs.Stdout.String(), Matches, ".*Hello world\n")
-	c.Check(logs.Stderr.String(), Equals, "")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `Hello world\n`)
+	c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ``)
 }
 
 func (s *TestSuite) TestCommitLogs(c *C) {
@@ -673,7 +639,9 @@ func (s *TestSuite) TestCommitLogs(c *C) {
 	defer kc.Close()
 	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
-	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+	f, err := cr.openLogFile("crunch-run")
+	c.Assert(err, IsNil)
+	cr.CrunchLog = newLogWriter(newTestTimestamper(f))
 
 	cr.CrunchLog.Print("Hello world!")
 	cr.CrunchLog.Print("Goodbye")
@@ -682,10 +650,10 @@ func (s *TestSuite) TestCommitLogs(c *C) {
 	err = cr.CommitLogs()
 	c.Check(err, IsNil)
 
-	c.Check(api.Calls, Equals, 2)
-	c.Check(api.Content[1]["ensure_unique_name"], Equals, true)
-	c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-	c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+	c.Check(api.Calls, Equals, 1)
+	c.Check(api.Content[0]["ensure_unique_name"], Equals, true)
+	c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
 	c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
 }
 
@@ -806,10 +774,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn
 	}
 
 	if err != nil {
-		for k, v := range s.api.Logs {
-			c.Log(k)
-			c.Log(v.String())
-		}
+		dumpAllLogFiles(c, s.runner)
 	}
 
 	return s.api, s.runner, realTemp
@@ -844,7 +809,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "hello world\n")
 	c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"default"})
 	c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"default"})
 }
@@ -935,13 +900,13 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
 		time.Sleep(time.Second)
 		return 0
 	})
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
 	if failureRate == 1 {
-		c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+		c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
 	} else {
 		text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
-		c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+		c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`)
 		c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
 		c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
 		c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
@@ -966,7 +931,7 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
 	})
 
 	c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*maximum run time exceeded.*")
 }
 
 func (s *TestSuite) TestContainerWaitFails(c *C) {
@@ -984,7 +949,7 @@ func (s *TestSuite) TestContainerWaitFails(c *C) {
 	})
 
 	c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Container is not running.*")
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
@@ -1007,11 +972,10 @@ func (s *TestSuite) TestCrunchstat(c *C) {
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-	c.Assert(s.api.Logs["crunchstat"], NotNil)
-	c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
+	c.Check(logFileContent(c, s.runner, "crunchstat.txt"), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
 
 	// Check that we called (*crunchstat.Reporter)Stop().
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
 }
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
@@ -1032,19 +996,16 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-	buf, err := fs.ReadFile(arvados.FS(s.runner.LogCollection), "/node.json")
-	c.Assert(err, IsNil)
-	json := string(buf)
+	json := logFileContent(c, s.runner, "node.json")
 	c.Check(json, Matches, `(?ms).*"ProviderType": *"a1\.2xlarge".*`)
 	c.Check(json, Matches, `(?ms).*"Price": *1\.2.*`)
 
-	c.Assert(s.api.Logs["node-info"], NotNil)
-	json = s.api.Logs["node-info"].String()
-	c.Check(json, Matches, `(?ms).*Host Information.*`)
-	c.Check(json, Matches, `(?ms).*CPU Information.*`)
-	c.Check(json, Matches, `(?ms).*Memory Information.*`)
-	c.Check(json, Matches, `(?ms).*Disk Space.*`)
-	c.Check(json, Matches, `(?ms).*Disk INodes.*`)
+	nodeinfo := logFileContent(c, s.runner, "node-info.txt")
+	c.Check(nodeinfo, Matches, `(?ms).*Host Information.*`)
+	c.Check(nodeinfo, Matches, `(?ms).*CPU Information.*`)
+	c.Check(nodeinfo, Matches, `(?ms).*Memory Information.*`)
+	c.Check(nodeinfo, Matches, `(?ms).*Disk Space.*`)
+	c.Check(nodeinfo, Matches, `(?ms).*Disk INodes.*`)
 }
 
 func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
@@ -1062,11 +1023,10 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
 		return 0
 	})
 
-	c.Assert(s.api.Logs["crunch-run"], NotNil)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Using container runtime: stub.*`)
 }
 
 func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) {
@@ -1082,8 +1042,9 @@ func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExp
 		"runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`},
 		"state": "Locked"
 	}`, nil, func() int { return 0 })
-	c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String())
-	logs := s.api.Logs["crunch-run"].String()
+	logs := logFileContent(c, s.runner, "crunch-run.txt")
+	c.Log("=== crunchstat logs")
+	c.Log(logs)
 	pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)`
 	var threshold int
 	for _, threshold = range expected {
@@ -1121,7 +1082,7 @@ func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
         "runtime_constraints": {"ram": `+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`},
         "state": "Locked"
     }`, nil, func() int { return 0 })
-	logs := s.api.Logs["crunch-run"].String()
+	logs := logFileContent(c, s.runner, "crunch-run.txt")
 	for _, expected := range []string{
 		`Maximum disk usage was \d+%, \d+/\d+ bytes`,
 		fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent),
@@ -1189,8 +1150,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-	c.Assert(s.api.Logs["container"], NotNil)
-	c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
+	c.Check(logFileContent(c, s.runner, "container.json"), Matches, `(?ms).*container_image.*`)
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
@@ -1215,8 +1175,8 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
 	c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
 	c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
 
-	c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n")
-	c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*hello\n")
+	c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ".*world\n")
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
@@ -1237,8 +1197,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Log(s.api.Logs["stdout"])
-	c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`)
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*workdir=""`)
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
@@ -1259,7 +1218,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
 }
 
 func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
@@ -1281,7 +1240,7 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
 
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
 	c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
 	c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
 }
@@ -1378,14 +1337,11 @@ func (s *TestSuite) testStopContainer(c *C) {
 	case err = <-done:
 		c.Check(err, IsNil)
 	}
-	for k, v := range s.api.Logs {
-		c.Log(k)
-		c.Log(v.String(), "\n")
-	}
+	dumpAllLogFiles(c, s.runner)
 
 	c.Check(s.api.CalledWith("container.log", nil), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-	c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "(?ms).*foo\n$")
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
@@ -1406,7 +1362,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
 
 	c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`)
+	c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*map\[FROBIZ:bilbo\]`)
 }
 
 type ArvMountCmdLine struct {
@@ -1831,7 +1787,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
 	})
 	c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*status code 3\n.*`)
 }
 
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
@@ -1886,7 +1842,7 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
 	c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
 	c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil)
 	c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
+	c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
 }
 
 func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
@@ -2196,13 +2152,14 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
     "state": "Locked"
 }`, nil, func() int { return 0 })
 		c.Check(s.api.CalledWith("container.state", nextState), NotNil)
-		c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+		logs := logFileContent(c, s.runner, "crunch-run.txt")
+		c.Check(logs, Matches, "(?ms).*unable to run containers.*")
 		if s.runner.brokenNodeHook != "" {
-			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
-			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
-			c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+			c.Check(logs, Matches, "(?ms).*Running broken node hook.*")
+			c.Check(logs, Matches, "(?ms).*killme.*")
+			c.Check(logs, Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 		} else {
-			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+			c.Check(logs, Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 		}
 	}
 }
@@ -2227,7 +2184,7 @@ func (s *TestSuite) TestBadCommand(c *C) {
     "state": "Locked"
 }`, nil, func() int { return 0 })
 		c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-		c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+		c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Possible causes:.*is missing.*")
 	}
 }
 
@@ -2337,7 +2294,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
 	cr := s.runner
 	cr.costStartTime = now.Add(-time.Hour)
 	var logbuf bytes.Buffer
-	cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+	cr.CrunchLog = newLogWriter(&logbuf)
 
 	// if there's no InstanceType env var, cost is calculated as 0
 	os.Unsetenv("InstanceType")
@@ -2463,3 +2420,20 @@ type FakeProcess struct {
 func (fp FakeProcess) CmdlineSlice() ([]string, error) {
 	return fp.cmdLine, nil
 }
+
+func logFileContent(c *C, cr *ContainerRunner, fnm string) string {
+	buf, err := fs.ReadFile(arvados.FS(cr.LogCollection), fnm)
+	c.Assert(err, IsNil)
+	return string(buf)
+}
+
+func dumpAllLogFiles(c *C, cr *ContainerRunner) {
+	d, err := cr.LogCollection.OpenFile("/", os.O_RDONLY, 0)
+	c.Assert(err, IsNil)
+	fis, err := d.Readdir(-1)
+	c.Assert(err, IsNil)
+	for _, fi := range fis {
+		c.Logf("=== %s", fi.Name())
+		c.Log(logFileContent(c, cr, fi.Name()))
+	}
+}
diff --git a/lib/crunchrun/cuda.go b/lib/crunchrun/cuda.go
index c693dbcb96..f91a5c62cd 100644
--- a/lib/crunchrun/cuda.go
+++ b/lib/crunchrun/cuda.go
@@ -5,13 +5,15 @@
 package crunchrun
 
 import (
+	"fmt"
+	"io"
 	"os/exec"
 )
 
 // nvidiaModprobe makes sure all the nvidia kernel modules and devices
 // are set up.  If we don't have all the modules/devices set up we get
 // "CUDA_ERROR_UNKNOWN".
-func nvidiaModprobe(writer *ThrottledLogger) {
+func nvidiaModprobe(writer io.Writer) {
 	// The underlying problem is that when normally running
 	// directly on the host, the CUDA SDK will automatically
 	// detect and set up the devices on demand.  However, when
@@ -42,7 +44,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
 	nvidiaSmi.Stderr = writer
 	err := nvidiaSmi.Run()
 	if err != nil {
-		writer.Printf("Warning %v: %v", nvidiaSmi.Args, err)
+		fmt.Fprintf(writer, "Warning %v: %v\n", nvidiaSmi.Args, err)
 	}
 
 	// Load the kernel modules & devices associated with
@@ -63,7 +65,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
 		nvmodprobe.Stderr = writer
 		err = nvmodprobe.Run()
 		if err != nil {
-			writer.Printf("Warning %v: %v", nvmodprobe.Args, err)
+			fmt.Fprintf(writer, "Warning %v: %v\n", nvmodprobe.Args, err)
 		}
 	}
 }
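
Since nvidiaModprobe now accepts a plain io.Writer, its warning output can go
to any destination: the crunch-run log in production, or a simple buffer in a
test. A hedged sketch (exampleGPUSetup is an illustrative name, not part of
the commit):

package crunchrun

import "bytes"

// exampleGPUSetup captures nvidiaModprobe's warnings in a plain buffer
// instead of requiring a *ThrottledLogger, as the old signature did.
func exampleGPUSetup() string {
	var warnings bytes.Buffer
	nvidiaModprobe(&warnings) // runs nvidia-smi and modprobe; failures are written to warnings
	return warnings.String()
}
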
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 91a1b77cf4..04c3724969 100644
--- a/lib/crunchrun/logging.go
+++ b/lib/crunchrun/logging.go
@@ -5,373 +5,82 @@
 package crunchrun
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/json"
-	"fmt"
 	"io"
 	"log"
-	"regexp"
-	"strings"
-	"sync"
 	"time"
-
-	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 )
 
-// Timestamper is the signature for a function that takes a timestamp and
-// return a formated string value.
-type Timestamper func(t time.Time) string
-
-// Logging plumbing:
-//
-// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher ->
-// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
-//
-// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
-// data from the stdout/stderr Reader and send to the Logger.
+// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-// ThrottledLogger accepts writes, prepends a timestamp to each line of the
-// write, and periodically flushes to a downstream writer.  It supports the
-// "Logger" and "WriteCloser" interfaces.
-type ThrottledLogger struct {
-	*log.Logger
-	buf *bytes.Buffer
-	sync.Mutex
-	writer   io.WriteCloser
-	flush    chan struct{}
-	stopped  chan struct{}
-	stopping chan struct{}
-	Timestamper
-	Immediate    *log.Logger
-	pendingFlush bool
+// prefixer wraps an io.Writer, inserting a string returned by
+// prefixFunc at the beginning of each line.
+type prefixer struct {
+	writer     io.Writer
+	prefixFunc func() string
+	unfinished bool // true if the most recent write ended with a non-newline char
 }
 
-// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
-const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-// RFC3339Timestamp formats t as RFC3339NanoFixed.
-func RFC3339Timestamp(t time.Time) string {
-	return t.Format(RFC3339NanoFixed)
-}
-
-// Write prepends a timestamp to each line of the input data and
-// appends to the internal buffer. Each line is also logged to
-// tl.Immediate, if tl.Immediate is not nil.
-func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
-	tl.Mutex.Lock()
-	defer tl.Mutex.Unlock()
-
-	if tl.buf == nil {
-		tl.buf = &bytes.Buffer{}
-	}
-
-	now := tl.Timestamper(time.Now().UTC())
-	sc := bufio.NewScanner(bytes.NewBuffer(p))
-	for err == nil && sc.Scan() {
-		out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
-		if tl.Immediate != nil {
-			tl.Immediate.Print(out[:len(out)-1])
-		}
-		_, err = io.WriteString(tl.buf, out)
-	}
-	if err == nil {
-		err = sc.Err()
-		if err == nil {
-			n = len(p)
-		}
-	}
-
-	if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
-		// Non-blocking send.  Try send a flush if it is ready to
-		// accept it.  Otherwise do nothing because a flush is already
-		// pending.
-		select {
-		case tl.flush <- struct{}{}:
-		default:
-		}
+// newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
+// timestamp at the beginning of each line.
+func newTimestamper(w io.Writer) *prefixer {
+	return &prefixer{
+		writer:     w,
+		prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
 	}
-
-	return
 }
 
-// Periodically check the current buffer; if not empty, send it on the
-// channel to the goWriter goroutine.
-func (tl *ThrottledLogger) flusher() {
-	ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
-	defer ticker.Stop()
-	for stopping := false; !stopping; {
-		select {
-		case <-tl.stopping:
-			// flush tl.buf and exit the loop
-			stopping = true
-		case <-tl.flush:
-		case <-ticker.C:
-		}
-
-		var ready *bytes.Buffer
-
-		tl.Mutex.Lock()
-		ready, tl.buf = tl.buf, &bytes.Buffer{}
-		tl.Mutex.Unlock()
-
-		if ready != nil && ready.Len() > 0 {
-			tl.writer.Write(ready.Bytes())
-		}
-	}
-	close(tl.stopped)
-}
-
-// Close the flusher goroutine and wait for it to complete, then close the
-// underlying Writer.
-func (tl *ThrottledLogger) Close() error {
-	select {
-	case <-tl.stopping:
-		// already stopped
-	default:
-		close(tl.stopping)
+// newStringPrefixer wraps an io.Writer, inserting the given string at
+// the beginning of each line. The given string should include a
+// trailing space for readability.
+func newStringPrefixer(w io.Writer, s string) *prefixer {
+	return &prefixer{
+		writer:     w,
+		prefixFunc: func() string { return s },
 	}
-	<-tl.stopped
-	return tl.writer.Close()
 }
 
-const (
-	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
-	MaxLogLine = 1 << 12
-)
-
-// ReadWriteLines reads lines from a reader and writes to a Writer, with long
-// line splitting.
-func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
-	reader := bufio.NewReaderSize(in, MaxLogLine)
-	var prefix string
-	for {
-		line, isPrefix, err := reader.ReadLine()
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
-		}
-		var suffix string
-		if isPrefix {
-			suffix = "[...]\n"
-		}
-
-		if prefix == "" && suffix == "" {
-			writer.Write(line)
-		} else {
-			writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
+func (tp *prefixer) Write(p []byte) (n int, err error) {
+	for len(p) > 0 && err == nil {
+		if !tp.unfinished {
+			_, err = io.WriteString(tp.writer, tp.prefixFunc())
+			if err != nil {
+				return
+			}
 		}
-
-		// Set up prefix for following line
-		if isPrefix {
-			prefix = "[...]"
+		newline := bytes.IndexRune(p, '\n')
+		var nn int
+		if newline < 0 {
+			tp.unfinished = true
+			nn, err = tp.writer.Write(p)
+			p = nil
 		} else {
-			prefix = ""
+			tp.unfinished = false
+			nn, err = tp.writer.Write(p[:newline+1])
+			p = p[nn:]
 		}
+		n += nn
 	}
-	done <- true
-}
-
-// NewThrottledLogger creates a new thottled logger that
-//   - prepends timestamps to each line, and
-//   - batches log messages and only calls the underlying Writer
-//     at most once per "crunchLogSecondsBetweenEvents" seconds.
-func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
-	tl := &ThrottledLogger{}
-	tl.flush = make(chan struct{}, 1)
-	tl.stopped = make(chan struct{})
-	tl.stopping = make(chan struct{})
-	tl.writer = writer
-	tl.Logger = log.New(tl, "", 0)
-	tl.Timestamper = RFC3339Timestamp
-	go tl.flusher()
-	return tl
-}
-
-// Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64 = 67108864
-var crunchLogThrottleBytes int64 = 65536
-var crunchLogThrottlePeriod time.Duration = time.Second * 60
-var crunchLogThrottleLines int64 = 1024
-var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
-var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents = time.Second
-var crunchLogUpdatePeriod = time.Hour / 2
-var crunchLogUpdateSize = int64(1 << 25)
-
-// ArvLogWriter is an io.WriteCloser that processes each write by
-// writing it through to another io.WriteCloser (typically a
-// CollectionFileWriter) and creating an Arvados log entry.
-type ArvLogWriter struct {
-	ArvClient     IArvadosClient
-	UUID          string
-	loggingStream string
-	writeCloser   io.WriteCloser
-
-	// for rate limiting
-	bytesLogged                  int64
-	logThrottleResetTime         time.Time
-	logThrottleLinesSoFar        int64
-	logThrottleBytesSoFar        int64
-	logThrottleBytesSkipped      int64
-	logThrottleIsOpen            bool
-	logThrottlePartialLineNextAt time.Time
-	logThrottleFirstPartialLine  bool
-	bufToFlush                   bytes.Buffer
-	bufFlushedAt                 time.Time
-	closing                      bool
+	return
 }
 
-func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
-	// Write to the next writer in the chain (a file in Keep)
-	var err1 error
-	if arvlog.writeCloser != nil {
-		_, err1 = arvlog.writeCloser.Write(p)
-	}
-
-	// write to API after checking rate limit
-	now := time.Now()
-
-	if now.After(arvlog.logThrottleResetTime) {
-		// It has been more than throttle_period seconds since the last
-		// checkpoint; so reset the throttle
-		if arvlog.logThrottleBytesSkipped > 0 {
-			arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
-		}
-
-		arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
-		arvlog.logThrottleBytesSoFar = 0
-		arvlog.logThrottleLinesSoFar = 0
-		arvlog.logThrottleBytesSkipped = 0
-		arvlog.logThrottleIsOpen = true
-	}
-
-	lines := bytes.Split(p, []byte("\n"))
-
-	for _, line := range lines {
-		// Short circuit the counting code if we're just going to throw
-		// away the data anyway.
-		if !arvlog.logThrottleIsOpen {
-			arvlog.logThrottleBytesSkipped += int64(len(line))
-			continue
-		} else if len(line) == 0 {
-			continue
-		}
-
-		// check rateLimit
-		logOpen, msg := arvlog.rateLimit(line, now)
-		if logOpen {
-			arvlog.bufToFlush.WriteString(string(msg) + "\n")
-		}
-	}
-
-	if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
-		(now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
-		arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
-		// write to API
-		lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-			"object_uuid": arvlog.UUID,
-			"event_type":  arvlog.loggingStream,
-			"properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
-		err2 := arvlog.ArvClient.Create("logs", lr, nil)
-
-		arvlog.bufToFlush = bytes.Buffer{}
-		arvlog.bufFlushedAt = now
-
-		if err1 != nil || err2 != nil {
-			return 0, fmt.Errorf("%s ; %s", err1, err2)
-		}
-	}
-
-	return len(p), nil
+// logWriter adds log.Logger methods to an io.Writer.
+type logWriter struct {
+	io.Writer
+	*log.Logger
 }
 
-// Close the underlying writer
-func (arvlog *ArvLogWriter) Close() (err error) {
-	arvlog.closing = true
-	arvlog.Write([]byte{})
-	if arvlog.writeCloser != nil {
-		err = arvlog.writeCloser.Close()
-		arvlog.writeCloser = nil
+func newLogWriter(w io.Writer) *logWriter {
+	return &logWriter{
+		Writer: w,
+		Logger: log.New(w, "", 0),
 	}
-	return err
 }
 
-var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
-
-// Test for hard cap on total output and for log throttling. Returns whether
-// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
-	message := ""
-	lineSize := int64(len(line))
-
-	if arvlog.logThrottleIsOpen {
-		matches := lineRegexp.FindStringSubmatch(string(line))
-
-		if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-			// This is a partial line.
-
-			if arvlog.logThrottleFirstPartialLine {
-				// Partial should be suppressed.  First time this is happening for this line so provide a message instead.
-				arvlog.logThrottleFirstPartialLine = false
-				arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-				arvlog.logThrottleBytesSkipped += lineSize
-				return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
-					RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
-			} else if now.After(arvlog.logThrottlePartialLineNextAt) {
-				// The throttle period has passed.  Update timestamp and let it through.
-				arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-			} else {
-				// Suppress line.
-				arvlog.logThrottleBytesSkipped += lineSize
-				return false, line
-			}
-		} else {
-			// Not a partial line so reset.
-			arvlog.logThrottlePartialLineNextAt = time.Time{}
-			arvlog.logThrottleFirstPartialLine = true
-		}
-
-		arvlog.bytesLogged += lineSize
-		arvlog.logThrottleBytesSoFar += lineSize
-		arvlog.logThrottleLinesSoFar++
-
-		if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
-			message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
-				RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
-			arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
-			arvlog.logThrottleIsOpen = false
-
-		} else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
-			remainingTime := arvlog.logThrottleResetTime.Sub(now)
-			message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
-				RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-			arvlog.logThrottleIsOpen = false
-
-		} else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
-			remainingTime := arvlog.logThrottleResetTime.Sub(now)
-			message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
-				RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-			arvlog.logThrottleIsOpen = false
-
-		}
-	}
-
-	if !arvlog.logThrottleIsOpen {
-		// Don't log anything if any limit has been exceeded. Just count lossage.
-		arvlog.logThrottleBytesSkipped += lineSize
-	}
-
-	if message != "" {
-		// Yes, write to logs, but use our "rate exceeded" message
-		// instead of the log message that exceeded the limit.
-		message += " A complete log is still being written to Keep, and will be available when the job finishes."
-		return true, []byte(message)
-	}
-	return arvlog.logThrottleIsOpen, line
-}
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
 
 // load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
@@ -394,13 +103,6 @@ func loadLogThrottleParams(clnt IArvadosClient) {
 		}
 	}
 
-	loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
-	loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
-	loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
-	loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
-	loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
-	loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
-	loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
 	loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
 	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index ee3320c7c3..e747a2069e 100644
--- a/lib/crunchrun/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -13,26 +13,25 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
-	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	. "gopkg.in/check.v1"
 	check "gopkg.in/check.v1"
 )
 
-type LoggingTestSuite struct {
-	client *arvados.Client
-}
-
-type TestTimestamper struct {
-	count int
+// newTestTimestamper wraps an io.Writer, inserting a predictable
+// RFC3339NanoFixed timestamp at the beginning of each line.
+func newTestTimestamper(w io.Writer) *prefixer {
+	count := 0
+	return &prefixer{
+		writer: w,
+		prefixFunc: func() string {
+			count++
+			return fmt.Sprintf("2015-12-29T15:51:45.%09dZ ", count)
+		},
+	}
 }
 
-func (stamper *TestTimestamper) Timestamp(t time.Time) string {
-	stamper.count++
-	t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", stamper.count), t.Location())
-	if err != nil {
-		panic(err)
-	}
-	return RFC3339Timestamp(t)
+type LoggingTestSuite struct {
+	client *arvados.Client
 }
 
 // Gocheck boilerplate
@@ -48,26 +47,20 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	defer kc.Close()
-	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
-	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+	f, err := cr.openLogFile("crunch-run")
+	c.Assert(err, IsNil)
+	cr.CrunchLog = newLogWriter(newTestTimestamper(f))
 
 	cr.CrunchLog.Print("Hello world!")
 	cr.CrunchLog.Print("Goodbye")
-	cr.CrunchLog.Close()
-
-	c.Check(api.Calls, Equals, 1)
-
-	mt, err := cr.LogCollection.MarshalManifest(".")
-	c.Check(err, IsNil)
-	c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
 
-	logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
-		"2015-12-29T15:51:45.000000002Z Goodbye\n"
+	c.Check(api.Calls, Equals, 0)
 
-	c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-	c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
-	s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
+	logs := logFileContent(c, cr, "crunch-run.txt")
+	c.Check(logs, Matches, `....-..-..T..:..:..\..........Z Hello world!\n`+
+		`....-..-..T..:..:..\..........Z Goodbye\n`)
 }
 
 func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
@@ -79,59 +72,34 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
 	defer kc.Close()
 	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
 	c.Assert(err, IsNil)
-	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
-	cr.CrunchLog.Immediate = nil
-
+	f, err := cr.openLogFile("crunch-run")
+	c.Assert(err, IsNil)
+	cr.CrunchLog = newLogWriter(newTestTimestamper(f))
 	for i := 0; i < 2000000; i++ {
 		cr.CrunchLog.Printf("Hello %d", i)
 	}
 	cr.CrunchLog.Print("Goodbye")
-	cr.CrunchLog.Close()
-
-	c.Check(api.Calls > 0, Equals, true)
-	c.Check(api.Calls < 2000000, Equals, true)
-
-	mt, err := cr.LogCollection.MarshalManifest(".")
-	c.Check(err, IsNil)
-	c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
-}
-
-func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
-	api := &ArvTestClient{}
-	kc := &KeepTestClient{}
-	defer kc.Close()
-	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
-	c.Assert(err, IsNil)
-	ts := &TestTimestamper{}
-	cr.CrunchLog.Timestamper = ts.Timestamp
-	w, err := cr.NewLogWriter("stdout")
-	c.Assert(err, IsNil)
-	stdout := NewThrottledLogger(w)
-	stdout.Timestamper = ts.Timestamp
-
-	cr.CrunchLog.Print("Hello world!")
-	stdout.Print("Doing stuff")
-	cr.CrunchLog.Print("Goodbye")
-	stdout.Print("Blurb")
-	cr.CrunchLog.Close()
-	stdout.Close()
-
-	logText := make(map[string]string)
-	for _, content := range api.Content {
-		log := content["log"].(arvadosclient.Dict)
-		logText[log["event_type"].(string)] += log["properties"].(map[string]string)["text"]
-	}
 
-	c.Check(logText["crunch-run"], Equals, `2015-12-29T15:51:45.000000001Z Hello world!
-2015-12-29T15:51:45.000000003Z Goodbye
-`)
-	c.Check(logText["stdout"], Equals, `2015-12-29T15:51:45.000000002Z Doing stuff
-2015-12-29T15:51:45.000000004Z Blurb
+	logs := logFileContent(c, cr, "crunch-run.txt")
+	c.Check(strings.Count(logs, "\n"), Equals, 2000001)
+	// Redact most of the logs except the start/end for the regexp
+	// match -- otherwise, when the regexp fails, gocheck spams
+	// the test logs with tens of megabytes of quoted strings.
+	c.Assert(len(logs) > 10000, Equals, true)
+	c.Check(logs[:500]+"\n...\n"+logs[len(logs)-500:], Matches, `(?ms)2015-12-29T15:51:45.000000001Z Hello 0
+2015-12-29T15:51:45.000000002Z Hello 1
+2015-12-29T15:51:45.000000003Z Hello 2
+2015-12-29T15:51:45.000000004Z Hello 3
+.*
+2015-12-29T15:51:45.001999998Z Hello 1999997
+2015-12-29T15:51:45.001999999Z Hello 1999998
+2015-12-29T15:51:45.002000000Z Hello 1999999
+2015-12-29T15:51:45.002000001Z Goodbye
 `)
 
 	mt, err := cr.LogCollection.MarshalManifest(".")
 	c.Check(err, IsNil)
-	c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
+	c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
 }
 
 func (s *LoggingTestSuite) TestLogUpdate(c *C) {
@@ -149,14 +117,13 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
 		api := &ArvTestClient{}
 		kc := &KeepTestClient{}
 		defer kc.Close()
-		cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+		cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
 		c.Assert(err, IsNil)
-		ts := &TestTimestamper{}
-		cr.CrunchLog.Timestamper = ts.Timestamp
-		w, err := cr.NewLogWriter("stdout")
+		f, err := cr.openLogFile("crunch-run")
+		c.Assert(err, IsNil)
+		cr.CrunchLog = newLogWriter(newTestTimestamper(f))
+		stdout, err := cr.openLogFile("stdout")
 		c.Assert(err, IsNil)
-		stdout := NewThrottledLogger(w)
-		stdout.Timestamper = ts.Timestamp
 
 		c.Check(cr.logUUID, Equals, "")
 		cr.CrunchLog.Printf("Hello %1000s", "space")
@@ -165,75 +132,18 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
 		}
 		c.Check(cr.logUUID, Not(Equals), "")
 		cr.CrunchLog.Print("Goodbye")
-		fmt.Fprint(stdout, "Goodbye\n")
-		cr.CrunchLog.Close()
-		stdout.Close()
-		w.Close()
+		fmt.Fprintln(stdout, "Goodbye")
+
+		c.Check(logFileContent(c, cr, "crunch-run.txt"), Matches, `....-..-..T..:..:............Z Hello  {995}space\n`+
+			`....-..-..T..:..:............Z Goodbye\n`)
+		c.Check(logFileContent(c, cr, "stdout.txt"), Matches, `Goodbye\n`)
 
 		mt, err := cr.LogCollection.MarshalManifest(".")
 		c.Check(err, IsNil)
-		// Block packing depends on whether there's an update
-		// between the two Goodbyes -- either way the first
-		// block will be 4dc76.
-		c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
+		c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 5be52044a8c51e7b62dd62be07872968\+47 0:1077:crunch-run.txt 1077:8:stdout.txt\n`)
 	}
 }
 
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
-	s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
-	s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
-	s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) {
-	s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
-	discoveryMap[throttleParam] = float64(throttleValue)
-	defer func() {
-		discoveryMap[throttleParam] = float64(throttleDefault)
-	}()
-
-	api := &ArvTestClient{}
-	kc := &KeepTestClient{}
-	defer kc.Close()
-	cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
-	c.Assert(err, IsNil)
-	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
-
-	cr.CrunchLog.Print("Hello world!")
-	cr.CrunchLog.Print("Goodbye")
-	cr.CrunchLog.Close()
-
-	c.Check(api.Calls, Equals, 1)
-
-	mt, err := cr.LogCollection.MarshalManifest(".")
-	c.Check(err, IsNil)
-	c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
-
-	logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
-		"2015-12-29T15:51:45.000000002Z Goodbye\n"
-
-	c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-	stderrLog := api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]
-	c.Check(true, Equals, strings.Contains(stderrLog, expected))
-	s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
-}
-
-func (s *LoggingTestSuite) checkWroteBlock(c *C, kc *KeepTestClient, locator, expect string) {
-	buf := make([]byte, len([]byte(expect))+1)
-	n, err := kc.ReadAt(locator, buf, 0)
-	c.Check(err, IsNil)
-	c.Check(string(buf[:n]), Equals, expect)
-}
-
 type filterSuite struct{}
 
 var _ = Suite(&filterSuite{})
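
The line-oriented behavior of the new prefixer is easy to see in isolation:
the prefix is inserted exactly once per line, even when a line arrives across
multiple Write calls. A minimal sketch, assuming the prefixer type from
lib/crunchrun/logging.go above (examplePrefixer is illustrative, not part of
the commit):

package crunchrun

import (
	"bytes"
	"fmt"
)

// examplePrefixer shows that a prefix appears once per output line,
// regardless of how the writes are split across calls.
func examplePrefixer() {
	var buf bytes.Buffer
	w := newStringPrefixer(&buf, "zzzzz-dz642-zzzzzzzzzzzzzzz ")
	w.Write([]byte("hello\nwor")) // second line still unfinished
	w.Write([]byte("ld\n"))       // completes it; no extra prefix added
	fmt.Print(buf.String())
	// prints:
	// zzzzz-dz642-zzzzzzzzzzzzzzz hello
	// zzzzz-dz642-zzzzzzzzzzzzzzz world
}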




