[ARVADOS] updated: d77b8bd4cc7e22d1892a270704a0254e93b9e284

git at public.curoverse.com git at public.curoverse.com
Mon Dec 28 18:47:41 EST 2015


Summary of changes:
 services/crunch-exec/crunchexec.go      | 105 ++++++++++++++++----------------
 services/crunch-exec/crunchexec_test.go |  12 ++--
 services/crunch-exec/logging.go         |  98 +++++++++++++++++++++++++++++
 3 files changed, 158 insertions(+), 57 deletions(-)
 create mode 100644 services/crunch-exec/logging.go

       via  d77b8bd4cc7e22d1892a270704a0254e93b9e284 (commit)
      from  a89b2b253f5afb3866e7c9b4d60aa6df8a30d10e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d77b8bd4cc7e22d1892a270704a0254e93b9e284
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 28 18:47:37 2015 -0500

    7816: Working on logging infrastructure.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index b3f6051..48558b0 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -12,7 +12,6 @@ import (
 	"log"
 	"os"
 	//"os/exec"
-	"bufio"
 	"os/signal"
 	"strings"
 	"syscall"
@@ -53,7 +52,7 @@ func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{})
 	log.Printf("Received event: %#v\n", *event)
 }
 
-type NewLogWriter func(logstr string) io.Writer
+type NewLogWriter func(name string) io.Writer
 
 type ContainerRunner struct {
 	Docker *dockerclient.DockerClient
@@ -64,7 +63,10 @@ type ContainerRunner struct {
 	ContainerId string
 	ExitCode    int
 	NewLogWriter
-	FinishChan chan bool
+	loggingDone chan bool
+	CrunchLog   *ThrottledLogger
+	Stdout      *ThrottledLogger
+	Stderr      *ThrottledLogger
 }
 
 func (this *ContainerRunner) setupMonitoring() error {
@@ -84,6 +86,9 @@ func (this *ContainerRunner) setupMonitoring() error {
 }
 
 func (this *ContainerRunner) LoadImage() (err error) {
+
+	this.CrunchLog.Print("Fetching Docker image from collection ", this.ContainerRecord.ContainerImage)
+
 	var collection Collection
 	err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
 	if err != nil {
@@ -99,8 +104,12 @@ func (this *ContainerRunner) LoadImage() (err error) {
 		imageId = img[:len(img)-4]
 	}
 
+	this.CrunchLog.Print("Using Docker image id ", imageId)
+
 	_, err = this.Docker.InspectImage(imageId)
 	if err != nil {
+		this.CrunchLog.Print("Loading Docker image from keep")
+
 		var readCloser io.ReadCloser
 		readCloser, err = this.Kc.ManifestFileReader(manifest, img)
 		if err != nil {
@@ -111,6 +120,8 @@ func (this *ContainerRunner) LoadImage() (err error) {
 		if err != nil {
 			return err
 		}
+	} else {
+		this.CrunchLog.Print("Docker image is already available on host")
 	}
 
 	this.ContainerConfig.Image = imageId
@@ -119,12 +130,16 @@ func (this *ContainerRunner) LoadImage() (err error) {
 }
 
 func (this *ContainerRunner) StartContainer() (err error) {
+	this.CrunchLog.Print("Creating Docker container")
+
 	this.ContainerConfig.Cmd = this.ContainerRecord.Command
 	this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
 	if err != nil {
 		return
 	}
 	hostConfig := &dockerclient.HostConfig{}
+
+	this.CrunchLog.Print("Starting Docker container id ", this.ContainerId)
 	err = this.Docker.StartContainer(this.ContainerId, hostConfig)
 	if err != nil {
 		return
@@ -133,45 +148,10 @@ func (this *ContainerRunner) StartContainer() (err error) {
 	return nil
 }
 
-type ArvLoggingWriter struct {
-	Api IArvadosClient
-	Kc  IKeepClient
-}
-
-func (this ArvLoggingWriter) Write(p []byte) (n int, err error) {
-	return len(p), nil
-}
-
-const (
-	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+func (this *ContainerRunner) AttachLogs() (err error) {
 
-func CopyLog(in io.Reader, out *log.Logger, done chan<- bool) {
-	reader := bufio.NewReaderSize(in, MaxLogLine)
-	var prefix string
-	for {
-		line, isPrefix, err := reader.ReadLine()
-		if err == io.EOF {
-			break
-		} else if err != nil {
-			out.Fatal("error reading container log:", err)
-		}
-		var suffix string
-		if isPrefix {
-			suffix = "[...]"
-		}
-		out.Print(prefix, string(line), suffix)
-		// Set up prefix for following line
-		if isPrefix {
-			prefix = "[...]"
-		} else {
-			prefix = ""
-		}
-	}
-	done <- true
-}
+	this.CrunchLog.Print("Attaching container logs")
 
-func (this *ContainerRunner) GetLogs() (err error) {
 	var stderrReader, stdoutReader io.Reader
 	stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true, Timestamps: true})
 	if err != nil {
@@ -182,10 +162,12 @@ func (this *ContainerRunner) GetLogs() (err error) {
 		return
 	}
 
-	this.FinishChan = make(chan bool)
+	this.loggingDone = make(chan bool)
 
-	go CopyLog(stderrReader, log.New(this.NewLogWriter("stderr"), "", 0), this.FinishChan)
-	go CopyLog(stdoutReader, log.New(this.NewLogWriter("stdout"), "", 0), this.FinishChan)
+	this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
+	this.Stderr = NewThrottledLogger(this.NewLogWriter("stdout"))
+	go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
+	go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
 
 	return nil
 }
@@ -198,64 +180,85 @@ func (this *ContainerRunner) WaitFinish() error {
 	}
 	this.ExitCode = wr.ExitCode
 
-	<-this.FinishChan
-	<-this.FinishChan
+	<-this.loggingDone
+	<-this.loggingDone
+
+	this.Stdout.Stop()
+	this.Stderr.Stop()
 
 	return nil
 }
 
-func (this *ContainerRunner) writeLogs() error {
+func (this *ContainerRunner) CommitLogs() error {
 	return nil
 }
 
-func (this *ContainerRunner) updateContainer() error {
+func (this *ContainerRunner) UpdateContainerRecord() error {
 	return nil
 }
 
+func (this *ContainerRunner) NewArvLogWriter(name string) io.Writer {
+	return &ArvLogWriter{this.Api, this.Kc, name}
+}
+
 func (this *ContainerRunner) Run(containerUuid string) (err error) {
 
+	this.NewLogWriter = this.NewArvLogWriter
+	this.CrunchLog = NewThrottledLogger(this.NewLogWriter("crunchexec"))
+
 	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
 	if err != nil {
+		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (0) start event monitoring goroutines
 	err = this.setupMonitoring()
 	if err != nil {
+		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (1) check for and/or load image
 	err = this.LoadImage()
 	if err != nil {
+		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (2) start container
 	err = this.StartContainer()
 	if err != nil {
+		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (3) attach container logs
-	err = this.GetLogs()
+	err = this.AttachLogs()
 
 	// (4) wait for container to finish
 	err = this.WaitFinish()
 
 	// (5) write logs
-	err = this.writeLogs()
+	err = this.CommitLogs()
 
 	// (6) update container record with results
-	this.updateContainer()
+	this.UpdateContainerRecord()
 
 	return
 }
 
+func NewContainerRunner() *ContainerRunner {
+	cr := &ContainerRunner{}
+	cr.NewLogWriter = cr.NewArvLogWriter
+	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+	return cr
+}
+
 func main() {
 	flag.Parse()
 
-	var cr ContainerRunner
+	cr := NewContainerRunner()
 
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index c24a74e..01eeea0 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -81,7 +81,7 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 
 	var err error
@@ -173,7 +173,7 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
 
 func (s *TestSuite) TestLoadImageArvError(c *C) {
 	// (1) Arvados error
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Api = ArvErrorTestClient{}
 	cr.Kc = &KeepTestClient{}
 	cr.ContainerRecord.ContainerImage = busyboxPDH
@@ -184,7 +184,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
 	// (2) Keep error
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	cr.Api = &ArvTestClient{}
 	cr.Kc = KeepErrorTestClient{}
@@ -196,7 +196,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
 
 func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 	// (3) Collection doesn't contain image
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Api = &ArvTestClient{}
 	cr.Kc = KeepErrorTestClient{}
 	cr.ContainerRecord.ContainerImage = otherPDH
@@ -207,7 +207,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
 	// (4) Collection doesn't contain image
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	cr.Api = &ArvTestClient{}
 	cr.Kc = KeepReadErrorTestClient{}
@@ -233,7 +233,7 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.Writer {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-	cr := ContainerRunner{}
+	cr := NewContainerRunner()
 	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	cr.Api = ArvTestClient{}
 	cr.Kc = &KeepTestClient{}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
new file mode 100644
index 0000000..4f03433
--- /dev/null
+++ b/services/crunch-exec/logging.go
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+	"log"
+	"sync"
+	"time"
+)
+
+type ThrottledLogger struct {
+	*log.Logger
+	buf bytes.Buffer
+	sync.Mutex
+	writer      io.Writer
+	stop        bool
+	flusherDone chan bool
+}
+
+func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
+	this.Mutex.Lock()
+	defer this.Mutex.Unlock()
+	return this.buf.Write(p)
+}
+
+func (this *ThrottledLogger) Stop() {
+	this.stop = true
+	<-this.flusherDone
+}
+
+func (this *ThrottledLogger) Flusher() {
+	for {
+		if !this.stop {
+			time.Sleep(1 * time.Second)
+		}
+		this.Mutex.Lock()
+		if this.buf.Len() > 0 {
+			this.writer.Write(this.buf.Bytes())
+			this.buf.Reset()
+		} else if this.stop {
+			this.Mutex.Unlock()
+			break
+		}
+		this.Mutex.Unlock()
+	}
+	this.flusherDone <- true
+}
+
+const (
+	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
+func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
+	reader := bufio.NewReaderSize(in, MaxLogLine)
+	var prefix string
+	for {
+		line, isPrefix, err := reader.ReadLine()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			logger.Print("error reading container log:", err)
+		}
+		var suffix string
+		if isPrefix {
+			suffix = "[...]"
+		}
+		logger.Print(prefix, string(line), suffix)
+		// Set up prefix for following line
+		if isPrefix {
+			prefix = "[...]"
+		} else {
+			prefix = ""
+		}
+	}
+	done <- true
+}
+
+func NewThrottledLogger(writer io.Writer) *ThrottledLogger {
+	alw := &ThrottledLogger{}
+	alw.flusherDone = make(chan bool)
+	alw.writer = writer
+	alw.Logger = log.New(alw, "", 0)
+	go alw.Flusher()
+	return alw
+}
+
+type ArvLogWriter struct {
+	Api           IArvadosClient
+	Kc            IKeepClient
+	loggingStream string
+}
+
+func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+	// write to API
+	// write to Keep
+	return 0, nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list