[ARVADOS] updated: eff4b3c943e8c85242f75f09cd6c8a81b9b86309

git at public.curoverse.com git at public.curoverse.com
Fri Jan 15 16:32:11 EST 2016


Summary of changes:
 services/crunch-exec/crunchexec.go      | 230 ++++++++++++++++++--------------
 services/crunch-exec/crunchexec_test.go |  10 +-
 services/crunch-exec/logging.go         | 100 +++++++-------
 services/crunch-exec/upload.go          |  18 ++-
 4 files changed, 201 insertions(+), 157 deletions(-)

       via  eff4b3c943e8c85242f75f09cd6c8a81b9b86309 (commit)
      from  6bcd3f15fd664e1d3b7200e77c1a09f94c06054f (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit eff4b3c943e8c85242f75f09cd6c8a81b9b86309
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jan 15 16:32:08 2016 -0500

    7816: Fixes recommended by golint.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index abaed82..c07ce4a 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -16,27 +16,33 @@ import (
 	"syscall"
 )
 
+// IArvadosClient is the minimal Arvados API methods used by crunchexec.
 type IArvadosClient interface {
 	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
 	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
 	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
+// ErrCancelled is the error returned when the container is cancelled.
 var ErrCancelled = errors.New("Cancelled")
 
+// IKeepClient is the minimal Keep API methods used by crunchexec.
 type IKeepClient interface {
 	PutHB(hash string, buf []byte) (string, int, error)
 	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
 }
 
+// Mount describes the mount points to create inside the container.
 type Mount struct{}
 
+// Collection record returned by the API server.
 type Collection struct {
 	ManifestText string `json:"manifest_text"`
 }
 
+// ContainerRecord is the container record returned by the API server.
 type ContainerRecord struct {
-	Uuid               string                 `json:"uuid"`
+	UUID               string                 `json:"uuid"`
 	Command            []string               `json:"command"`
 	ContainerImage     string                 `json:"container_image"`
 	Cwd                string                 `json:"cwd"`
@@ -48,8 +54,10 @@ type ContainerRecord struct {
 	State              string                 `json:"state"`
 }
 
+// NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) io.WriteCloser
 
+// ThinDockerClient is the minimal Docker client interface used by crunchexec.
 type ThinDockerClient interface {
 	StopContainer(id string, timeout int) error
 	InspectImage(id string) (*dockerclient.ImageInfo, error)
@@ -61,13 +69,15 @@ type ThinDockerClient interface {
 	RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
 }
 
+// ContainerRunner is the main stateful struct used for a single execution of a
+// container.
 type ContainerRunner struct {
-	Docker ThinDockerClient
-	Api    IArvadosClient
-	Kc     IKeepClient
+	Docker    ThinDockerClient
+	ArvClient IArvadosClient
+	Kc        IKeepClient
 	ContainerRecord
 	dockerclient.ContainerConfig
-	ContainerId string
+	ContainerID string
 	ExitCode    *int
 	NewLogWriter
 	loggingDone   chan bool
@@ -82,97 +92,103 @@ type ContainerRunner struct {
 	finalState    string
 }
 
-func (this *ContainerRunner) SetupSignals() error {
-	this.SigChan = make(chan os.Signal, 1)
-	signal.Notify(this.SigChan, syscall.SIGTERM)
-	signal.Notify(this.SigChan, syscall.SIGINT)
-	signal.Notify(this.SigChan, syscall.SIGQUIT)
+// SetupSignals sets up signal handling to gracefully terminate the underlying
+// Docker container and update state when receiving a TERM, INT or QUIT signal.
+func (runner *ContainerRunner) SetupSignals() error {
+	runner.SigChan = make(chan os.Signal, 1)
+	signal.Notify(runner.SigChan, syscall.SIGTERM)
+	signal.Notify(runner.SigChan, syscall.SIGINT)
+	signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
 	go func(sig <-chan os.Signal) {
 		for _ = range sig {
-			if !this.Cancelled {
-				this.CancelLock.Lock()
-				this.Cancelled = true
-				if this.ContainerId != "" {
-					this.Docker.StopContainer(this.ContainerId, 10)
+			if !runner.Cancelled {
+				runner.CancelLock.Lock()
+				runner.Cancelled = true
+				if runner.ContainerID != "" {
+					runner.Docker.StopContainer(runner.ContainerID, 10)
 				}
-				this.CancelLock.Unlock()
+				runner.CancelLock.Unlock()
 			}
 		}
-	}(this.SigChan)
+	}(runner.SigChan)
 
 	return nil
 }
 
-func (this *ContainerRunner) LoadImage() (err error) {
+// LoadImage determines the docker image id from the container record and
+// checks if it is available in the local Docker image store.  If not, it loads
+// the image from Keep.
+func (runner *ContainerRunner) LoadImage() (err error) {
 
-	this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
+	runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
 
 	var collection Collection
-	err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
+	err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
 	if err != nil {
 		return err
 	}
 	manifest := manifest.Manifest{Text: collection.ManifestText}
-	var img, imageId string
+	var img, imageID string
 	for ms := range manifest.StreamIter() {
 		img = ms.FileStreamSegments[0].Name
 		if !strings.HasSuffix(img, ".tar") {
 			return errors.New("First file in the collection does not end in .tar")
 		}
-		imageId = img[:len(img)-4]
+		imageID = img[:len(img)-4]
 	}
 
-	this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
+	runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-	_, err = this.Docker.InspectImage(imageId)
+	_, err = runner.Docker.InspectImage(imageID)
 	if err != nil {
-		this.CrunchLog.Print("Loading Docker image from keep")
+		runner.CrunchLog.Print("Loading Docker image from keep")
 
 		var readCloser io.ReadCloser
-		readCloser, err = this.Kc.ManifestFileReader(manifest, img)
+		readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
 		if err != nil {
 			return err
 		}
 
-		err = this.Docker.LoadImage(readCloser)
+		err = runner.Docker.LoadImage(readCloser)
 		if err != nil {
 			return err
 		}
 	} else {
-		this.CrunchLog.Print("Docker image is available")
+		runner.CrunchLog.Print("Docker image is available")
 	}
 
-	this.ContainerConfig.Image = imageId
+	runner.ContainerConfig.Image = imageID
 
 	return nil
 }
 
-func (this *ContainerRunner) StartContainer() (err error) {
-	this.CrunchLog.Print("Creating Docker container")
+// StartContainer creates the container and runs it.
+func (runner *ContainerRunner) StartContainer() (err error) {
+	runner.CrunchLog.Print("Creating Docker container")
 
-	this.CancelLock.Lock()
-	defer this.CancelLock.Unlock()
+	runner.CancelLock.Lock()
+	defer runner.CancelLock.Unlock()
 
-	if this.Cancelled {
+	if runner.Cancelled {
 		return ErrCancelled
 	}
 
-	this.ContainerConfig.Cmd = this.ContainerRecord.Command
-	if this.ContainerRecord.Cwd != "." {
-		this.ContainerConfig.WorkingDir = this.ContainerRecord.Cwd
+	runner.ContainerConfig.Cmd = runner.ContainerRecord.Command
+	if runner.ContainerRecord.Cwd != "." {
+		runner.ContainerConfig.WorkingDir = runner.ContainerRecord.Cwd
 	}
-	for k, v := range this.ContainerRecord.Environment {
-		this.ContainerConfig.Env = append(this.ContainerConfig.Env, k+"="+v)
+	for k, v := range runner.ContainerRecord.Environment {
+		runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
 	}
-	this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
+	runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
 	if err != nil {
 		return
 	}
 	hostConfig := &dockerclient.HostConfig{}
 
-	this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
-	err = this.Docker.StartContainer(this.ContainerId, hostConfig)
+	runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+	err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
 	if err != nil {
 		return
 	}
@@ -180,126 +196,141 @@ func (this *ContainerRunner) StartContainer() (err error) {
 	return nil
 }
 
-func (this *ContainerRunner) AttachLogs() (err error) {
+// AttachLogs connects the docker container stdout and stderr logs to the
+// Arvados logger which logs to Keep and the API server logs table.
+func (runner *ContainerRunner) AttachLogs() (err error) {
 
-	this.CrunchLog.Print("Attaching container logs")
+	runner.CrunchLog.Print("Attaching container logs")
 
 	var stderrReader, stdoutReader io.Reader
-	stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true})
+	stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
 	if err != nil {
 		return
 	}
-	stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true})
+	stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
 	if err != nil {
 		return
 	}
 
-	this.loggingDone = make(chan bool)
+	runner.loggingDone = make(chan bool)
 
-	this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
-	this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
-	go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
-	go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
+	runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+	runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+	go CopyReaderToLog(stdoutReader, runner.Stdout.Logger, runner.loggingDone)
+	go CopyReaderToLog(stderrReader, runner.Stderr.Logger, runner.loggingDone)
 
 	return nil
 }
 
-func (this *ContainerRunner) WaitFinish() error {
-	result := this.Docker.Wait(this.ContainerId)
+// WaitFinish waits for the container to terminate, captures the exit code, and
+// closes the stdout/stderr logging.
+func (runner *ContainerRunner) WaitFinish() error {
+	result := runner.Docker.Wait(runner.ContainerID)
 	wr := <-result
 	if wr.Error != nil {
 		return wr.Error
 	}
-	this.ExitCode = &wr.ExitCode
+	runner.ExitCode = &wr.ExitCode
 
 	// drain stdout/stderr
-	<-this.loggingDone
-	<-this.loggingDone
+	<-runner.loggingDone
+	<-runner.loggingDone
 
-	this.Stdout.Close()
-	this.Stderr.Close()
+	runner.Stdout.Close()
+	runner.Stderr.Close()
 
 	return nil
 }
 
-func (this *ContainerRunner) CommitLogs() error {
-	this.CrunchLog.Print(this.finalState)
-	this.CrunchLog.Close()
-	this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
+// CommitLogs posts the collection containing the final container logs.
+func (runner *ContainerRunner) CommitLogs() error {
+	runner.CrunchLog.Print(runner.finalState)
+	runner.CrunchLog.Close()
+
+	// Closing CrunchLog above allows it to be committed to Keep at this
+	// point, but re-open crunch log with ArvClient in case there are any
+	// further errors (such as failing to write the log to Keep!) while
+	// shutting down
+	runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID,
 		"crunchexec", nil})
 
-	mt, err := this.LogCollection.ManifestText()
+	mt, err := runner.LogCollection.ManifestText()
 	if err != nil {
 		return err
 	}
 
 	response := make(map[string]string)
-	err = this.Api.Create("collections",
-		arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
+	err = runner.ArvClient.Create("collections",
+		arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
 			"manifest_text": mt},
 		response)
 	if err != nil {
 		return err
 	}
 
-	this.LogsPDH = new(string)
-	*this.LogsPDH = response["portable_data_hash"]
+	runner.LogsPDH = new(string)
+	*runner.LogsPDH = response["portable_data_hash"]
 
 	return nil
 }
 
-func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+// UpdateContainerRecordRunning updates the container state to "Running"
+func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
 	update := arvadosclient.Dict{"state": "Running"}
-	return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+	return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
 }
 
-func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+// UpdateContainerRecordComplete updates the container record state on API
+// server to "Complete" or "Cancelled"
+func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
 	update := arvadosclient.Dict{}
-	if this.LogsPDH != nil {
-		update["log"] = *this.LogsPDH
+	if runner.LogsPDH != nil {
+		update["log"] = *runner.LogsPDH
 	}
-	if this.ExitCode != nil {
-		update["exit_code"] = *this.ExitCode
+	if runner.ExitCode != nil {
+		update["exit_code"] = *runner.ExitCode
 	}
 
-	update["state"] = this.finalState
+	update["state"] = runner.finalState
 
-	return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+	return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
 }
 
-func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-	return &ArvLogWriter{this.Api, this.ContainerRecord.Uuid, name, this.LogCollection.Open(name + ".txt")}
+// NewArvLogWriter creates an ArvLogWriter
+func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+	return &ArvLogWriter{runner.ArvClient, runner.ContainerRecord.UUID, name, runner.LogCollection.Open(name + ".txt")}
 }
 
-func (this *ContainerRunner) Run(containerUuid string) (err error) {
-	this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+// Run the full container lifecycle.
+func (runner *ContainerRunner) Run(containerUUID string) (err error) {
+	runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
 
 	var runerr, waiterr error
 
 	defer func() {
 		if err != nil {
-			this.CrunchLog.Print(err)
+			runner.CrunchLog.Print(err)
 		}
 
-		if this.Cancelled {
-			this.finalState = "Cancelled"
+		if runner.Cancelled {
+			runner.finalState = "Cancelled"
 		} else {
-			this.finalState = "Complete"
+			runner.finalState = "Complete"
 		}
 
 		// (6) write logs
-		logerr := this.CommitLogs()
+		logerr := runner.CommitLogs()
 		if logerr != nil {
-			this.CrunchLog.Print(logerr)
+			runner.CrunchLog.Print(logerr)
 		}
 
 		// (7) update container record with results
-		updateerr := this.UpdateContainerRecordComplete()
+		updateerr := runner.UpdateContainerRecordComplete()
 		if updateerr != nil {
-			this.CrunchLog.Print(updateerr)
+			runner.CrunchLog.Print(updateerr)
 		}
 
-		this.CrunchLog.Close()
+		runner.CrunchLog.Close()
 
 		if err == nil {
 			if runerr != nil {
@@ -314,25 +345,25 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 		}
 	}()
 
-	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
+	err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
 	if err != nil {
 		return
 	}
 
 	// (0) setup signal handling
-	err = this.SetupSignals()
+	err = runner.SetupSignals()
 	if err != nil {
 		return
 	}
 
 	// (1) check for and/or load image
-	err = this.LoadImage()
+	err = runner.LoadImage()
 	if err != nil {
 		return
 	}
 
 	// (2) start container
-	err = this.StartContainer()
+	err = runner.StartContainer()
 	if err != nil {
 		if err == ErrCancelled {
 			err = nil
@@ -341,28 +372,29 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 	}
 
 	// (3) update container record state
-	err = this.UpdateContainerRecordRunning()
+	err = runner.UpdateContainerRecordRunning()
 	if err != nil {
-		this.CrunchLog.Print(err)
+		runner.CrunchLog.Print(err)
 	}
 
 	// (4) attach container logs
-	runerr = this.AttachLogs()
+	runerr = runner.AttachLogs()
 	if runerr != nil {
-		this.CrunchLog.Print(runerr)
+		runner.CrunchLog.Print(runerr)
 	}
 
 	// (5) wait for container to finish
-	waiterr = this.WaitFinish()
+	waiterr = runner.WaitFinish()
 
 	return
 }
 
+// NewContainerRunner creates a new container runner.
 func NewContainerRunner(api IArvadosClient,
 	kc IKeepClient,
 	docker ThinDockerClient) *ContainerRunner {
 
-	cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
+	cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
 	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.LogCollection = &CollectionWriter{kc, nil}
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 7efdad8..baf67b2 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -390,7 +390,7 @@ func (s *TestSuite) TestCommitLogs(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
-	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
 	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
 	cr.CrunchLog.Print("Hello world!")
@@ -409,7 +409,7 @@ func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
-	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
 
 	err := cr.UpdateContainerRecordRunning()
 	c.Check(err, IsNil)
@@ -421,7 +421,7 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
-	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
 
 	cr.LogsPDH = new(string)
 	*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
@@ -442,7 +442,7 @@ func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
-	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
 	cr.Cancelled = true
 	cr.finalState = "Cancelled"
 
@@ -610,7 +610,7 @@ func (s *TestSuite) TestCancel(c *C) {
 	cr := NewContainerRunner(api, &KeepTestClient{}, docker)
 
 	go func() {
-		for cr.ContainerId == "" {
+		for cr.ContainerID == "" {
 			time.Sleep(1 * time.Second)
 		}
 		cr.SigChan <- syscall.SIGINT
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 1604445..3860484 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -3,7 +3,6 @@ package main
 import (
 	"bufio"
 	"bytes"
-	"errors"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"io"
@@ -12,6 +11,8 @@ import (
 	"time"
 )
 
+// Timestamper is the signature for a function that takes a timestamp and
+// returns a formatted string value.
 type Timestamper func(t time.Time) string
 
 // Logging plumbing:
@@ -36,61 +37,62 @@ type ThrottledLogger struct {
 	Timestamper
 }
 
-// Builtin RFC3339Nano format isn't fixed width so
-// provide our own with microsecond precision (same as API server).
+// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
+// because the RFC3339Nano format isn't fixed width.
 const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
 
+// RFC3339Timestamp returns an RFC3339 formatted timestamp using RFC3339Fixed
 func RFC3339Timestamp(now time.Time) string {
 	return now.Format(RFC3339Fixed)
 }
 
 // Write to the internal buffer.  Prepend a timestamp to each line of the input
 // data.
-func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
-	this.Mutex.Lock()
-	if this.buf == nil {
-		this.buf = &bytes.Buffer{}
+func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
+	tl.Mutex.Lock()
+	if tl.buf == nil {
+		tl.buf = &bytes.Buffer{}
 	}
-	defer this.Mutex.Unlock()
+	defer tl.Mutex.Unlock()
 
-	now := this.Timestamper(time.Now().UTC())
+	now := tl.Timestamper(time.Now().UTC())
 	sc := bufio.NewScanner(bytes.NewBuffer(p))
 	for sc.Scan() {
-		_, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
+		_, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
 	}
 	return len(p), err
 }
 
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
-func (this *ThrottledLogger) flusher() {
+func (tl *ThrottledLogger) flusher() {
 	bufchan := make(chan *bytes.Buffer)
 	bufterm := make(chan bool)
 
 	// Use a separate goroutine for the actual write so that the writes are
 	// actually initiated closer every 1s instead of every
 	// 1s + (time to it takes to write).
-	go goWriter(this.writer, bufchan, bufterm)
+	go goWriter(tl.writer, bufchan, bufterm)
 	for {
-		if !this.stop {
+		if !tl.stop {
 			time.Sleep(1 * time.Second)
 		}
-		this.Mutex.Lock()
-		if this.buf != nil && this.buf.Len() > 0 {
-			oldbuf := this.buf
-			this.buf = nil
-			this.Mutex.Unlock()
+		tl.Mutex.Lock()
+		if tl.buf != nil && tl.buf.Len() > 0 {
+			oldbuf := tl.buf
+			tl.buf = nil
+			tl.Mutex.Unlock()
 			bufchan <- oldbuf
-		} else if this.stop {
-			this.Mutex.Unlock()
+		} else if tl.stop {
+			tl.Mutex.Unlock()
 			break
 		} else {
-			this.Mutex.Unlock()
+			tl.Mutex.Unlock()
 		}
 	}
 	close(bufchan)
 	<-bufterm
-	this.flusherDone <- true
+	tl.flusherDone <- true
 }
 
 // Receive buffers from a channel and send to the underlying Writer
@@ -101,19 +103,21 @@ func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
 	t <- true
 }
 
-// Stop the flusher goroutine and wait for it to complete, then close the
+// Close stops the flusher goroutine and waits for it to complete, then closes the
 // underlying Writer.
-func (this *ThrottledLogger) Close() error {
-	this.stop = true
-	<-this.flusherDone
-	return this.writer.Close()
+func (tl *ThrottledLogger) Close() error {
+	tl.stop = true
+	<-tl.flusherDone
+	return tl.writer.Close()
 }
 
 const (
-	MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
+	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
+	MaxLogLine = 1 << 12
 )
 
-// Goroutine to copy from a reader to a logger, with long line splitting.
+// CopyReaderToLog reads from a Reader and prints to a Logger, with long line
+// splitting.
 func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
 	reader := bufio.NewReaderSize(in, MaxLogLine)
 	var prefix string
@@ -139,11 +143,10 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
 	done <- true
 }
 
-// Create a new thottled logger that
+// NewThrottledLogger creates a new throttled logger that
 // (a) prepends timestamps to each line
 // (b) batches log messages and only calls the underlying Writer at most once
 // per second.
-
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	alw := &ThrottledLogger{}
 	alw.flusherDone = make(chan bool)
@@ -154,40 +157,39 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	return alw
 }
 
-// Implements a writer that writes to each of a WriteCloser (typically
-// CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter implements a writer that writes to each of a WriteCloser
+// (typically CollectionFileWriter) and creates an API server log entry.
 type ArvLogWriter struct {
-	Api           IArvadosClient
-	Uuid          string
+	ArvClient     IArvadosClient
+	UUID          string
 	loggingStream string
 	writeCloser   io.WriteCloser
 }
 
-func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 	// Write to the next writer in the chain (a file in Keep)
 	var err1 error
-	if this.writeCloser != nil {
-		_, err1 = this.writeCloser.Write(p)
+	if arvlog.writeCloser != nil {
+		_, err1 = arvlog.writeCloser.Write(p)
 	}
 
 	// write to API
-	lr := arvadosclient.Dict{"object_uuid": this.Uuid,
-		"event_type": this.loggingStream,
+	lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
+		"event_type": arvlog.loggingStream,
 		"properties": map[string]string{"text": string(p)}}
-	err2 := this.Api.Create("logs", lr, nil)
+	err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
 	if err1 != nil || err2 != nil {
-		return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
-	} else {
-		return len(p), nil
+		return 0, fmt.Errorf("%s ; %s", err1, err2)
 	}
-
+	return len(p), nil
 }
 
-func (this *ArvLogWriter) Close() (err error) {
-	if this.writeCloser != nil {
-		err = this.writeCloser.Close()
-		this.writeCloser = nil
+// Close the underlying writer
+func (arvlog *ArvLogWriter) Close() (err error) {
+	if arvlog.writeCloser != nil {
+		err = arvlog.writeCloser.Close()
+		arvlog.writeCloser = nil
 	}
 	return err
 }
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
index 8d2d802..4a2693a 100644
--- a/services/crunch-exec/upload.go
+++ b/services/crunch-exec/upload.go
@@ -20,11 +20,13 @@ import (
 	"strings"
 )
 
+// Block is a data block in a manifest stream
 type Block struct {
 	data   []byte
 	offset int64
 }
 
+// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
 type CollectionFileWriter struct {
 	IKeepClient
 	*manifest.ManifestStream
@@ -36,11 +38,13 @@ type CollectionFileWriter struct {
 	fn       string
 }
 
+// Write to a file in a keep collection
 func (m *CollectionFileWriter) Write(p []byte) (int, error) {
 	n, err := m.ReadFrom(bytes.NewReader(p))
 	return int(n), err
 }
 
+// ReadFrom a Reader and write to the Keep collection file.
 func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 	var total int64
 	var count int
@@ -62,11 +66,11 @@ func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 
 	if err == io.EOF {
 		return total, nil
-	} else {
-		return total, err
 	}
+	return total, err
 }
 
+// Close stops writing a file and adds it to the parent manifest.
 func (m *CollectionFileWriter) Close() error {
 	m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
 		manifest.FileStreamSegment{m.offset, m.length, m.fn})
@@ -89,11 +93,14 @@ func (m *CollectionFileWriter) goUpload() {
 	finish <- errors
 }
 
+// CollectionWriter implements creating new Keep collections by opening files
+// and writing to them.
 type CollectionWriter struct {
 	IKeepClient
 	Streams []*CollectionFileWriter
 }
 
+// Open a new file for writing in the Keep collection.
 func (m *CollectionWriter) Open(path string) io.WriteCloser {
 	var dir string
 	var fn string
@@ -123,6 +130,7 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 	return fw
 }
 
+// Finish writing the collection, wait for all blocks to complete uploading.
 func (m *CollectionWriter) Finish() error {
 	var errstring string
 	for _, stream := range m.Streams {
@@ -145,11 +153,13 @@ func (m *CollectionWriter) Finish() error {
 	}
 	if errstring != "" {
 		return errors.New(errstring)
-	} else {
-		return nil
 	}
+	return nil
 }
 
+// ManifestText returns the manifest text of the collection.  Calls Finish()
+// first to ensure that all blocks are written and that signed locators are
+// available.
 func (m *CollectionWriter) ManifestText() (mt string, err error) {
 	err = m.Finish()
 	if err != nil {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list