[ARVADOS] updated: eff4b3c943e8c85242f75f09cd6c8a81b9b86309
git at public.curoverse.com
git at public.curoverse.com
Fri Jan 15 16:32:11 EST 2016
Summary of changes:
services/crunch-exec/crunchexec.go | 230 ++++++++++++++++++--------------
services/crunch-exec/crunchexec_test.go | 10 +-
services/crunch-exec/logging.go | 100 +++++++-------
services/crunch-exec/upload.go | 18 ++-
4 files changed, 201 insertions(+), 157 deletions(-)
via eff4b3c943e8c85242f75f09cd6c8a81b9b86309 (commit)
from 6bcd3f15fd664e1d3b7200e77c1a09f94c06054f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit eff4b3c943e8c85242f75f09cd6c8a81b9b86309
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 15 16:32:08 2016 -0500
7816: Fixes recommended by golint.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index abaed82..c07ce4a 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -16,27 +16,33 @@ import (
"syscall"
)
+// IArvadosClient is the minimal Arvados API methods used by crunchexec.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
+// ErrCancelled is the error returned when the container is cancelled.
var ErrCancelled = errors.New("Cancelled")
+// IKeepClient is the minimal Keep API methods used by crunchexec.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
}
+// Mount describes the mount points to create inside the container.
type Mount struct{}
+// Collection record returned by the API server.
type Collection struct {
ManifestText string `json:"manifest_text"`
}
+// ContainerRecord is the container record returned by the API server.
type ContainerRecord struct {
- Uuid string `json:"uuid"`
+ UUID string `json:"uuid"`
Command []string `json:"command"`
ContainerImage string `json:"container_image"`
Cwd string `json:"cwd"`
@@ -48,8 +54,10 @@ type ContainerRecord struct {
State string `json:"state"`
}
+// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser
+// ThinDockerClient is the minimal Docker client interface used by crunchexec.
type ThinDockerClient interface {
StopContainer(id string, timeout int) error
InspectImage(id string) (*dockerclient.ImageInfo, error)
@@ -61,13 +69,15 @@ type ThinDockerClient interface {
RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
}
+// ContainerRunner is the main stateful struct used for a single execution of a
+// container.
type ContainerRunner struct {
- Docker ThinDockerClient
- Api IArvadosClient
- Kc IKeepClient
+ Docker ThinDockerClient
+ ArvClient IArvadosClient
+ Kc IKeepClient
ContainerRecord
dockerclient.ContainerConfig
- ContainerId string
+ ContainerID string
ExitCode *int
NewLogWriter
loggingDone chan bool
@@ -82,97 +92,103 @@ type ContainerRunner struct {
finalState string
}
-func (this *ContainerRunner) SetupSignals() error {
- this.SigChan = make(chan os.Signal, 1)
- signal.Notify(this.SigChan, syscall.SIGTERM)
- signal.Notify(this.SigChan, syscall.SIGINT)
- signal.Notify(this.SigChan, syscall.SIGQUIT)
+// SetupSignals sets up signal handling to gracefully terminate the underlying
+// Docker container and update state when receiving a TERM, INT or QUIT signal.
+func (runner *ContainerRunner) SetupSignals() error {
+ runner.SigChan = make(chan os.Signal, 1)
+ signal.Notify(runner.SigChan, syscall.SIGTERM)
+ signal.Notify(runner.SigChan, syscall.SIGINT)
+ signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig <-chan os.Signal) {
for _ = range sig {
- if !this.Cancelled {
- this.CancelLock.Lock()
- this.Cancelled = true
- if this.ContainerId != "" {
- this.Docker.StopContainer(this.ContainerId, 10)
+ if !runner.Cancelled {
+ runner.CancelLock.Lock()
+ runner.Cancelled = true
+ if runner.ContainerID != "" {
+ runner.Docker.StopContainer(runner.ContainerID, 10)
}
- this.CancelLock.Unlock()
+ runner.CancelLock.Unlock()
}
}
- }(this.SigChan)
+ }(runner.SigChan)
return nil
}
-func (this *ContainerRunner) LoadImage() (err error) {
+// LoadImage determines the docker image id from the container record and
+// checks if it is available in the local Docker image store. If not, it loads
+// the image from Keep.
+func (runner *ContainerRunner) LoadImage() (err error) {
- this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
+ runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
var collection Collection
- err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
+ err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
if err != nil {
return err
}
manifest := manifest.Manifest{Text: collection.ManifestText}
- var img, imageId string
+ var img, imageID string
for ms := range manifest.StreamIter() {
img = ms.FileStreamSegments[0].Name
if !strings.HasSuffix(img, ".tar") {
return errors.New("First file in the collection does not end in .tar")
}
- imageId = img[:len(img)-4]
+ imageID = img[:len(img)-4]
}
- this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
+ runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, err = this.Docker.InspectImage(imageId)
+ _, err = runner.Docker.InspectImage(imageID)
if err != nil {
- this.CrunchLog.Print("Loading Docker image from keep")
+ runner.CrunchLog.Print("Loading Docker image from keep")
var readCloser io.ReadCloser
- readCloser, err = this.Kc.ManifestFileReader(manifest, img)
+ readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
if err != nil {
return err
}
- err = this.Docker.LoadImage(readCloser)
+ err = runner.Docker.LoadImage(readCloser)
if err != nil {
return err
}
} else {
- this.CrunchLog.Print("Docker image is available")
+ runner.CrunchLog.Print("Docker image is available")
}
- this.ContainerConfig.Image = imageId
+ runner.ContainerConfig.Image = imageID
return nil
}
-func (this *ContainerRunner) StartContainer() (err error) {
- this.CrunchLog.Print("Creating Docker container")
+// StartContainer creates the container and runs it.
+func (runner *ContainerRunner) StartContainer() (err error) {
+ runner.CrunchLog.Print("Creating Docker container")
- this.CancelLock.Lock()
- defer this.CancelLock.Unlock()
+ runner.CancelLock.Lock()
+ defer runner.CancelLock.Unlock()
- if this.Cancelled {
+ if runner.Cancelled {
return ErrCancelled
}
- this.ContainerConfig.Cmd = this.ContainerRecord.Command
- if this.ContainerRecord.Cwd != "." {
- this.ContainerConfig.WorkingDir = this.ContainerRecord.Cwd
+ runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
+ if runner.ContainerRecord.Cwd != "." {
+ runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
}
- for k, v := range this.ContainerRecord.Environment {
- this.ContainerConfig.Env = append(this.ContainerConfig.Env, k+"="+v)
+ for k, v := range runner.ContainerRecord.Environment {
+ runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
- this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
+ runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
if err != nil {
return
}
hostConfig := &dockerclient.HostConfig{}
- this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
- err = this.Docker.StartContainer(this.ContainerId, hostConfig)
+ runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+ err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
if err != nil {
return
}
@@ -180,126 +196,141 @@ func (this *ContainerRunner) StartContainer() (err error) {
return nil
}
-func (this *ContainerRunner) AttachLogs() (err error) {
+// AttachLogs connects the docker container stdout and stderr logs to the
+// Arvados logger which logs to Keep and the API server logs table.
+func (runner *ContainerRunner) AttachLogs() (err error) {
- this.CrunchLog.Print("Attaching container logs")
+ runner.CrunchLog.Print("Attaching container logs")
var stderrReader, stdoutReader io.Reader
- stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true})
+ stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
if err != nil {
return
}
- stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true})
+ stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
if err != nil {
return
}
- this.loggingDone = make(chan bool)
+ runner.loggingDone = make(chan bool)
- this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
- this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
- go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
- go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
+ runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ go CopyReaderToLog(stdoutReader, runner.Stdout.Logger, runner.loggingDone)
+ go CopyReaderToLog(stderrReader, runner.Stderr.Logger, runner.loggingDone)
return nil
}
-func (this *ContainerRunner) WaitFinish() error {
- result := this.Docker.Wait(this.ContainerId)
+// WaitFinish waits for the container to terminate, capture the exit code, and
+// close the stdout/stderr logging.
+func (runner *ContainerRunner) WaitFinish() error {
+ result := runner.Docker.Wait(runner.ContainerID)
wr := <-result
if wr.Error != nil {
return wr.Error
}
- this.ExitCode = &wr.ExitCode
+ runner.ExitCode = &wr.ExitCode
// drain stdout/stderr
- <-this.loggingDone
- <-this.loggingDone
+ <-runner.loggingDone
+ <-runner.loggingDone
- this.Stdout.Close()
- this.Stderr.Close()
+ runner.Stdout.Close()
+ runner.Stderr.Close()
return nil
}
-func (this *ContainerRunner) CommitLogs() error {
- this.CrunchLog.Print(this.finalState)
- this.CrunchLog.Close()
- this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
+// CommitLogs posts the collection containing the final container logs.
+func (runner *ContainerRunner) CommitLogs() error {
+ runner.CrunchLog.Print(runner.finalState)
+ runner.CrunchLog.Close()
+
+ // Closing CrunchLog above allows it to be committed to Keep at this
+ // point, but re-open crunch log with ArvClient in case there are any
+ // other further (such as failing to write the log to Keep!) while
+ // shutting down
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
"crunchexec", nil})
- mt, err := this.LogCollection.ManifestText()
+ mt, err := runner.LogCollection.ManifestText()
if err != nil {
return err
}
response := make(map[string]string)
- err = this.Api.Create("collections",
- arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
+ err = runner.ArvClient.Create("collections",
+ arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
"manifest_text": mt},
response)
if err != nil {
return err
}
- this.LogsPDH = new(string)
- *this.LogsPDH = response["portable_data_hash"]
+ runner.LogsPDH = new(string)
+ *runner.LogsPDH = response["portable_data_hash"]
return nil
}
-func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+// UpdateContainerRecordRunning updates the container state to "Running"
+func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
update := arvadosclient.Dict{"state": "Running"}
- return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+ return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
}
-func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+// UpdateContainerRecordComplete updates the container record state on API
+// server to "Complete" or "Cancelled"
+func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
update := arvadosclient.Dict{}
- if this.LogsPDH != nil {
- update["log"] = *this.LogsPDH
+ if runner.LogsPDH != nil {
+ update["log"] = *runner.LogsPDH
}
- if this.ExitCode != nil {
- update["exit_code"] = *this.ExitCode
+ if runner.ExitCode != nil {
+ update["exit_code"] = *runner.ExitCode
}
- update["state"] = this.finalState
+ update["state"] = runner.finalState
- return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+ return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
}
-func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
- return &ArvLogWriter{this.Api, this.ContainerRecord.Uuid, name, this.LogCollection.Open(name + ".txt")}
+// NewArvLogWriter creates an ArvLogWriter
+func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+ return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
}
-func (this *ContainerRunner) Run(containerUuid string) (err error) {
- this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+// Run the full container lifecycle.
+func (runner *ContainerRunner) Run(containerUUID string) (err error) {
+ runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
var runerr, waiterr error
defer func() {
if err != nil {
- this.CrunchLog.Print(err)
+ runner.CrunchLog.Print(err)
}
- if this.Cancelled {
- this.finalState = "Cancelled"
+ if runner.Cancelled {
+ runner.finalState = "Cancelled"
} else {
- this.finalState = "Complete"
+ runner.finalState = "Complete"
}
// (6) write logs
- logerr := this.CommitLogs()
+ logerr := runner.CommitLogs()
if logerr != nil {
- this.CrunchLog.Print(logerr)
+ runner.CrunchLog.Print(logerr)
}
// (7) update container record with results
- updateerr := this.UpdateContainerRecordComplete()
+ updateerr := runner.UpdateContainerRecordComplete()
if updateerr != nil {
- this.CrunchLog.Print(updateerr)
+ runner.CrunchLog.Print(updateerr)
}
- this.CrunchLog.Close()
+ runner.CrunchLog.Close()
if err == nil {
if runerr != nil {
@@ -314,25 +345,25 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
}
}()
- err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
+ err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
if err != nil {
return
}
// (0) setup signal handling
- err = this.SetupSignals()
+ err = runner.SetupSignals()
if err != nil {
return
}
// (1) check for and/or load image
- err = this.LoadImage()
+ err = runner.LoadImage()
if err != nil {
return
}
// (2) start container
- err = this.StartContainer()
+ err = runner.StartContainer()
if err != nil {
if err == ErrCancelled {
err = nil
@@ -341,28 +372,29 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
}
// (3) update container record state
- err = this.UpdateContainerRecordRunning()
+ err = runner.UpdateContainerRecordRunning()
if err != nil {
- this.CrunchLog.Print(err)
+ runner.CrunchLog.Print(err)
}
// (4) attach container logs
- runerr = this.AttachLogs()
+ runerr = runner.AttachLogs()
if runerr != nil {
- this.CrunchLog.Print(runerr)
+ runner.CrunchLog.Print(runerr)
}
// (5) wait for container to finish
- waiterr = this.WaitFinish()
+ waiterr = runner.WaitFinish()
return
}
+// NewContainerRunner creates a new container runner.
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,
docker ThinDockerClient) *ContainerRunner {
- cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
+ cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
cr.NewLogWriter = cr.NewArvLogWriter
cr.LogCollection = &CollectionWriter{kc, nil}
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 7efdad8..baf67b2 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -390,7 +390,7 @@ func (s *TestSuite) TestCommitLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
@@ -409,7 +409,7 @@ func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
err := cr.UpdateContainerRecordRunning()
c.Check(err, IsNil)
@@ -421,7 +421,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
cr.LogsPDH = new(string)
*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
@@ -442,7 +442,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
cr.Cancelled = true
cr.finalState = "Cancelled"
@@ -610,7 +610,7 @@ func (s *TestSuite) TestCancel(c *C) {
cr := NewContainerRunner(api, &KeepTestClient{}, docker)
go func() {
- for cr.ContainerId == "" {
+ for cr.ContainerID == "" {
time.Sleep(1 * time.Second)
}
cr.SigChan <- syscall.SIGINT
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 1604445..3860484 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -3,7 +3,6 @@ package main
import (
"bufio"
"bytes"
- "errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"io"
@@ -12,6 +11,8 @@ import (
"time"
)
+// Timestamper is the signature for a function that takes a timestamp and
+// return a formated string value.
type Timestamper func(t time.Time) string
// Logging plumbing:
@@ -36,61 +37,62 @@ type ThrottledLogger struct {
Timestamper
}
-// Builtin RFC3339Nano format isn't fixed width so
-// provide our own with microsecond precision (same as API server).
+// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
+// because the RFC3339Nano format isn't fixed width.
const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
func RFC3339Timestamp(now time.Time) string {
return now.Format(RFC3339Fixed)
}
// Write to the internal buffer. Prepend a timestamp to each line of the input
// data.
-func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
- this.Mutex.Lock()
- if this.buf == nil {
- this.buf = &bytes.Buffer{}
+func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
+ tl.Mutex.Lock()
+ if tl.buf == nil {
+ tl.buf = &bytes.Buffer{}
}
- defer this.Mutex.Unlock()
+ defer tl.Mutex.Unlock()
- now := this.Timestamper(time.Now().UTC())
+ now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
for sc.Scan() {
- _, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
+ _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
}
return len(p), err
}
// Periodically check the current buffer; if not empty, send it on the
// channel to the goWriter goroutine.
-func (this *ThrottledLogger) flusher() {
+func (tl *ThrottledLogger) flusher() {
bufchan := make(chan *bytes.Buffer)
bufterm := make(chan bool)
// Use a separate goroutine for the actual write so that the writes are
// actually initiated closer every 1s instead of every
// 1s + (time to it takes to write).
- go goWriter(this.writer, bufchan, bufterm)
+ go goWriter(tl.writer, bufchan, bufterm)
for {
- if !this.stop {
+ if !tl.stop {
time.Sleep(1 * time.Second)
}
- this.Mutex.Lock()
- if this.buf != nil && this.buf.Len() > 0 {
- oldbuf := this.buf
- this.buf = nil
- this.Mutex.Unlock()
+ tl.Mutex.Lock()
+ if tl.buf != nil && tl.buf.Len() > 0 {
+ oldbuf := tl.buf
+ tl.buf = nil
+ tl.Mutex.Unlock()
bufchan <- oldbuf
- } else if this.stop {
- this.Mutex.Unlock()
+ } else if tl.stop {
+ tl.Mutex.Unlock()
break
} else {
- this.Mutex.Unlock()
+ tl.Mutex.Unlock()
}
}
close(bufchan)
<-bufterm
- this.flusherDone <- true
+ tl.flusherDone <- true
}
// Receive buffers from a channel and send to the underlying Writer
@@ -101,19 +103,21 @@ func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
t <- true
}
-// Stop the flusher goroutine and wait for it to complete, then close the
+// Close the flusher goroutine and wait for it to complete, then close the
// underlying Writer.
-func (this *ThrottledLogger) Close() error {
- this.stop = true
- <-this.flusherDone
- return this.writer.Close()
+func (tl *ThrottledLogger) Close() error {
+ tl.stop = true
+ <-tl.flusherDone
+ return tl.writer.Close()
}
const (
- MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
+ // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
+ MaxLogLine = 1 << 12
)
-// Goroutine to copy from a reader to a logger, with long line splitting.
+// CopyReaderToLog reads from a Reader and prints to a Logger, with long line
+// splitting.
func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
reader := bufio.NewReaderSize(in, MaxLogLine)
var prefix string
@@ -139,11 +143,10 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
done <- true
}
-// Create a new thottled logger that
+// NewThrottledLogger creates a new thottled logger that
// (a) prepends timestamps to each line
// (b) batches log messages and only calls the underlying Writer at most once
// per second.
-
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
alw := &ThrottledLogger{}
alw.flusherDone = make(chan bool)
@@ -154,40 +157,39 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
return alw
}
-// Implements a writer that writes to each of a WriteCloser (typically
-// CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter implements a writer that writes to each of a WriteCloser
+// (typically CollectionFileWriter) and creates an API server log entry.
type ArvLogWriter struct {
- Api IArvadosClient
- Uuid string
+ ArvClient IArvadosClient
+ UUID string
loggingStream string
writeCloser io.WriteCloser
}
-func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
// Write to the next writer in the chain (a file in Keep)
var err1 error
- if this.writeCloser != nil {
- _, err1 = this.writeCloser.Write(p)
+ if arvlog.writeCloser != nil {
+ _, err1 = arvlog.writeCloser.Write(p)
}
// write to API
- lr := arvadosclient.Dict{"object_uuid": this.Uuid,
- "event_type": this.loggingStream,
+ lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
+ "event_type": arvlog.loggingStream,
"properties": map[string]string{"text": string(p)}}
- err2 := this.Api.Create("logs", lr, nil)
+ err2 := arvlog.ArvClient.Create("logs", lr, nil)
if err1 != nil || err2 != nil {
- return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
- } else {
- return len(p), nil
+ return 0, fmt.Errorf("%s ; %s", err1, err2)
}
-
+ return len(p), nil
}
-func (this *ArvLogWriter) Close() (err error) {
- if this.writeCloser != nil {
- err = this.writeCloser.Close()
- this.writeCloser = nil
+// Close the underlying writer
+func (arvlog *ArvLogWriter) Close() (err error) {
+ if arvlog.writeCloser != nil {
+ err = arvlog.writeCloser.Close()
+ arvlog.writeCloser = nil
}
return err
}
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
index 8d2d802..4a2693a 100644
--- a/services/crunch-exec/upload.go
+++ b/services/crunch-exec/upload.go
@@ -20,11 +20,13 @@ import (
"strings"
)
+// Block is a data block in a manifest stream
type Block struct {
data []byte
offset int64
}
+// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
type CollectionFileWriter struct {
IKeepClient
*manifest.ManifestStream
@@ -36,11 +38,13 @@ type CollectionFileWriter struct {
fn string
}
+// Write to a file in a keep collection
func (m *CollectionFileWriter) Write(p []byte) (int, error) {
n, err := m.ReadFrom(bytes.NewReader(p))
return int(n), err
}
+// ReadFrom a Reader and write to the Keep collection file.
func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
var total int64
var count int
@@ -62,11 +66,11 @@ func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
if err == io.EOF {
return total, nil
- } else {
- return total, err
}
+ return total, err
}
+// Close stops writing a file and adds it to the parent manifest.
func (m *CollectionFileWriter) Close() error {
m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
manifest.FileStreamSegment{m.offset, m.length, m.fn})
@@ -89,11 +93,14 @@ func (m *CollectionFileWriter) goUpload() {
finish <- errors
}
+// CollectionWriter makes implements creating new Keep collections by opening files
+// and writing to them.
type CollectionWriter struct {
IKeepClient
Streams []*CollectionFileWriter
}
+// Open a new file for writing in the Keep collection.
func (m *CollectionWriter) Open(path string) io.WriteCloser {
var dir string
var fn string
@@ -123,6 +130,7 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
return fw
}
+// Finish writing the collection, wait for all blocks to complete uploading.
func (m *CollectionWriter) Finish() error {
var errstring string
for _, stream := range m.Streams {
@@ -145,11 +153,13 @@ func (m *CollectionWriter) Finish() error {
}
if errstring != "" {
return errors.New(errstring)
- } else {
- return nil
}
+ return nil
}
+// ManifestText returns the manifest text of the collection. Calls Finish()
+// first to ensure that all blocks are written and that signed locators and
+// available.
func (m *CollectionWriter) ManifestText() (mt string, err error) {
err = m.Finish()
if err != nil {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list