[ARVADOS] updated: d77b8bd4cc7e22d1892a270704a0254e93b9e284
git at public.curoverse.com
git at public.curoverse.com
Mon Dec 28 18:47:41 EST 2015
Summary of changes:
services/crunch-exec/crunchexec.go | 105 ++++++++++++++++----------------
services/crunch-exec/crunchexec_test.go | 12 ++--
services/crunch-exec/logging.go | 98 +++++++++++++++++++++++++++++
3 files changed, 158 insertions(+), 57 deletions(-)
create mode 100644 services/crunch-exec/logging.go
via d77b8bd4cc7e22d1892a270704a0254e93b9e284 (commit)
from a89b2b253f5afb3866e7c9b4d60aa6df8a30d10e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d77b8bd4cc7e22d1892a270704a0254e93b9e284
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 28 18:47:37 2015 -0500
7816: Working on logging infrastructure.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index b3f6051..48558b0 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -12,7 +12,6 @@ import (
"log"
"os"
//"os/exec"
- "bufio"
"os/signal"
"strings"
"syscall"
@@ -53,7 +52,7 @@ func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{})
log.Printf("Received event: %#v\n", *event)
}
-type NewLogWriter func(logstr string) io.Writer
+type NewLogWriter func(name string) io.Writer
type ContainerRunner struct {
Docker *dockerclient.DockerClient
@@ -64,7 +63,10 @@ type ContainerRunner struct {
ContainerId string
ExitCode int
NewLogWriter
- FinishChan chan bool
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout *ThrottledLogger
+ Stderr *ThrottledLogger
}
func (this *ContainerRunner) setupMonitoring() error {
@@ -84,6 +86,9 @@ func (this *ContainerRunner) setupMonitoring() error {
}
func (this *ContainerRunner) LoadImage() (err error) {
+
+ this.CrunchLog.Print("Fetching Docker image from collection ", this.ContainerRecord.ContainerImage)
+
var collection Collection
err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
if err != nil {
@@ -99,8 +104,12 @@ func (this *ContainerRunner) LoadImage() (err error) {
imageId = img[:len(img)-4]
}
+ this.CrunchLog.Print("Using Docker image id ", imageId)
+
_, err = this.Docker.InspectImage(imageId)
if err != nil {
+ this.CrunchLog.Print("Loading Docker image from keep")
+
var readCloser io.ReadCloser
readCloser, err = this.Kc.ManifestFileReader(manifest, img)
if err != nil {
@@ -111,6 +120,8 @@ func (this *ContainerRunner) LoadImage() (err error) {
if err != nil {
return err
}
+ } else {
+ this.CrunchLog.Print("Docker image is already available on host")
}
this.ContainerConfig.Image = imageId
@@ -119,12 +130,16 @@ func (this *ContainerRunner) LoadImage() (err error) {
}
func (this *ContainerRunner) StartContainer() (err error) {
+ this.CrunchLog.Print("Creating Docker container")
+
this.ContainerConfig.Cmd = this.ContainerRecord.Command
this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
if err != nil {
return
}
hostConfig := &dockerclient.HostConfig{}
+
+ this.CrunchLog.Print("Starting Docker container id ", this.ContainerId)
err = this.Docker.StartContainer(this.ContainerId, hostConfig)
if err != nil {
return
@@ -133,45 +148,10 @@ func (this *ContainerRunner) StartContainer() (err error) {
return nil
}
-type ArvLoggingWriter struct {
- Api IArvadosClient
- Kc IKeepClient
-}
-
-func (this ArvLoggingWriter) Write(p []byte) (n int, err error) {
- return len(p), nil
-}
-
-const (
- MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+func (this *ContainerRunner) AttachLogs() (err error) {
-func CopyLog(in io.Reader, out *log.Logger, done chan<- bool) {
- reader := bufio.NewReaderSize(in, MaxLogLine)
- var prefix string
- for {
- line, isPrefix, err := reader.ReadLine()
- if err == io.EOF {
- break
- } else if err != nil {
- out.Fatal("error reading container log:", err)
- }
- var suffix string
- if isPrefix {
- suffix = "[...]"
- }
- out.Print(prefix, string(line), suffix)
- // Set up prefix for following line
- if isPrefix {
- prefix = "[...]"
- } else {
- prefix = ""
- }
- }
- done <- true
-}
+ this.CrunchLog.Print("Attaching container logs")
-func (this *ContainerRunner) GetLogs() (err error) {
var stderrReader, stdoutReader io.Reader
stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true, Timestamps: true})
if err != nil {
@@ -182,10 +162,12 @@ func (this *ContainerRunner) GetLogs() (err error) {
return
}
- this.FinishChan = make(chan bool)
+ this.loggingDone = make(chan bool)
- go CopyLog(stderrReader, log.New(this.NewLogWriter("stderr"), "", 0), this.FinishChan)
- go CopyLog(stdoutReader, log.New(this.NewLogWriter("stdout"), "", 0), this.FinishChan)
+ this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
+ this.Stderr = NewThrottledLogger(this.NewLogWriter("stdout"))
+ go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
+ go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
return nil
}
@@ -198,64 +180,85 @@ func (this *ContainerRunner) WaitFinish() error {
}
this.ExitCode = wr.ExitCode
- <-this.FinishChan
- <-this.FinishChan
+ <-this.loggingDone
+ <-this.loggingDone
+
+ this.Stdout.Stop()
+ this.Stderr.Stop()
return nil
}
-func (this *ContainerRunner) writeLogs() error {
+func (this *ContainerRunner) CommitLogs() error {
return nil
}
-func (this *ContainerRunner) updateContainer() error {
+func (this *ContainerRunner) UpdateContainerRecord() error {
return nil
}
+func (this *ContainerRunner) NewArvLogWriter(name string) io.Writer {
+ return &ArvLogWriter{this.Api, this.Kc, name}
+}
+
func (this *ContainerRunner) Run(containerUuid string) (err error) {
+ this.NewLogWriter = this.NewArvLogWriter
+ this.CrunchLog = NewThrottledLogger(this.NewLogWriter("crunchexec"))
+
err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
if err != nil {
+ this.CrunchLog.Print(err)
return
}
// (0) start event monitoring goroutines
err = this.setupMonitoring()
if err != nil {
+ this.CrunchLog.Print(err)
return
}
// (1) check for and/or load image
err = this.LoadImage()
if err != nil {
+ this.CrunchLog.Print(err)
return
}
// (2) start container
err = this.StartContainer()
if err != nil {
+ this.CrunchLog.Print(err)
return
}
// (3) attach container logs
- err = this.GetLogs()
+ err = this.AttachLogs()
// (4) wait for container to finish
err = this.WaitFinish()
// (5) write logs
- err = this.writeLogs()
+ err = this.CommitLogs()
// (6) update container record with results
- this.updateContainer()
+ this.UpdateContainerRecord()
return
}
+func NewContainerRunner() *ContainerRunner {
+ cr := &ContainerRunner{}
+ cr.NewLogWriter = cr.NewArvLogWriter
+ cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+ return cr
+}
+
func main() {
flag.Parse()
- var cr ContainerRunner
+ cr := NewContainerRunner()
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index c24a74e..01eeea0 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -81,7 +81,7 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
}
func (s *TestSuite) TestLoadImage(c *C) {
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
var err error
@@ -173,7 +173,7 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Api = ArvErrorTestClient{}
cr.Kc = &KeepTestClient{}
cr.ContainerRecord.ContainerImage = busyboxPDH
@@ -184,7 +184,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
cr.Api = &ArvTestClient{}
cr.Kc = KeepErrorTestClient{}
@@ -196,7 +196,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Api = &ArvTestClient{}
cr.Kc = KeepErrorTestClient{}
cr.ContainerRecord.ContainerImage = otherPDH
@@ -207,7 +207,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
cr.Api = &ArvTestClient{}
cr.Kc = KeepReadErrorTestClient{}
@@ -233,7 +233,7 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.Writer {
}
func (s *TestSuite) TestRunContainer(c *C) {
- cr := ContainerRunner{}
+ cr := NewContainerRunner()
cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
cr.Api = ArvTestClient{}
cr.Kc = &KeepTestClient{}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
new file mode 100644
index 0000000..4f03433
--- /dev/null
+++ b/services/crunch-exec/logging.go
@@ -0,0 +1,98 @@
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "io"
+ "log"
+ "sync"
+ "time"
+)
+
+type ThrottledLogger struct {
+ *log.Logger
+ buf bytes.Buffer
+ sync.Mutex
+ writer io.Writer
+ stop bool
+ flusherDone chan bool
+}
+
+func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
+ this.Mutex.Lock()
+ defer this.Mutex.Unlock()
+ return this.buf.Write(p)
+}
+
+func (this *ThrottledLogger) Stop() {
+ this.stop = true
+ <-this.flusherDone
+}
+
+func (this *ThrottledLogger) Flusher() {
+ for {
+ if !this.stop {
+ time.Sleep(1 * time.Second)
+ }
+ this.Mutex.Lock()
+ if this.buf.Len() > 0 {
+ this.writer.Write(this.buf.Bytes())
+ this.buf.Reset()
+ } else if this.stop {
+ this.Mutex.Unlock()
+ break
+ }
+ this.Mutex.Unlock()
+ }
+ this.flusherDone <- true
+}
+
+const (
+ MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
+func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
+ reader := bufio.NewReaderSize(in, MaxLogLine)
+ var prefix string
+ for {
+ line, isPrefix, err := reader.ReadLine()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ logger.Print("error reading container log:", err)
+ }
+ var suffix string
+ if isPrefix {
+ suffix = "[...]"
+ }
+ logger.Print(prefix, string(line), suffix)
+ // Set up prefix for following line
+ if isPrefix {
+ prefix = "[...]"
+ } else {
+ prefix = ""
+ }
+ }
+ done <- true
+}
+
+func NewThrottledLogger(writer io.Writer) *ThrottledLogger {
+ alw := &ThrottledLogger{}
+ alw.flusherDone = make(chan bool)
+ alw.writer = writer
+ alw.Logger = log.New(alw, "", 0)
+ go alw.Flusher()
+ return alw
+}
+
+type ArvLogWriter struct {
+ Api IArvadosClient
+ Kc IKeepClient
+ loggingStream string
+}
+
+func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+ // write to API
+ // write to Keep
+ return 0, nil
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list