[ARVADOS] updated: 1648dde3954d95475035217ced41adc058b2da43

git at public.curoverse.com git at public.curoverse.com
Tue Dec 29 00:04:06 EST 2015


Summary of changes:
 services/crunch-exec/crunchexec.go                 |  21 ++--
 services/crunch-exec/crunchexec_test.go            |   2 +-
 services/crunch-exec/logging.go                    |  51 +++++++--
 .../crunch-exec}/upload.go                         | 121 +++++++--------------
 4 files changed, 95 insertions(+), 100 deletions(-)
 copy {sdk/go/crunchrunner => services/crunch-exec}/upload.go (56%)

       via  1648dde3954d95475035217ced41adc058b2da43 (commit)
      from  d77b8bd4cc7e22d1892a270704a0254e93b9e284 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 1648dde3954d95475035217ced41adc058b2da43
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 29 00:03:49 2015 -0500

    7816: Write logs to keep work in progress.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 48558b0..72fe7ec 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -67,6 +67,7 @@ type ContainerRunner struct {
 	CrunchLog   *ThrottledLogger
 	Stdout      *ThrottledLogger
 	Stderr      *ThrottledLogger
+	Logs        *ManifestWriter
 }
 
 func (this *ContainerRunner) setupMonitoring() error {
@@ -153,11 +154,11 @@ func (this *ContainerRunner) AttachLogs() (err error) {
 	this.CrunchLog.Print("Attaching container logs")
 
 	var stderrReader, stdoutReader io.Reader
-	stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true, Timestamps: true})
+	stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true})
 	if err != nil {
 		return
 	}
-	stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true, Timestamps: true})
+	stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true})
 	if err != nil {
 		return
 	}
@@ -180,6 +181,7 @@ func (this *ContainerRunner) WaitFinish() error {
 	}
 	this.ExitCode = wr.ExitCode
 
+	// drain stdout/stderr
 	<-this.loggingDone
 	<-this.loggingDone
 
@@ -198,7 +200,7 @@ func (this *ContainerRunner) UpdateContainerRecord() error {
 }
 
 func (this *ContainerRunner) NewArvLogWriter(name string) io.Writer {
-	return &ArvLogWriter{this.Api, this.Kc, name}
+	return &ArvLogWriter{this, this.ManifestWriter.Open(name + ".txt"), name}
 }
 
 func (this *ContainerRunner) Run(containerUuid string) (err error) {
@@ -248,29 +250,32 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 	return
 }
 
-func NewContainerRunner() *ContainerRunner {
+func NewContainerRunner(kc IKeepClient) *ContainerRunner {
 	cr := &ContainerRunner{}
 	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+	cr.ManifestWriter = &ManifestWriter{kc, nil}
 	return cr
 }
 
 func main() {
 	flag.Parse()
 
-	cr := NewContainerRunner()
-
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatal(err)
 	}
-	cr.Api = api
 
-	cr.Kc, err = keepclient.MakeKeepClient(&api)
+	var kc IKeepClient
+	kc, err = keepclient.MakeKeepClient(&api)
 	if err != nil {
 		log.Fatal(err)
 	}
 
+	cr := NewContainerRunner(kc)
+	cr.Api = api
+	cr.Kc = kc
+
 	cr.Docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	if err != nil {
 		log.Fatal(err)
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 01eeea0..ec1a271 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -247,7 +247,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	err = cr.StartContainer()
 	c.Check(err, IsNil)
 
-	err = cr.GetLogs()
+	err = cr.AttachLogs()
 	c.Check(err, IsNil)
 
 	err = cr.WaitFinish()
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 4f03433..52e343b 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bufio"
 	"bytes"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"io"
 	"log"
 	"sync"
@@ -11,39 +12,56 @@ import (
 
 type ThrottledLogger struct {
 	*log.Logger
-	buf bytes.Buffer
+	buf *bytes.Buffer
 	sync.Mutex
-	writer      io.Writer
+	writer      io.WriteCloser
 	stop        bool
 	flusherDone chan bool
 }
 
 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
 	this.Mutex.Lock()
+	if this.buf == nil {
+		this.buf = &bytes.Buffer{}
+	}
 	defer this.Mutex.Unlock()
+	// XXX write timestamp + p
 	return this.buf.Write(p)
 }
 
 func (this *ThrottledLogger) Stop() {
 	this.stop = true
 	<-this.flusherDone
+	this.writer.Close()
+}
+
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+	for b := range c {
+		writer.Write(b.Bytes())
+	}
+	t <- true
 }
 
-func (this *ThrottledLogger) Flusher() {
+func (this *ThrottledLogger) flusher() {
+	bufchan := make(chan *bytes.Buffer)
+	bufterm := make(chan bool)
+	go goWriter(this.writer, bufchan, bufterm)
 	for {
 		if !this.stop {
 			time.Sleep(1 * time.Second)
 		}
 		this.Mutex.Lock()
-		if this.buf.Len() > 0 {
-			this.writer.Write(this.buf.Bytes())
-			this.buf.Reset()
+		if this.buf != nil && this.buf.Len() > 0 {
+			bufchan <- this.buf
+			this.buf = nil
 		} else if this.stop {
 			this.Mutex.Unlock()
 			break
 		}
 		this.Mutex.Unlock()
 	}
+	close(bufchan)
+	<-bufterm
 	this.flusherDone <- true
 }
 
@@ -76,23 +94,34 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
 	done <- true
 }
 
-func NewThrottledLogger(writer io.Writer) *ThrottledLogger {
+func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	alw := &ThrottledLogger{}
 	alw.flusherDone = make(chan bool)
 	alw.writer = writer
 	alw.Logger = log.New(alw, "", 0)
-	go alw.Flusher()
+	go alw.flusher()
 	return alw
 }
 
 type ArvLogWriter struct {
-	Api           IArvadosClient
-	Kc            IKeepClient
+	*ContainerRunner
+	io.WriteCloser
 	loggingStream string
 }
 
 func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
 	// write to API
+	lr := arvadosclient.Dict{"object_uuid": this.ContainerRecord.Uuid,
+		"event_type": this.loggingStream,
+		"properties": map[string]string{"text": string(p)}}
+	err = this.Api.Create("logs", lr, nil)
+
 	// write to Keep
-	return 0, nil
+	err = this.WriteCloser.Write(p)
+
+	return len(p), nil
+}
+
+func (this *ArvLogWriter) Close() (err error) {
+	this.WriteCloser.Close()
 }
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
new file mode 100644
index 0000000..44c7213
--- /dev/null
+++ b/services/crunch-exec/upload.go
@@ -0,0 +1,178 @@
+package main
+
+import (
+	"bytes"
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+)
+
+type Block struct {
+	data   []byte
+	offset int64
+}
+
+type ManifestFileWriter struct {
+	IKeepClient
+	*manifest.ManifestStream
+	offset int64
+	*Block
+	uploader chan *Block
+	finish   chan []error
+	fn       string
+}
+
+type IKeepClient interface {
+	PutHB(hash string, buf []byte) (string, int, error)
+}
+
+func (m *ManifestFileWriter) Write(p []byte) (int, error) {
+	n, err := m.ReadFrom(bytes.NewReader(p))
+	return int(n), err
+}
+
+func (m *ManifestFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	var total int64
+	var count int
+
+	for err == nil {
+		if m.Block == nil {
+			m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+		}
+		count, err = r.Read(m.Block.data[m.Block.offset:])
+		total += int64(count)
+		m.Block.offset += int64(count)
+		if m.Block.offset == keepclient.BLOCKSIZE {
+			m.uploader <- m.Block
+			m.Block = nil
+		}
+	}
+
+	if err == io.EOF {
+		return total, nil
+	} else {
+		return total, err
+	}
+}
+
+func (m *ManifestFileWriter) Close() error {
+	m.ManifestStream.FileTokens = append(m.ManifestStream.FileTokens,
+		manifest.FileToken{0, m.offset, m.fn})
+	return nil
+}
+
+func (m *ManifestFileWriter) goUpload() {
+	var errors []error
+	uploader := m.uploader
+	finish := m.finish
+	for block := range uploader {
+		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+		signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+		if err != nil {
+			errors = append(errors, err)
+		} else {
+			m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+		}
+	}
+	finish <- errors
+}
+
+type ManifestWriter struct {
+	IKeepClient
+	Streams []*ManifestFileWriter
+}
+
+func (m *ManifestWriter) Open(path string) io.WriteCloser {
+	var dir string
+	var fn string
+
+	i := strings.Index(path, "/")
+	if i > -1 {
+		dir = "./" + path[0:i]
+		fn = path[i+1:]
+	} else {
+		dir = "."
+		fn = path
+	}
+
+	fw := &ManifestFileWriter{
+		m.IKeepClient,
+		&manifest.ManifestStream{StreamName: dir},
+		0,
+		nil,
+		make(chan *Block),
+		make(chan []error),
+		fn}
+	go fw.goUpload()
+
+	m.Streams = append(m.Streams, fw)
+
+	return fw
+}
+
+func (m *ManifestWriter) Finish() error {
+	var errstring string
+	for _, stream := range m.Streams {
+		if stream.uploader == nil {
+			continue
+		}
+		if stream.Block != nil {
+			stream.uploader <- stream.Block
+		}
+		close(stream.uploader)
+		stream.uploader = nil
+
+		errors := <-stream.finish
+		close(stream.finish)
+		stream.finish = nil
+
+		for _, r := range errors {
+			errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+		}
+	}
+	if errstring != "" {
+		return errors.New(errstring)
+	} else {
+		return nil
+	}
+}
+
+func (m *ManifestWriter) ManifestText() (mt string, err error) {
+	err = m.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	var buf bytes.Buffer
+
+	for _, v := range m.Streams {
+		k := v.StreamName
+		if k == "." {
+			buf.WriteString(".")
+		} else {
+			k = strings.Replace(k, " ", "\\040", -1)
+			k = strings.Replace(k, "\n", "", -1)
+			buf.WriteString("./" + k)
+		}
+		for _, b := range v.Blocks {
+			buf.WriteString(" ")
+			buf.WriteString(b)
+		}
+		for _, f := range v.FileTokens {
+			buf.WriteString(" ")
+			f = strings.Replace(f, " ", "\\040", -1)
+			f = strings.Replace(f, "\n", "", -1)
+			buf.WriteString(f)
+		}
+		buf.WriteString("\n")
+	}
+	return buf.String(), nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list