[arvados] created: 2.7.0-6544-g09fdded285
git repository hosting
git at public.arvados.org
Fri May 3 18:04:57 UTC 2024
at 09fdded2857597bd3d3ea5a0029fecbe993130ee (commit)
commit 09fdded2857597bd3d3ea5a0029fecbe993130ee
Author: Tom Clegg <tom at curii.com>
Date: Fri May 3 13:31:59 2024 -0400
21611: Test that log is saved after logging preemption notice.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 58ae1c190c..b7635f0ecd 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -302,7 +302,7 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
} else if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
output.(*arvados.Collection).UUID = uuid
- output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
+ output.(*arvados.Collection).PortableDataHash = arvados.PortableDataHash(mt)
}
return nil
}
@@ -882,6 +882,48 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
spotInterruptionCheckInterval = time.Second / 8
ec2MetadataBaseURL = stub.URL
+ checkedLogs := false
+ checkLogs := func() {
+ checkedLogs = true
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+ if failureRate == 1 {
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+ return
+ }
+ text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`)
+ c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
+
+ // Check that the log collection was saved, and the
+ // container record updated with the new PDH,
+ // immediately after the preemption notice was
+ // received -- i.e., while the container is still
+ // running.
+ lastpdh := ""
+ saved := make(map[string]string) // pdh => manifest_text
+ for _, call := range s.api.Content {
+ if ctr, ok := call["container"].(arvadosclient.Dict); ok {
+ if pdh, ok := ctr["log"].(string); ok {
+ lastpdh = pdh
+ }
+ }
+ if coll, ok := call["collection"].(arvadosclient.Dict); ok {
+ mt, _ := coll["manifest_text"].(string)
+ if strings.Contains(mt, ":crunch-run.txt") {
+ saved[arvados.PortableDataHash(mt)] = mt
+ }
+ }
+ }
+ logfs, err := (&arvados.Collection{ManifestText: saved[lastpdh]}).FileSystem(s.runner.dispatcherClient, s.runner.DispatcherKeepClient)
+ c.Assert(err, IsNil)
+ log, err := fs.ReadFile(arvados.FS(logfs), "crunch-run.txt")
+ c.Check(err, IsNil)
+ c.Check(string(log), Matches, `(?ms).*\Q`+text+`\E.*`)
+ }
+
go s.runner.checkSpotInterruptionNotices()
s.fullRunHelper(c, `{
"command": ["sleep", "3"],
@@ -898,19 +940,10 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
stoptime.Store(time.Now().Add(time.Minute).UTC())
token = "different-fake-ec2-metadata-token"
time.Sleep(time.Second)
+ checkLogs()
return 0
})
- c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
- c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
- if failureRate == 1 {
- c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
- } else {
- text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
- c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`)
- c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
- c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
- c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
- }
+ c.Check(checkedLogs, Equals, true)
}
func (s *TestSuite) TestRunTimeExceeded(c *C) {
commit c57b6ac55b1eaeae3786d1d83158f22e3d8e253f
Author: Tom Clegg <tom at curii.com>
Date: Thu May 2 10:18:51 2024 -0400
21611: Fix unreliable test.
https://ci.arvados.org/job/developer-run-tests-remainder/4413/consoleFull
FAIL: integration_test.go:1158: IntegrationSuite.TestRunTrivialContainer
integration_test.go:1313:
c.Check(allStatus, check.Matches, `Queued, waiting for dispatch\n`+
`(Queued, waiting.*\n)*`+
`(Locked, waiting for dispatch\n)?`+
`(Locked, waiting for new instance to be ready\n)?`+
`(Locked, preparing runtime environment\n)?`+
`(Running, \n)?`+
`Complete, \n`)
... value string = "" +
... "Queued, waiting for dispatch\n" +
... "Locked, waiting for new instance to be ready\n" +
... "Locked, preparing runtime environment\n" +
... "Queued, preparing runtime environment\n" +
... "Locked, waiting for new instance to be ready\n" +
... "Locked, preparing runtime environment\n" +
... "Running, \n" +
... "Complete, \n"
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go
index 6b603f178e..18a0c1992b 100644
--- a/lib/controller/integration_test.go
+++ b/lib/controller/integration_test.go
@@ -1311,10 +1311,12 @@ func (s *IntegrationSuite) runContainer(c *check.C, clusterID string, token stri
err = ac.RequestAndDecode(&outcoll, "GET", "/arvados/v1/collections/"+cr.OutputUUID, nil, nil)
c.Assert(err, check.IsNil)
c.Check(allStatus, check.Matches, `Queued, waiting for dispatch\n`+
- `(Queued, waiting.*\n)*`+
- `(Locked, waiting for dispatch\n)?`+
- `(Locked, waiting for new instance to be ready\n)?`+
- `(Locked, preparing runtime environment\n)?`+
+ // Occasionally the dispatcher will
+ // unlock/retry, and we get state/status from
+ // database/dispatcher via separate API calls,
+ // so we can also see "Queued, preparing
+ // runtime environment".
+ `((Queued|Locked), (waiting .*|preparing runtime environment)\n)*`+
`(Running, \n)?`+
`Complete, \n`)
}
commit 5dfd9a57f9d65ea91295e8502cf7f6e748fc4010
Author: Tom Clegg <tom at curii.com>
Date: Wed May 1 09:39:29 2024 -0400
21611: Remove log-to-logs-table code.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index f8941162ec..8627045411 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -81,9 +81,6 @@ type IKeepClient interface {
SetStorageClasses(sc []string)
}
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
type MkTempDir func(string, string) (string, error)
@@ -125,8 +122,7 @@ type ContainerRunner struct {
Container arvados.Container
token string
ExitCode *int
- NewLogWriter NewLogWriter
- CrunchLog *ThrottledLogger
+ CrunchLog *logWriter
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
@@ -168,7 +164,7 @@ type ContainerRunner struct {
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
brokenNodeHook string // script to run if node appears to be broken
- arvMountLog *ThrottledLogger
+ arvMountLog io.WriteCloser
containerWatchdogInterval time.Duration
@@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
}
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- w, err := runner.NewLogWriter("arv-mount")
+ runner.arvMountLog, err = runner.openLogFile("arv-mount")
if err != nil {
return nil, err
}
- runner.arvMountLog = NewThrottledLogger(w)
scanner := logScanner{
Patterns: []string{
"Keep write error",
@@ -735,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error {
}
func (runner *ContainerRunner) startHoststat() error {
- w, err := runner.NewLogWriter("hoststat")
+ var err error
+ runner.hoststatLogger, err = runner.openLogFile("hoststat")
if err != nil {
return err
}
- runner.hoststatLogger = NewThrottledLogger(w)
runner.hoststatReporter = &crunchstat.Reporter{
- Logger: log.New(runner.hoststatLogger, "", 0),
+ Logger: newLogWriter(runner.hoststatLogger),
// Our own cgroup is the "host" cgroup, in the sense
// that it accounts for resource usage outside the
// container. It doesn't count _all_ resource usage on
@@ -759,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error {
}
func (runner *ContainerRunner) startCrunchstat() error {
- w, err := runner.NewLogWriter("crunchstat")
+ var err error
+ runner.statLogger, err = runner.openLogFile("crunchstat")
if err != nil {
return err
}
- runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
Pid: runner.executor.Pid,
FS: runner.crunchstatFakeFS,
- Logger: log.New(runner.statLogger, "", 0),
+ Logger: newLogWriter(runner.statLogger),
MemThresholds: map[string][]crunchstat.Threshold{
"rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
},
@@ -790,7 +785,7 @@ type infoCommand struct {
// might differ from what's described in the node record (see
// LogNodeRecord).
func (runner *ContainerRunner) LogHostInfo() (err error) {
- w, err := runner.NewLogWriter("node-info")
+ w, err := runner.openLogFile("node-info")
if err != nil {
return
}
@@ -875,13 +870,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
if err != nil {
return false, err
}
- w := &ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: label,
- writeCloser: writer,
- }
-
reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
if err != nil {
return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -901,12 +889,12 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
return false, nil
}
// Re-encode it using indentation to improve readability
- enc := json.NewEncoder(w)
+ enc := json.NewEncoder(writer)
enc.SetIndent("", " ")
if err = enc.Encode(items[0]); err != nil {
return false, fmt.Errorf("error logging %s record: %v", label, err)
}
- err = w.Close()
+ err = writer.Close()
if err != nil {
return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
}
@@ -974,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
return err
}
stdout = f
- } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+ } else if w, err := runner.openLogFile("stdout"); err != nil {
return err
} else {
- stdout = NewThrottledLogger(w)
+ stdout = w
}
if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -986,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
return err
}
stderr = f
- } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+ } else if w, err := runner.openLogFile("stderr"); err != nil {
return err
} else {
- stderr = NewThrottledLogger(w)
+ stderr = w
}
env := runner.Container.Environment
@@ -1442,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error {
if runner.arvMountLog != nil {
runner.arvMountLog.Close()
}
- runner.CrunchLog.Close()
-
- // Closing CrunchLog above allows them to be committed to Keep at this
- // point, but re-open crunch log with ArvClient in case there are any
- // other further errors (such as failing to write the log to Keep!)
- // while shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: "crunch-run",
- writeCloser: nil,
- })
- runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+ // From now on just log to stderr, in case there are
+ // any further errors (such as failing to write
+ // the log to Keep!) while shutting down
+ runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
}()
if runner.keepstoreLogger != nil {
@@ -1614,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
return runner.cCancelled
}
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
- writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return nil, err
- }
- return &ArvLogWriter{
- ArvClient: runner.DispatcherArvClient,
- UUID: runner.Container.UUID,
- loggingStream: name,
- writeCloser: writer,
- }, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+ return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
}
// Run the full container lifecycle.
@@ -1655,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) {
defer func() {
runner.CleanupDirs()
-
runner.CrunchLog.Printf("crunch-run finished")
- runner.CrunchLog.Close()
}()
err = runner.fetchContainerRecord()
@@ -1851,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
DispatcherArvClient: dispatcherArvClient,
DispatcherKeepClient: dispatcherKeepClient,
}
- cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1874,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
return nil, err
}
cr.Container.UUID = containerUUID
- w, err := cr.NewLogWriter("crunch-run")
+ f, err := cr.openLogFile("crunch-run")
if err != nil {
return nil, err
}
- cr.CrunchLog = NewThrottledLogger(w)
- cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+ cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
- loadLogThrottleParams(dispatcherArvClient)
go cr.updateLogs()
return cr, nil
@@ -2026,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
keepstoreLogbuf.SetWriter(io.Discard)
} else {
cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
- logwriter, err := cr.NewLogWriter("keepstore")
+ cr.keepstoreLogger, err = cr.openLogFile("keepstore")
if err != nil {
log.Print(err)
return 1
}
- cr.keepstoreLogger = NewThrottledLogger(logwriter)
var writer io.WriteCloser = cr.keepstoreLogger
if logWhat == "errors" {
@@ -2057,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
default:
cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
- cr.CrunchLog.Close()
return 1
}
if err != nil {
cr.CrunchLog.Printf("%s: %v", containerUUID, err)
cr.checkBrokenNode(err)
- cr.CrunchLog.Close()
return 1
}
defer cr.executor.Close()
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 5cb982e1bb..58ae1c190c 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -14,7 +14,6 @@ import (
"io"
"io/fs"
"io/ioutil"
- "log"
"math/rand"
"net/http"
"net/http/httptest"
@@ -121,7 +120,6 @@ type ArvTestClient struct {
Content []arvadosclient.Dict
arvados.Container
secretMounts []byte
- Logs map[string]*bytes.Buffer
sync.Mutex
WasSetRunning bool
callraw bool
@@ -207,14 +205,7 @@ func (client *ArvTestClient) Create(resourceType string,
client.Content = append(client.Content, parameters)
if resourceType == "logs" {
- et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
- if client.Logs == nil {
- client.Logs = make(map[string]*bytes.Buffer)
- }
- if client.Logs[et] == nil {
- client.Logs[et] = &bytes.Buffer{}
- }
- client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
+ panic("logs.create called")
}
if resourceType == "collections" && output != nil {
@@ -607,29 +598,6 @@ func (*KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
return ErrorReader{}, nil
}
-type ClosableBuffer struct {
- bytes.Buffer
-}
-
-func (*ClosableBuffer) Close() error {
- return nil
-}
-
-type TestLogs struct {
- Stdout ClosableBuffer
- Stderr ClosableBuffer
-}
-
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
- if logstr == "stdout" {
- return &tl.Stdout, nil
- }
- if logstr == "stderr" {
- return &tl.Stderr, nil
- }
- return nil, errors.New("???")
-}
-
func dockerLog(fd byte, msg string) []byte {
by := []byte(msg)
header := make([]byte, 8+len(by))
@@ -645,8 +613,6 @@ func (s *TestSuite) TestRunContainer(c *C) {
return 0
}
- var logs TestLogs
- s.runner.NewLogWriter = logs.NewTestLoggingWriter
s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
s.runner.Container.Command = []string{"./hw"}
s.runner.Container.OutputStorageClasses = []string{"default"}
@@ -663,8 +629,8 @@ func (s *TestSuite) TestRunContainer(c *C) {
err = s.runner.WaitFinish()
c.Assert(err, IsNil)
- c.Check(logs.Stdout.String(), Matches, ".*Hello world\n")
- c.Check(logs.Stderr.String(), Equals, "")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `Hello world\n`)
+ c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ``)
}
func (s *TestSuite) TestCommitLogs(c *C) {
@@ -673,7 +639,9 @@ func (s *TestSuite) TestCommitLogs(c *C) {
defer kc.Close()
cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+ f, err := cr.openLogFile("crunch-run")
+ c.Assert(err, IsNil)
+ cr.CrunchLog = newLogWriter(newTestTimestamper(f))
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
@@ -682,10 +650,10 @@ func (s *TestSuite) TestCommitLogs(c *C) {
err = cr.CommitLogs()
c.Check(err, IsNil)
- c.Check(api.Calls, Equals, 2)
- c.Check(api.Content[1]["ensure_unique_name"], Equals, true)
- c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+ c.Check(api.Calls, Equals, 1)
+ c.Check(api.Content[0]["ensure_unique_name"], Equals, true)
+ c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
}
@@ -806,10 +774,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn
}
if err != nil {
- for k, v := range s.api.Logs {
- c.Log(k)
- c.Log(v.String())
- }
+ dumpAllLogFiles(c, s.runner)
}
return s.api, s.runner, realTemp
@@ -844,7 +809,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "hello world\n")
c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"default"})
c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"default"})
}
@@ -935,13 +900,13 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
time.Sleep(time.Second)
return 0
})
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
if failureRate == 1 {
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
} else {
text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`)
c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
@@ -966,7 +931,7 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*maximum run time exceeded.*")
}
func (s *TestSuite) TestContainerWaitFails(c *C) {
@@ -984,7 +949,7 @@ func (s *TestSuite) TestContainerWaitFails(c *C) {
})
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Container is not running.*")
}
func (s *TestSuite) TestCrunchstat(c *C) {
@@ -1007,11 +972,10 @@ func (s *TestSuite) TestCrunchstat(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Assert(s.api.Logs["crunchstat"], NotNil)
- c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
+ c.Check(logFileContent(c, s.runner, "crunchstat.txt"), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
// Check that we called (*crunchstat.Reporter)Stop().
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
}
func (s *TestSuite) TestNodeInfoLog(c *C) {
@@ -1032,19 +996,16 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- buf, err := fs.ReadFile(arvados.FS(s.runner.LogCollection), "/node.json")
- c.Assert(err, IsNil)
- json := string(buf)
+ json := logFileContent(c, s.runner, "node.json")
c.Check(json, Matches, `(?ms).*"ProviderType": *"a1\.2xlarge".*`)
c.Check(json, Matches, `(?ms).*"Price": *1\.2.*`)
- c.Assert(s.api.Logs["node-info"], NotNil)
- json = s.api.Logs["node-info"].String()
- c.Check(json, Matches, `(?ms).*Host Information.*`)
- c.Check(json, Matches, `(?ms).*CPU Information.*`)
- c.Check(json, Matches, `(?ms).*Memory Information.*`)
- c.Check(json, Matches, `(?ms).*Disk Space.*`)
- c.Check(json, Matches, `(?ms).*Disk INodes.*`)
+ nodeinfo := logFileContent(c, s.runner, "node-info.txt")
+ c.Check(nodeinfo, Matches, `(?ms).*Host Information.*`)
+ c.Check(nodeinfo, Matches, `(?ms).*CPU Information.*`)
+ c.Check(nodeinfo, Matches, `(?ms).*Memory Information.*`)
+ c.Check(nodeinfo, Matches, `(?ms).*Disk Space.*`)
+ c.Check(nodeinfo, Matches, `(?ms).*Disk INodes.*`)
}
func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
@@ -1062,11 +1023,10 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
return 0
})
- c.Assert(s.api.Logs["crunch-run"], NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Using container runtime: stub.*`)
}
func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) {
@@ -1082,8 +1042,9 @@ func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExp
"runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`},
"state": "Locked"
}`, nil, func() int { return 0 })
- c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String())
- logs := s.api.Logs["crunch-run"].String()
+ logs := logFileContent(c, s.runner, "crunch-run.txt")
+ c.Log("=== crunchstat logs")
+ c.Log(logs)
pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)`
var threshold int
for _, threshold = range expected {
@@ -1121,7 +1082,7 @@ func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
"runtime_constraints": {"ram": `+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`},
"state": "Locked"
}`, nil, func() int { return 0 })
- logs := s.api.Logs["crunch-run"].String()
+ logs := logFileContent(c, s.runner, "crunch-run.txt")
for _, expected := range []string{
`Maximum disk usage was \d+%, \d+/\d+ bytes`,
fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent),
@@ -1189,8 +1150,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Assert(s.api.Logs["container"], NotNil)
- c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
+ c.Check(logFileContent(c, s.runner, "container.json"), Matches, `(?ms).*container_image.*`)
}
func (s *TestSuite) TestFullRunStderr(c *C) {
@@ -1215,8 +1175,8 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n")
- c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*hello\n")
+ c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ".*world\n")
}
func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
@@ -1237,8 +1197,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Log(s.api.Logs["stdout"])
- c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`)
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*workdir=""`)
}
func (s *TestSuite) TestFullRunSetCwd(c *C) {
@@ -1259,7 +1218,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
}
func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
@@ -1281,7 +1240,7 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
}
@@ -1378,14 +1337,11 @@ func (s *TestSuite) testStopContainer(c *C) {
case err = <-done:
c.Check(err, IsNil)
}
- for k, v := range s.api.Logs {
- c.Log(k)
- c.Log(v.String(), "\n")
- }
+ dumpAllLogFiles(c, s.runner)
c.Check(s.api.CalledWith("container.log", nil), NotNil)
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "(?ms).*foo\n$")
}
func (s *TestSuite) TestFullRunSetEnv(c *C) {
@@ -1406,7 +1362,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`)
+ c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*map\[FROBIZ:bilbo\]`)
}
type ArvMountCmdLine struct {
@@ -1831,7 +1787,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
})
c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*status code 3\n.*`)
}
func (s *TestSuite) TestFullRunSetOutput(c *C) {
@@ -1886,7 +1842,7 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
}
func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
@@ -2196,13 +2152,14 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
"state": "Locked"
}`, nil, func() int { return 0 })
c.Check(s.api.CalledWith("container.state", nextState), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ logs := logFileContent(c, s.runner, "crunch-run.txt")
+ c.Check(logs, Matches, "(?ms).*unable to run containers.*")
if s.runner.brokenNodeHook != "" {
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
- c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+ c.Check(logs, Matches, "(?ms).*Running broken node hook.*")
+ c.Check(logs, Matches, "(?ms).*killme.*")
+ c.Check(logs, Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
} else {
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+ c.Check(logs, Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
}
}
}
@@ -2227,7 +2184,7 @@ func (s *TestSuite) TestBadCommand(c *C) {
"state": "Locked"
}`, nil, func() int { return 0 })
c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+ c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Possible causes:.*is missing.*")
}
}
@@ -2337,7 +2294,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
cr := s.runner
cr.costStartTime = now.Add(-time.Hour)
var logbuf bytes.Buffer
- cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+ cr.CrunchLog = newLogWriter(&logbuf)
// if there's no InstanceType env var, cost is calculated as 0
os.Unsetenv("InstanceType")
@@ -2463,3 +2420,20 @@ type FakeProcess struct {
func (fp FakeProcess) CmdlineSlice() ([]string, error) {
return fp.cmdLine, nil
}
+
+func logFileContent(c *C, cr *ContainerRunner, fnm string) string {
+ buf, err := fs.ReadFile(arvados.FS(cr.LogCollection), fnm)
+ c.Assert(err, IsNil)
+ return string(buf)
+}
+
+func dumpAllLogFiles(c *C, cr *ContainerRunner) {
+ d, err := cr.LogCollection.OpenFile("/", os.O_RDONLY, 0)
+ c.Assert(err, IsNil)
+ fis, err := d.Readdir(-1)
+ c.Assert(err, IsNil)
+ for _, fi := range fis {
+ c.Logf("=== %s", fi.Name())
+ c.Log(logFileContent(c, cr, fi.Name()))
+ }
+}
diff --git a/lib/crunchrun/cuda.go b/lib/crunchrun/cuda.go
index c693dbcb96..f91a5c62cd 100644
--- a/lib/crunchrun/cuda.go
+++ b/lib/crunchrun/cuda.go
@@ -5,13 +5,15 @@
package crunchrun
import (
+ "fmt"
+ "io"
"os/exec"
)
// nvidiaModprobe makes sure all the nvidia kernel modules and devices
// are set up. If we don't have all the modules/devices set up we get
// "CUDA_ERROR_UNKNOWN".
-func nvidiaModprobe(writer *ThrottledLogger) {
+func nvidiaModprobe(writer io.Writer) {
// The underlying problem is that when normally running
// directly on the host, the CUDA SDK will automatically
// detect and set up the devices on demand. However, when
@@ -42,7 +44,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
nvidiaSmi.Stderr = writer
err := nvidiaSmi.Run()
if err != nil {
- writer.Printf("Warning %v: %v", nvidiaSmi.Args, err)
+ fmt.Fprintf(writer, "Warning %v: %v\n", nvidiaSmi.Args, err)
}
// Load the kernel modules & devices associated with
@@ -63,7 +65,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
nvmodprobe.Stderr = writer
err = nvmodprobe.Run()
if err != nil {
- writer.Printf("Warning %v: %v", nvmodprobe.Args, err)
+ fmt.Fprintf(writer, "Warning %v: %v\n", nvmodprobe.Args, err)
}
}
}
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 91a1b77cf4..04c3724969 100644
--- a/lib/crunchrun/logging.go
+++ b/lib/crunchrun/logging.go
@@ -5,373 +5,82 @@
package crunchrun
import (
- "bufio"
"bytes"
"encoding/json"
- "fmt"
"io"
"log"
- "regexp"
- "strings"
- "sync"
"time"
-
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
)
-// Timestamper is the signature for a function that takes a timestamp and
-// return a formated string value.
-type Timestamper func(t time.Time) string
-
-// Logging plumbing:
-//
-// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher ->
-// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
-//
-// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
-// data from the stdout/stderr Reader and send to the Logger.
+// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-// ThrottledLogger accepts writes, prepends a timestamp to each line of the
-// write, and periodically flushes to a downstream writer. It supports the
-// "Logger" and "WriteCloser" interfaces.
-type ThrottledLogger struct {
- *log.Logger
- buf *bytes.Buffer
- sync.Mutex
- writer io.WriteCloser
- flush chan struct{}
- stopped chan struct{}
- stopping chan struct{}
- Timestamper
- Immediate *log.Logger
- pendingFlush bool
+// prefixer wraps an io.Writer, inserting a string returned by
+// prefixFunc at the beginning of each line.
+type prefixer struct {
+ writer io.Writer
+ prefixFunc func() string
+ unfinished bool // true if the most recent write ended with a non-newline char
}
-// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
-const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-// RFC3339Timestamp formats t as RFC3339NanoFixed.
-func RFC3339Timestamp(t time.Time) string {
- return t.Format(RFC3339NanoFixed)
-}
-
-// Write prepends a timestamp to each line of the input data and
-// appends to the internal buffer. Each line is also logged to
-// tl.Immediate, if tl.Immediate is not nil.
-func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
- tl.Mutex.Lock()
- defer tl.Mutex.Unlock()
-
- if tl.buf == nil {
- tl.buf = &bytes.Buffer{}
- }
-
- now := tl.Timestamper(time.Now().UTC())
- sc := bufio.NewScanner(bytes.NewBuffer(p))
- for err == nil && sc.Scan() {
- out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
- if tl.Immediate != nil {
- tl.Immediate.Print(out[:len(out)-1])
- }
- _, err = io.WriteString(tl.buf, out)
- }
- if err == nil {
- err = sc.Err()
- if err == nil {
- n = len(p)
- }
- }
-
- if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
- // Non-blocking send. Try send a flush if it is ready to
- // accept it. Otherwise do nothing because a flush is already
- // pending.
- select {
- case tl.flush <- struct{}{}:
- default:
- }
+// newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
+// timestamp at the beginning of each line.
+func newTimestamper(w io.Writer) *prefixer {
+ return &prefixer{
+ writer: w,
+ prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
}
-
- return
}
-// Periodically check the current buffer; if not empty, send it on the
-// channel to the goWriter goroutine.
-func (tl *ThrottledLogger) flusher() {
- ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
- defer ticker.Stop()
- for stopping := false; !stopping; {
- select {
- case <-tl.stopping:
- // flush tl.buf and exit the loop
- stopping = true
- case <-tl.flush:
- case <-ticker.C:
- }
-
- var ready *bytes.Buffer
-
- tl.Mutex.Lock()
- ready, tl.buf = tl.buf, &bytes.Buffer{}
- tl.Mutex.Unlock()
-
- if ready != nil && ready.Len() > 0 {
- tl.writer.Write(ready.Bytes())
- }
- }
- close(tl.stopped)
-}
-
-// Close the flusher goroutine and wait for it to complete, then close the
-// underlying Writer.
-func (tl *ThrottledLogger) Close() error {
- select {
- case <-tl.stopping:
- // already stopped
- default:
- close(tl.stopping)
+// newStringPrefixer wraps an io.Writer, inserting the given string at
+// the beginning of each line. The given string should include a
+// trailing space for readability.
+func newStringPrefixer(w io.Writer, s string) *prefixer {
+ return &prefixer{
+ writer: w,
+ prefixFunc: func() string { return s },
}
- <-tl.stopped
- return tl.writer.Close()
}
-const (
- // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
- MaxLogLine = 1 << 12
-)
-
-// ReadWriteLines reads lines from a reader and writes to a Writer, with long
-// line splitting.
-func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
- reader := bufio.NewReaderSize(in, MaxLogLine)
- var prefix string
- for {
- line, isPrefix, err := reader.ReadLine()
- if err == io.EOF {
- break
- } else if err != nil {
- writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
- }
- var suffix string
- if isPrefix {
- suffix = "[...]\n"
- }
-
- if prefix == "" && suffix == "" {
- writer.Write(line)
- } else {
- writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
+func (tp *prefixer) Write(p []byte) (n int, err error) {
+ for len(p) > 0 && err == nil {
+ if !tp.unfinished {
+ _, err = io.WriteString(tp.writer, tp.prefixFunc())
+ if err != nil {
+ return
+ }
}
-
- // Set up prefix for following line
- if isPrefix {
- prefix = "[...]"
+ newline := bytes.IndexRune(p, '\n')
+ var nn int
+ if newline < 0 {
+ tp.unfinished = true
+ nn, err = tp.writer.Write(p)
+ p = nil
} else {
- prefix = ""
+ tp.unfinished = false
+ nn, err = tp.writer.Write(p[:newline+1])
+ p = p[nn:]
}
+ n += nn
}
- done <- true
-}
-
-// NewThrottledLogger creates a new thottled logger that
-// - prepends timestamps to each line, and
-// - batches log messages and only calls the underlying Writer
-// at most once per "crunchLogSecondsBetweenEvents" seconds.
-func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
- tl := &ThrottledLogger{}
- tl.flush = make(chan struct{}, 1)
- tl.stopped = make(chan struct{})
- tl.stopping = make(chan struct{})
- tl.writer = writer
- tl.Logger = log.New(tl, "", 0)
- tl.Timestamper = RFC3339Timestamp
- go tl.flusher()
- return tl
-}
-
-// Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64 = 67108864
-var crunchLogThrottleBytes int64 = 65536
-var crunchLogThrottlePeriod time.Duration = time.Second * 60
-var crunchLogThrottleLines int64 = 1024
-var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
-var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents = time.Second
-var crunchLogUpdatePeriod = time.Hour / 2
-var crunchLogUpdateSize = int64(1 << 25)
-
-// ArvLogWriter is an io.WriteCloser that processes each write by
-// writing it through to another io.WriteCloser (typically a
-// CollectionFileWriter) and creating an Arvados log entry.
-type ArvLogWriter struct {
- ArvClient IArvadosClient
- UUID string
- loggingStream string
- writeCloser io.WriteCloser
-
- // for rate limiting
- bytesLogged int64
- logThrottleResetTime time.Time
- logThrottleLinesSoFar int64
- logThrottleBytesSoFar int64
- logThrottleBytesSkipped int64
- logThrottleIsOpen bool
- logThrottlePartialLineNextAt time.Time
- logThrottleFirstPartialLine bool
- bufToFlush bytes.Buffer
- bufFlushedAt time.Time
- closing bool
+ return
}
-func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
- // Write to the next writer in the chain (a file in Keep)
- var err1 error
- if arvlog.writeCloser != nil {
- _, err1 = arvlog.writeCloser.Write(p)
- }
-
- // write to API after checking rate limit
- now := time.Now()
-
- if now.After(arvlog.logThrottleResetTime) {
- // It has been more than throttle_period seconds since the last
- // checkpoint; so reset the throttle
- if arvlog.logThrottleBytesSkipped > 0 {
- arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
- }
-
- arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
- arvlog.logThrottleBytesSoFar = 0
- arvlog.logThrottleLinesSoFar = 0
- arvlog.logThrottleBytesSkipped = 0
- arvlog.logThrottleIsOpen = true
- }
-
- lines := bytes.Split(p, []byte("\n"))
-
- for _, line := range lines {
- // Short circuit the counting code if we're just going to throw
- // away the data anyway.
- if !arvlog.logThrottleIsOpen {
- arvlog.logThrottleBytesSkipped += int64(len(line))
- continue
- } else if len(line) == 0 {
- continue
- }
-
- // check rateLimit
- logOpen, msg := arvlog.rateLimit(line, now)
- if logOpen {
- arvlog.bufToFlush.WriteString(string(msg) + "\n")
- }
- }
-
- if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
- (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
- arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
- // write to API
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": arvlog.UUID,
- "event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
- err2 := arvlog.ArvClient.Create("logs", lr, nil)
-
- arvlog.bufToFlush = bytes.Buffer{}
- arvlog.bufFlushedAt = now
-
- if err1 != nil || err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
- }
- }
-
- return len(p), nil
+// logWriter adds log.Logger methods to an io.Writer.
+type logWriter struct {
+ io.Writer
+ *log.Logger
}
-// Close the underlying writer
-func (arvlog *ArvLogWriter) Close() (err error) {
- arvlog.closing = true
- arvlog.Write([]byte{})
- if arvlog.writeCloser != nil {
- err = arvlog.writeCloser.Close()
- arvlog.writeCloser = nil
+func newLogWriter(w io.Writer) *logWriter {
+ return &logWriter{
+ Writer: w,
+ Logger: log.New(w, "", 0),
}
- return err
}
-var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
-
-// Test for hard cap on total output and for log throttling. Returns whether
-// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
- message := ""
- lineSize := int64(len(line))
-
- if arvlog.logThrottleIsOpen {
- matches := lineRegexp.FindStringSubmatch(string(line))
-
- if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
- // This is a partial line.
-
- if arvlog.logThrottleFirstPartialLine {
- // Partial should be suppressed. First time this is happening for this line so provide a message instead.
- arvlog.logThrottleFirstPartialLine = false
- arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
- arvlog.logThrottleBytesSkipped += lineSize
- return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
- } else if now.After(arvlog.logThrottlePartialLineNextAt) {
- // The throttle period has passed. Update timestamp and let it through.
- arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
- } else {
- // Suppress line.
- arvlog.logThrottleBytesSkipped += lineSize
- return false, line
- }
- } else {
- // Not a partial line so reset.
- arvlog.logThrottlePartialLineNextAt = time.Time{}
- arvlog.logThrottleFirstPartialLine = true
- }
-
- arvlog.bytesLogged += lineSize
- arvlog.logThrottleBytesSoFar += lineSize
- arvlog.logThrottleLinesSoFar++
-
- if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
- RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
- arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
- arvlog.logThrottleIsOpen = false
-
- } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
- remainingTime := arvlog.logThrottleResetTime.Sub(now)
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
- arvlog.logThrottleIsOpen = false
-
- } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
- remainingTime := arvlog.logThrottleResetTime.Sub(now)
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
- RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
- arvlog.logThrottleIsOpen = false
-
- }
- }
-
- if !arvlog.logThrottleIsOpen {
- // Don't log anything if any limit has been exceeded. Just count lossage.
- arvlog.logThrottleBytesSkipped += lineSize
- }
-
- if message != "" {
- // Yes, write to logs, but use our "rate exceeded" message
- // instead of the log message that exceeded the limit.
- message += " A complete log is still being written to Keep, and will be available when the job finishes."
- return true, []byte(message)
- }
- return arvlog.logThrottleIsOpen, line
-}
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
@@ -394,13 +103,6 @@ func loadLogThrottleParams(clnt IArvadosClient) {
}
}
- loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
- loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
- loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
- loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
- loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
- loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
- loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index ee3320c7c3..e747a2069e 100644
--- a/lib/crunchrun/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -13,26 +13,25 @@ import (
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
check "gopkg.in/check.v1"
)
-type LoggingTestSuite struct {
- client *arvados.Client
-}
-
-type TestTimestamper struct {
- count int
+// newTestTimestamper wraps an io.Writer, inserting a predictable
+// RFC3339NanoFixed timestamp at the beginning of each line.
+func newTestTimestamper(w io.Writer) *prefixer {
+ count := 0
+ return &prefixer{
+ writer: w,
+ prefixFunc: func() string {
+ count++
+ return fmt.Sprintf("2015-12-29T15:51:45.%09dZ ", count)
+ },
+ }
}
-func (stamper *TestTimestamper) Timestamp(t time.Time) string {
- stamper.count++
- t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", stamper.count), t.Location())
- if err != nil {
- panic(err)
- }
- return RFC3339Timestamp(t)
+type LoggingTestSuite struct {
+ client *arvados.Client
}
// Gocheck boilerplate
@@ -48,26 +47,20 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+ f, err := cr.openLogFile("crunch-run")
+ c.Assert(err, IsNil)
+ cr.CrunchLog = newLogWriter(newTestTimestamper(f))
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
- cr.CrunchLog.Close()
-
- c.Check(api.Calls, Equals, 1)
-
- mt, err := cr.LogCollection.MarshalManifest(".")
- c.Check(err, IsNil)
- c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
- logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
- "2015-12-29T15:51:45.000000002Z Goodbye\n"
+ c.Check(api.Calls, Equals, 0)
- c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
- c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
- s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
+ logs := logFileContent(c, cr, "crunch-run.txt")
+ c.Check(logs, Matches, `....-..-..T..:..:..\..........Z Hello world!\n`+
+ `....-..-..T..:..:..\..........Z Goodbye\n`)
}
func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
@@ -79,59 +72,34 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
defer kc.Close()
cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
- cr.CrunchLog.Immediate = nil
-
+ f, err := cr.openLogFile("crunch-run")
+ c.Assert(err, IsNil)
+ cr.CrunchLog = newLogWriter(newTestTimestamper(f))
for i := 0; i < 2000000; i++ {
cr.CrunchLog.Printf("Hello %d", i)
}
cr.CrunchLog.Print("Goodbye")
- cr.CrunchLog.Close()
-
- c.Check(api.Calls > 0, Equals, true)
- c.Check(api.Calls < 2000000, Equals, true)
-
- mt, err := cr.LogCollection.MarshalManifest(".")
- c.Check(err, IsNil)
- c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
-}
-
-func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
- api := &ArvTestClient{}
- kc := &KeepTestClient{}
- defer kc.Close()
- cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
- c.Assert(err, IsNil)
- ts := &TestTimestamper{}
- cr.CrunchLog.Timestamper = ts.Timestamp
- w, err := cr.NewLogWriter("stdout")
- c.Assert(err, IsNil)
- stdout := NewThrottledLogger(w)
- stdout.Timestamper = ts.Timestamp
-
- cr.CrunchLog.Print("Hello world!")
- stdout.Print("Doing stuff")
- cr.CrunchLog.Print("Goodbye")
- stdout.Print("Blurb")
- cr.CrunchLog.Close()
- stdout.Close()
-
- logText := make(map[string]string)
- for _, content := range api.Content {
- log := content["log"].(arvadosclient.Dict)
- logText[log["event_type"].(string)] += log["properties"].(map[string]string)["text"]
- }
- c.Check(logText["crunch-run"], Equals, `2015-12-29T15:51:45.000000001Z Hello world!
-2015-12-29T15:51:45.000000003Z Goodbye
-`)
- c.Check(logText["stdout"], Equals, `2015-12-29T15:51:45.000000002Z Doing stuff
-2015-12-29T15:51:45.000000004Z Blurb
+ logs := logFileContent(c, cr, "crunch-run.txt")
+ c.Check(strings.Count(logs, "\n"), Equals, 2000001)
+ // Redact most of the logs except the start/end for the regexp
+ // match -- otherwise, when the regexp fails, gocheck spams
+ // the test logs with tens of megabytes of quoted strings.
+ c.Assert(len(logs) > 10000, Equals, true)
+ c.Check(logs[:500]+"\n...\n"+logs[len(logs)-500:], Matches, `(?ms)2015-12-29T15:51:45.000000001Z Hello 0
+2015-12-29T15:51:45.000000002Z Hello 1
+2015-12-29T15:51:45.000000003Z Hello 2
+2015-12-29T15:51:45.000000004Z Hello 3
+.*
+2015-12-29T15:51:45.001999998Z Hello 1999997
+2015-12-29T15:51:45.001999999Z Hello 1999998
+2015-12-29T15:51:45.002000000Z Hello 1999999
+2015-12-29T15:51:45.002000001Z Goodbye
`)
mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
- c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
+ c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
}
func (s *LoggingTestSuite) TestLogUpdate(c *C) {
@@ -149,14 +117,13 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- ts := &TestTimestamper{}
- cr.CrunchLog.Timestamper = ts.Timestamp
- w, err := cr.NewLogWriter("stdout")
+ f, err := cr.openLogFile("crunch-run")
+ c.Assert(err, IsNil)
+ cr.CrunchLog = newLogWriter(newTestTimestamper(f))
+ stdout, err := cr.openLogFile("stdout")
c.Assert(err, IsNil)
- stdout := NewThrottledLogger(w)
- stdout.Timestamper = ts.Timestamp
c.Check(cr.logUUID, Equals, "")
cr.CrunchLog.Printf("Hello %1000s", "space")
@@ -165,75 +132,18 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
}
c.Check(cr.logUUID, Not(Equals), "")
cr.CrunchLog.Print("Goodbye")
- fmt.Fprint(stdout, "Goodbye\n")
- cr.CrunchLog.Close()
- stdout.Close()
- w.Close()
+ fmt.Fprintln(stdout, "Goodbye")
+
+ c.Check(logFileContent(c, cr, "crunch-run.txt"), Matches, `....-..-..T..:..:............Z Hello {995}space\n`+
+ `....-..-..T..:..:............Z Goodbye\n`)
+ c.Check(logFileContent(c, cr, "stdout.txt"), Matches, `Goodbye\n`)
mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
- // Block packing depends on whether there's an update
- // between the two Goodbyes -- either way the first
- // block will be 4dc76.
- c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
+ c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 5be52044a8c51e7b62dd62be07872968\+47 0:1077:crunch-run.txt 1077:8:stdout.txt\n`)
}
}
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
- s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
- s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
- s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) {
- s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
- discoveryMap[throttleParam] = float64(throttleValue)
- defer func() {
- discoveryMap[throttleParam] = float64(throttleDefault)
- }()
-
- api := &ArvTestClient{}
- kc := &KeepTestClient{}
- defer kc.Close()
- cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
- c.Assert(err, IsNil)
- cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
-
- cr.CrunchLog.Print("Hello world!")
- cr.CrunchLog.Print("Goodbye")
- cr.CrunchLog.Close()
-
- c.Check(api.Calls, Equals, 1)
-
- mt, err := cr.LogCollection.MarshalManifest(".")
- c.Check(err, IsNil)
- c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
-
- logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
- "2015-12-29T15:51:45.000000002Z Goodbye\n"
-
- c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
- stderrLog := api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]
- c.Check(true, Equals, strings.Contains(stderrLog, expected))
- s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
-}
-
-func (s *LoggingTestSuite) checkWroteBlock(c *C, kc *KeepTestClient, locator, expect string) {
- buf := make([]byte, len([]byte(expect))+1)
- n, err := kc.ReadAt(locator, buf, 0)
- c.Check(err, IsNil)
- c.Check(string(buf[:n]), Equals, expect)
-}
-
type filterSuite struct{}
var _ = Suite(&filterSuite{})
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list