[ARVADOS] updated: 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f
git at public.curoverse.com
git at public.curoverse.com
Fri Jan 15 15:53:59 EST 2016
Summary of changes:
services/crunch-exec/crunchexec.go | 55 +++++++++---------
services/crunch-exec/crunchexec_test.go | 9 +--
services/crunch-exec/logging.go | 98 ++++++++++++++++++++++-----------
services/crunch-exec/logging_test.go | 8 +--
services/crunch-exec/upload.go | 9 +++
5 files changed, 112 insertions(+), 67 deletions(-)
via 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f (commit)
from f7115688cc3a63b4721ca59259bc8276685bac9a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 15 15:50:33 2016 -0500
7816: Change ThrottledLogger Stop() to Close(). Choose finalState once so that
log and container record can't go out of sync. Add comments.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 9e6a12f..abaed82 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -22,6 +22,8 @@ type IArvadosClient interface {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
+var ErrCancelled = errors.New("Cancelled")
+
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
@@ -34,16 +36,16 @@ type Collection struct {
}
type ContainerRecord struct {
- Uuid string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- Mounts map[string]Mount `json:"mounts"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]string `json:"runtime_constraints"`
- State string `json:"state"`
+ Uuid string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ Mounts map[string]Mount `json:"mounts"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
+ State string `json:"state"`
}
type NewLogWriter func(name string) io.WriteCloser
@@ -77,6 +79,7 @@ type ContainerRunner struct {
CancelLock sync.Mutex
Cancelled bool
SigChan chan os.Signal
+ finalState string
}
func (this *ContainerRunner) SetupSignals() error {
@@ -152,7 +155,7 @@ func (this *ContainerRunner) StartContainer() (err error) {
defer this.CancelLock.Unlock()
if this.Cancelled {
- return errors.New("Cancelled")
+ return ErrCancelled
}
this.ContainerConfig.Cmd = this.ContainerRecord.Command
@@ -213,20 +216,15 @@ func (this *ContainerRunner) WaitFinish() error {
<-this.loggingDone
<-this.loggingDone
- this.Stdout.Stop()
- this.Stderr.Stop()
+ this.Stdout.Close()
+ this.Stderr.Close()
return nil
}
func (this *ContainerRunner) CommitLogs() error {
- if this.Cancelled {
- this.CrunchLog.Print("Cancelled")
- } else {
- this.CrunchLog.Print("Complete")
- }
-
- this.CrunchLog.Stop()
+ this.CrunchLog.Print(this.finalState)
+ this.CrunchLog.Close()
this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
"crunchexec", nil})
@@ -264,11 +262,8 @@ func (this *ContainerRunner) UpdateContainerRecordComplete() error {
update["exit_code"] = *this.ExitCode
}
- if this.Cancelled {
- update["state"] = "Cancelled"
- } else {
- update["state"] = "Complete"
- }
+ update["state"] = this.finalState
+
return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
}
@@ -286,6 +281,12 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
this.CrunchLog.Print(err)
}
+ if this.Cancelled {
+ this.finalState = "Cancelled"
+ } else {
+ this.finalState = "Complete"
+ }
+
// (6) write logs
logerr := this.CommitLogs()
if logerr != nil {
@@ -298,7 +299,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
this.CrunchLog.Print(updateerr)
}
- this.CrunchLog.Stop()
+ this.CrunchLog.Close()
if err == nil {
if runerr != nil {
@@ -333,7 +334,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
// (2) start container
err = this.StartContainer()
if err != nil {
- if err.Error() == "Cancelled" {
+ if err == ErrCancelled {
err = nil
}
return
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index fcd5f80..7efdad8 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -342,10 +342,6 @@ type TestLogs struct {
Stderr ClosableBuffer
}
-func (this *ClosableBuffer) Write(p []byte) (n int, err error) {
- return this.Buffer.Write(p)
-}
-
func (this *ClosableBuffer) Close() error {
return nil
}
@@ -399,6 +395,7 @@ func (s *TestSuite) TestCommitLogs(c *C) {
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
+ cr.finalState = "Complete"
err := cr.CommitLogs()
c.Check(err, IsNil)
@@ -431,6 +428,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
cr.ExitCode = new(int)
*cr.ExitCode = 42
+ cr.finalState = "Complete"
err := cr.UpdateContainerRecordComplete()
c.Check(err, IsNil)
@@ -446,6 +444,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
cr := NewContainerRunner(api, kc, nil)
cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
cr.Cancelled = true
+ cr.finalState = "Cancelled"
err := cr.UpdateContainerRecordComplete()
c.Check(err, IsNil)
@@ -455,6 +454,8 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
c.Check(api.Content["state"], Equals, "Cancelled")
}
+// Used by the TestFullRun*() tests below to DRY up boilerplate setup to do a full
+// dress rehearsal of the Run() function, starting from a JSON container record.
func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
rec := ContainerRecord{}
err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 01b6548..1604445 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -14,6 +14,18 @@ import (
type Timestamper func(t time.Time) string
+// Logging plumbing:
+//
+// ThrottledLogger.Logger -> ThrottledLogger.Write ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
+//
+// For stdout/stderr CopyReaderToLog additionally runs as a goroutine to pull
+// data from the stdout/stderr Reader and send to the Logger.
+
+// ThrottledLogger accepts writes, prepends a timestamp to each line of the
+// write, and periodically flushes to a downstream writer. It supports the
+// "Logger" and "WriteCloser" interfaces.
type ThrottledLogger struct {
*log.Logger
buf *bytes.Buffer
@@ -24,18 +36,16 @@ type ThrottledLogger struct {
Timestamper
}
-func RFC3339Timestamp(now time.Time) string {
- // return now.Format(time.RFC3339Nano)
- // Builtin RFC3339Nano format isn't fixed width so
- // provide our own.
-
- return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
- now.Year(), now.Month(), now.Day(),
- now.Hour(), now.Minute(), now.Second(),
- now.Nanosecond())
+// Builtin RFC3339Nano format isn't fixed width so
+// provide our own with microsecond precision (same as API server).
+const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+func RFC3339Timestamp(now time.Time) string {
+ return now.Format(RFC3339Fixed)
}
+// Write to the internal buffer. Prepend a timestamp to each line of the input
+// data.
func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
this.Mutex.Lock()
if this.buf == nil {
@@ -43,27 +53,23 @@ func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
}
defer this.Mutex.Unlock()
- now := time.Now().UTC()
- _, err = fmt.Fprintf(this.buf, "%s %s", this.Timestamper(now), p)
- return len(p), err
-}
-
-func (this *ThrottledLogger) Stop() {
- this.stop = true
- <-this.flusherDone
- this.writer.Close()
-}
-
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
- for b := range c {
- writer.Write(b.Bytes())
+ now := this.Timestamper(time.Now().UTC())
+ sc := bufio.NewScanner(bytes.NewBuffer(p))
+ for sc.Scan() {
+ _, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
}
- t <- true
+ return len(p), err
}
+// Periodically check the current buffer; if not empty, send it on the
+// channel to the goWriter goroutine.
func (this *ThrottledLogger) flusher() {
bufchan := make(chan *bytes.Buffer)
bufterm := make(chan bool)
+
+ // Use a separate goroutine for the actual write so that the writes are
+ // initiated closer to every 1s instead of every
+ // 1s + (the time it takes to write).
go goWriter(this.writer, bufchan, bufterm)
for {
if !this.stop {
@@ -71,23 +77,43 @@ func (this *ThrottledLogger) flusher() {
}
this.Mutex.Lock()
if this.buf != nil && this.buf.Len() > 0 {
- bufchan <- this.buf
+ oldbuf := this.buf
this.buf = nil
+ this.Mutex.Unlock()
+ bufchan <- oldbuf
} else if this.stop {
this.Mutex.Unlock()
break
+ } else {
+ this.Mutex.Unlock()
}
- this.Mutex.Unlock()
}
close(bufchan)
<-bufterm
this.flusherDone <- true
}
+// Receive buffers from a channel and send to the underlying Writer
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+ for b := range c {
+ writer.Write(b.Bytes())
+ }
+ t <- true
+}
+
+// Stop the flusher goroutine and wait for it to complete, then close the
+// underlying Writer.
+func (this *ThrottledLogger) Close() error {
+ this.stop = true
+ <-this.flusherDone
+ return this.writer.Close()
+}
+
const (
MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
)
+// Goroutine to copy from a reader to a logger, with long line splitting.
func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
reader := bufio.NewReaderSize(in, MaxLogLine)
var prefix string
@@ -113,6 +139,11 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
done <- true
}
+// Create a new throttled logger that
+// (a) prepends timestamps to each line
+// (b) batches log messages and only calls the underlying Writer at most once
+// per second.
+
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
alw := &ThrottledLogger{}
alw.flusherDone = make(chan bool)
@@ -123,17 +154,20 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
return alw
}
+// Implements a writer that writes to a WriteCloser (typically
+// CollectionFileWriter) and also creates an API server log entry.
type ArvLogWriter struct {
Api IArvadosClient
Uuid string
loggingStream string
- io.WriteCloser
+ writeCloser io.WriteCloser
}
func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+ // Write to the next writer in the chain (a file in Keep)
var err1 error
- if this.WriteCloser != nil {
- _, err1 = this.WriteCloser.Write(p)
+ if this.writeCloser != nil {
+ _, err1 = this.writeCloser.Write(p)
}
// write to API
@@ -151,9 +185,9 @@ func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
}
func (this *ArvLogWriter) Close() (err error) {
- if this.WriteCloser != nil {
- err = this.WriteCloser.Close()
- this.WriteCloser = nil
+ if this.writeCloser != nil {
+ err = this.writeCloser.Close()
+ this.writeCloser = nil
}
return err
}
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index c86d918..fd51936 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -28,7 +28,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
- cr.CrunchLog.Stop()
+ cr.CrunchLog.Close()
c.Check(api.Calls, Equals, 1)
@@ -54,7 +54,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
cr.CrunchLog.Printf("Hello %d", i)
}
cr.CrunchLog.Print("Goodbye")
- cr.CrunchLog.Stop()
+ cr.CrunchLog.Close()
c.Check(api.Calls > 1, Equals, true)
c.Check(api.Calls < 2000000, Equals, true)
@@ -78,13 +78,13 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
cr.CrunchLog.Print("Goodbye")
stdout.Print("Blurb")
- cr.CrunchLog.Stop()
+ cr.CrunchLog.Close()
logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
"2015-12-29T15:51:45.000000003Z Goodbye\n"
c.Check(api.Content["event_type"], Equals, "crunchexec")
c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
- stdout.Stop()
+ stdout.Close()
logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
"2015-12-29T15:51:45.000000004Z Blurb\n"
c.Check(api.Content["event_type"], Equals, "stdout")
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
index e85d636..8d2d802 100644
--- a/services/crunch-exec/upload.go
+++ b/services/crunch-exec/upload.go
@@ -1,5 +1,14 @@
package main
+// Originally based on sdk/go/crunchrunner/upload.go
+//
+// Unlike the original, which iterates over a directory tree and uploads each
+// file sequentially, this version supports opening and writing multiple files
+// in a collection simultaneously.
+//
+// Eventually this should move into the Arvados Go SDK for a more comprehensive
+// implementation of Collections.
+
import (
"bytes"
"crypto/md5"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list