[ARVADOS] updated: 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f

git at public.curoverse.com git at public.curoverse.com
Fri Jan 15 15:53:59 EST 2016


Summary of changes:
 services/crunch-exec/crunchexec.go      | 55 +++++++++---------
 services/crunch-exec/crunchexec_test.go |  9 +--
 services/crunch-exec/logging.go         | 98 ++++++++++++++++++++++-----------
 services/crunch-exec/logging_test.go    |  8 +--
 services/crunch-exec/upload.go          |  9 +++
 5 files changed, 112 insertions(+), 67 deletions(-)

       via  6bcd3f15fd664e1d3b7200e77c1a09f94c06054f (commit)
      from  f7115688cc3a63b4721ca59259bc8276685bac9a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jan 15 15:50:33 2016 -0500

    7816: Change ThrottledLogger Stop() to Close().  Choose finalState once so that
    log and container record can't go out of sync.  Add comments.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 9e6a12f..abaed82 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -22,6 +22,8 @@ type IArvadosClient interface {
 	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
+var ErrCancelled = errors.New("Cancelled")
+
 type IKeepClient interface {
 	PutHB(hash string, buf []byte) (string, int, error)
 	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
@@ -34,16 +36,16 @@ type Collection struct {
 }
 
 type ContainerRecord struct {
-	Uuid               string            `json:"uuid"`
-	Command            []string          `json:"command"`
-	ContainerImage     string            `json:"container_image"`
-	Cwd                string            `json:"cwd"`
-	Environment        map[string]string `json:"environment"`
-	Mounts             map[string]Mount  `json:"mounts"`
-	OutputPath         string            `json:"output_path"`
-	Priority           int               `json:"priority"`
-	RuntimeConstraints map[string]string `json:"runtime_constraints"`
-	State              string            `json:"state"`
+	Uuid               string                 `json:"uuid"`
+	Command            []string               `json:"command"`
+	ContainerImage     string                 `json:"container_image"`
+	Cwd                string                 `json:"cwd"`
+	Environment        map[string]string      `json:"environment"`
+	Mounts             map[string]Mount       `json:"mounts"`
+	OutputPath         string                 `json:"output_path"`
+	Priority           int                    `json:"priority"`
+	RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
+	State              string                 `json:"state"`
 }
 
 type NewLogWriter func(name string) io.WriteCloser
@@ -77,6 +79,7 @@ type ContainerRunner struct {
 	CancelLock    sync.Mutex
 	Cancelled     bool
 	SigChan       chan os.Signal
+	finalState    string
 }
 
 func (this *ContainerRunner) SetupSignals() error {
@@ -152,7 +155,7 @@ func (this *ContainerRunner) StartContainer() (err error) {
 	defer this.CancelLock.Unlock()
 
 	if this.Cancelled {
-		return errors.New("Cancelled")
+		return ErrCancelled
 	}
 
 	this.ContainerConfig.Cmd = this.ContainerRecord.Command
@@ -213,20 +216,15 @@ func (this *ContainerRunner) WaitFinish() error {
 	<-this.loggingDone
 	<-this.loggingDone
 
-	this.Stdout.Stop()
-	this.Stderr.Stop()
+	this.Stdout.Close()
+	this.Stderr.Close()
 
 	return nil
 }
 
 func (this *ContainerRunner) CommitLogs() error {
-	if this.Cancelled {
-		this.CrunchLog.Print("Cancelled")
-	} else {
-		this.CrunchLog.Print("Complete")
-	}
-
-	this.CrunchLog.Stop()
+	this.CrunchLog.Print(this.finalState)
+	this.CrunchLog.Close()
 	this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
 		"crunchexec", nil})
 
@@ -264,11 +262,8 @@ func (this *ContainerRunner) UpdateContainerRecordComplete() error {
 		update["exit_code"] = *this.ExitCode
 	}
 
-	if this.Cancelled {
-		update["state"] = "Cancelled"
-	} else {
-		update["state"] = "Complete"
-	}
+	update["state"] = this.finalState
+
 	return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
 }
 
@@ -286,6 +281,12 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 			this.CrunchLog.Print(err)
 		}
 
+		if this.Cancelled {
+			this.finalState = "Cancelled"
+		} else {
+			this.finalState = "Complete"
+		}
+
 		// (6) write logs
 		logerr := this.CommitLogs()
 		if logerr != nil {
@@ -298,7 +299,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 			this.CrunchLog.Print(updateerr)
 		}
 
-		this.CrunchLog.Stop()
+		this.CrunchLog.Close()
 
 		if err == nil {
 			if runerr != nil {
@@ -333,7 +334,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 	// (2) start container
 	err = this.StartContainer()
 	if err != nil {
-		if err.Error() == "Cancelled" {
+		if err == ErrCancelled {
 			err = nil
 		}
 		return
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index fcd5f80..7efdad8 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -342,10 +342,6 @@ type TestLogs struct {
 	Stderr ClosableBuffer
 }
 
-func (this *ClosableBuffer) Write(p []byte) (n int, err error) {
-	return this.Buffer.Write(p)
-}
-
 func (this *ClosableBuffer) Close() error {
 	return nil
 }
@@ -399,6 +395,7 @@ func (s *TestSuite) TestCommitLogs(c *C) {
 
 	cr.CrunchLog.Print("Hello world!")
 	cr.CrunchLog.Print("Goodbye")
+	cr.finalState = "Complete"
 
 	err := cr.CommitLogs()
 	c.Check(err, IsNil)
@@ -431,6 +428,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
 
 	cr.ExitCode = new(int)
 	*cr.ExitCode = 42
+	cr.finalState = "Complete"
 
 	err := cr.UpdateContainerRecordComplete()
 	c.Check(err, IsNil)
@@ -446,6 +444,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
 	cr := NewContainerRunner(api, kc, nil)
 	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
 	cr.Cancelled = true
+	cr.finalState = "Cancelled"
 
 	err := cr.UpdateContainerRecordComplete()
 	c.Check(err, IsNil)
@@ -455,6 +454,8 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
 	c.Check(api.Content["state"], Equals, "Cancelled")
 }
 
+// Used by the TestFullRun*() tests below to DRY up boilerplate setup to do a
+// full dress rehearsal of the Run() function, starting from a JSON container record.
 func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
 	rec := ContainerRecord{}
 	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 01b6548..1604445 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -14,6 +14,18 @@ import (
 
 type Timestamper func(t time.Time) string
 
+// Logging plumbing:
+//
+// ThrottledLogger.Logger -> ThrottledLogger.Write ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
+//
+// For stdout/stderr CopyReaderToLog additionally runs as a goroutine to pull
+// data from the stdout/stderr Reader and send to the Logger.
+
+// ThrottledLogger accepts writes, prepends a timestamp to each line of the
+// write, and periodically flushes to a downstream writer.  It supports the
+// "Logger" and "WriteCloser" interfaces.
 type ThrottledLogger struct {
 	*log.Logger
 	buf *bytes.Buffer
@@ -24,18 +36,16 @@ type ThrottledLogger struct {
 	Timestamper
 }
 
-func RFC3339Timestamp(now time.Time) string {
-	// return now.Format(time.RFC3339Nano)
-	// Builtin RFC3339Nano format isn't fixed width so
-	// provide our own.
-
-	return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
-		now.Year(), now.Month(), now.Day(),
-		now.Hour(), now.Minute(), now.Second(),
-		now.Nanosecond())
+// Builtin RFC3339Nano format isn't fixed width so
+// provide our own with microsecond precision (same as API server).
+const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
 
+func RFC3339Timestamp(now time.Time) string {
+	return now.Format(RFC3339Fixed)
 }
 
+// Write to the internal buffer.  Prepend a timestamp to each line of the input
+// data.
 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
 	this.Mutex.Lock()
 	if this.buf == nil {
@@ -43,27 +53,23 @@ func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
 	}
 	defer this.Mutex.Unlock()
 
-	now := time.Now().UTC()
-	_, err = fmt.Fprintf(this.buf, "%s %s", this.Timestamper(now), p)
-	return len(p), err
-}
-
-func (this *ThrottledLogger) Stop() {
-	this.stop = true
-	<-this.flusherDone
-	this.writer.Close()
-}
-
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
-	for b := range c {
-		writer.Write(b.Bytes())
+	now := this.Timestamper(time.Now().UTC())
+	sc := bufio.NewScanner(bytes.NewBuffer(p))
+	for sc.Scan() {
+		_, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
 	}
-	t <- true
+	return len(p), err
 }
 
+// Periodically check the current buffer; if not empty, send it on the
+// channel to the goWriter goroutine.
 func (this *ThrottledLogger) flusher() {
 	bufchan := make(chan *bytes.Buffer)
 	bufterm := make(chan bool)
+
+	// Use a separate goroutine for the actual write so that the writes are
+	// initiated closer to every 1s instead of every
+	// 1s + (time it takes to write).
 	go goWriter(this.writer, bufchan, bufterm)
 	for {
 		if !this.stop {
@@ -71,23 +77,43 @@ func (this *ThrottledLogger) flusher() {
 		}
 		this.Mutex.Lock()
 		if this.buf != nil && this.buf.Len() > 0 {
-			bufchan <- this.buf
+			oldbuf := this.buf
 			this.buf = nil
+			this.Mutex.Unlock()
+			bufchan <- oldbuf
 		} else if this.stop {
 			this.Mutex.Unlock()
 			break
+		} else {
+			this.Mutex.Unlock()
 		}
-		this.Mutex.Unlock()
 	}
 	close(bufchan)
 	<-bufterm
 	this.flusherDone <- true
 }
 
+// Receive buffers from a channel and send to the underlying Writer
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+	for b := range c {
+		writer.Write(b.Bytes())
+	}
+	t <- true
+}
+
+// Stop the flusher goroutine and wait for it to complete, then close the
+// underlying Writer.
+func (this *ThrottledLogger) Close() error {
+	this.stop = true
+	<-this.flusherDone
+	return this.writer.Close()
+}
+
 const (
 	MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
 )
 
+// Goroutine to copy from a reader to a logger, with long line splitting.
 func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
 	reader := bufio.NewReaderSize(in, MaxLogLine)
 	var prefix string
@@ -113,6 +139,11 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
 	done <- true
 }
 
+// Create a new throttled logger that
+// (a) prepends timestamps to each line
+// (b) batches log messages and only calls the underlying Writer at most once
+// per second.
+
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	alw := &ThrottledLogger{}
 	alw.flusherDone = make(chan bool)
@@ -123,17 +154,20 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	return alw
 }
 
+// Implements a writer that both writes to a WriteCloser (typically a
+// CollectionFileWriter) and creates an API server log entry.
 type ArvLogWriter struct {
 	Api           IArvadosClient
 	Uuid          string
 	loggingStream string
-	io.WriteCloser
+	writeCloser   io.WriteCloser
 }
 
 func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+	// Write to the next writer in the chain (a file in Keep)
 	var err1 error
-	if this.WriteCloser != nil {
-		_, err1 = this.WriteCloser.Write(p)
+	if this.writeCloser != nil {
+		_, err1 = this.writeCloser.Write(p)
 	}
 
 	// write to API
@@ -151,9 +185,9 @@ func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
 }
 
 func (this *ArvLogWriter) Close() (err error) {
-	if this.WriteCloser != nil {
-		err = this.WriteCloser.Close()
-		this.WriteCloser = nil
+	if this.writeCloser != nil {
+		err = this.writeCloser.Close()
+		this.writeCloser = nil
 	}
 	return err
 }
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index c86d918..fd51936 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -28,7 +28,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
 
 	cr.CrunchLog.Print("Hello world!")
 	cr.CrunchLog.Print("Goodbye")
-	cr.CrunchLog.Stop()
+	cr.CrunchLog.Close()
 
 	c.Check(api.Calls, Equals, 1)
 
@@ -54,7 +54,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
 		cr.CrunchLog.Printf("Hello %d", i)
 	}
 	cr.CrunchLog.Print("Goodbye")
-	cr.CrunchLog.Stop()
+	cr.CrunchLog.Close()
 
 	c.Check(api.Calls > 1, Equals, true)
 	c.Check(api.Calls < 2000000, Equals, true)
@@ -78,13 +78,13 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
 	cr.CrunchLog.Print("Goodbye")
 	stdout.Print("Blurb")
 
-	cr.CrunchLog.Stop()
+	cr.CrunchLog.Close()
 	logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
 		"2015-12-29T15:51:45.000000003Z Goodbye\n"
 	c.Check(api.Content["event_type"], Equals, "crunchexec")
 	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
 
-	stdout.Stop()
+	stdout.Close()
 	logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
 		"2015-12-29T15:51:45.000000004Z Blurb\n"
 	c.Check(api.Content["event_type"], Equals, "stdout")
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
index e85d636..8d2d802 100644
--- a/services/crunch-exec/upload.go
+++ b/services/crunch-exec/upload.go
@@ -1,5 +1,14 @@
 package main
 
+// Originally based on sdk/go/crunchrunner/upload.go
+//
+// Unlike the original, which iterates over a directory tree and uploads each
+// file sequentially, this version supports opening and writing multiple files
+// in a collection simultaneously.
+//
+// Eventually this should move into the Arvados Go SDK for a more comprehensive
+// implementation of Collections.
+
 import (
 	"bytes"
 	"crypto/md5"

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list