[ARVADOS] updated: 1648dde3954d95475035217ced41adc058b2da43
git at public.curoverse.com
git at public.curoverse.com
Tue Dec 29 00:04:06 EST 2015
Summary of changes:
services/crunch-exec/crunchexec.go | 21 ++--
services/crunch-exec/crunchexec_test.go | 2 +-
services/crunch-exec/logging.go | 51 +++++++--
.../crunch-exec}/upload.go | 121 +++++++--------------
4 files changed, 95 insertions(+), 100 deletions(-)
copy {sdk/go/crunchrunner => services/crunch-exec}/upload.go (56%)
via 1648dde3954d95475035217ced41adc058b2da43 (commit)
from d77b8bd4cc7e22d1892a270704a0254e93b9e284 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1648dde3954d95475035217ced41adc058b2da43
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 29 00:03:49 2015 -0500
7816: Write logs to keep work in progress.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 48558b0..72fe7ec 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -67,6 +67,7 @@ type ContainerRunner struct {
CrunchLog *ThrottledLogger
Stdout *ThrottledLogger
Stderr *ThrottledLogger
+ Logs *ManifestWriter
}
func (this *ContainerRunner) setupMonitoring() error {
@@ -153,11 +154,11 @@ func (this *ContainerRunner) AttachLogs() (err error) {
this.CrunchLog.Print("Attaching container logs")
var stderrReader, stdoutReader io.Reader
- stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true, Timestamps: true})
+ stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true})
if err != nil {
return
}
- stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true, Timestamps: true})
+ stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true})
if err != nil {
return
}
@@ -180,6 +181,7 @@ func (this *ContainerRunner) WaitFinish() error {
}
this.ExitCode = wr.ExitCode
+ // drain stdout/stderr
<-this.loggingDone
<-this.loggingDone
@@ -198,7 +200,7 @@ func (this *ContainerRunner) UpdateContainerRecord() error {
}
func (this *ContainerRunner) NewArvLogWriter(name string) io.Writer {
- return &ArvLogWriter{this.Api, this.Kc, name}
+ return &ArvLogWriter{this, this.ManifestWriter.Open(name + ".txt"), name}
}
func (this *ContainerRunner) Run(containerUuid string) (err error) {
@@ -248,29 +250,32 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
return
}
-func NewContainerRunner() *ContainerRunner {
+func NewContainerRunner(kc IKeepClient) *ContainerRunner {
cr := &ContainerRunner{}
cr.NewLogWriter = cr.NewArvLogWriter
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+ cr.ManifestWriter = &ManifestWriter{kc, nil}
return cr
}
func main() {
flag.Parse()
- cr := NewContainerRunner()
-
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatal(err)
}
- cr.Api = api
- cr.Kc, err = keepclient.MakeKeepClient(&api)
+ var kc IKeepClient
+ kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
log.Fatal(err)
}
+ cr := NewContainerRunner(kc)
+ cr.Api = api
+ cr.Kc = kc
+
cr.Docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
log.Fatal(err)
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 01eeea0..ec1a271 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -247,7 +247,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
err = cr.StartContainer()
c.Check(err, IsNil)
- err = cr.GetLogs()
+ err = cr.AttachLogs()
c.Check(err, IsNil)
err = cr.WaitFinish()
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 4f03433..52e343b 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -3,6 +3,7 @@ package main
import (
"bufio"
"bytes"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"io"
"log"
"sync"
@@ -11,39 +12,56 @@ import (
type ThrottledLogger struct {
*log.Logger
- buf bytes.Buffer
+ buf *bytes.Buffer
sync.Mutex
- writer io.Writer
+ writer io.WriteCloser
stop bool
flusherDone chan bool
}
func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
this.Mutex.Lock()
+ if this.buf == nil {
+ this.buf = &bytes.Buffer{}
+ }
defer this.Mutex.Unlock()
+ // XXX write timestamp + p
return this.buf.Write(p)
}
func (this *ThrottledLogger) Stop() {
this.stop = true
<-this.flusherDone
+ this.writer.Close()
+}
+
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+ for b := range c {
+ writer.Write(b.Bytes())
+ }
+ t <- true
}
-func (this *ThrottledLogger) Flusher() {
+func (this *ThrottledLogger) flusher() {
+ bufchan := make(chan *bytes.Buffer)
+ bufterm := make(chan bool)
+ go goWriter(this.writer, bufchan, bufterm)
for {
if !this.stop {
time.Sleep(1 * time.Second)
}
this.Mutex.Lock()
- if this.buf.Len() > 0 {
- this.writer.Write(this.buf.Bytes())
- this.buf.Reset()
+ if this.buf != nil && this.buf.Len() > 0 {
+ bufchan <- this.buf
+ this.buf = nil
} else if this.stop {
this.Mutex.Unlock()
break
}
this.Mutex.Unlock()
}
+ close(bufchan)
+ <-bufterm
this.flusherDone <- true
}
@@ -76,23 +94,34 @@ func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
done <- true
}
-func NewThrottledLogger(writer io.Writer) *ThrottledLogger {
+func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
alw := &ThrottledLogger{}
alw.flusherDone = make(chan bool)
alw.writer = writer
alw.Logger = log.New(alw, "", 0)
- go alw.Flusher()
+ go alw.flusher()
return alw
}
type ArvLogWriter struct {
- Api IArvadosClient
- Kc IKeepClient
+ *ContainerRunner
+ io.WriteCloser
loggingStream string
}
func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
// write to API
+ lr := arvadosclient.Dict{"object_uuid": this.ContainerRecord.Uuid,
+ "event_type": this.loggingStream,
+ "properties": map[string]string{"text": string(p)}}
+ err = this.Api.Create("logs", lr, nil)
+
// write to Keep
- return 0, nil
+ err = this.WriteCloser.Write(p)
+
+ return len(p), nil
+}
+
+func (this *ArvLogWriter) Close() (err error) {
+ this.WriteCloser.Close()
}
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
new file mode 100644
index 0000000..44c7213
--- /dev/null
+++ b/services/crunch-exec/upload.go
@@ -0,0 +1,178 @@
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+)
+
+type Block struct {
+ data []byte
+ offset int64
+}
+
+type ManifestFileWriter struct {
+ IKeepClient
+ *manifest.ManifestStream
+ offset int64
+ *Block
+ uploader chan *Block
+ finish chan []error
+ fn string
+}
+
+type IKeepClient interface {
+ PutHB(hash string, buf []byte) (string, int, error)
+}
+
+func (m *ManifestFileWriter) Write(p []byte) (int, error) {
+ n, err := m.ReadFrom(bytes.NewReader(p))
+ return int(n), err
+}
+
+func (m *ManifestFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
+ var total int64
+ var count int
+
+ for err == nil {
+ if m.Block == nil {
+ m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+ }
+ count, err = r.Read(m.Block.data[m.Block.offset:])
+ total += int64(count)
+ m.Block.offset += int64(count)
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
+ }
+ }
+
+ if err == io.EOF {
+ return total, nil
+ } else {
+ return total, err
+ }
+}
+
+func (m *ManifestFileWriter) Close() error {
+ m.ManifestStream.FileTokens = append(m.ManifestStream.FileTokens,
+ manifest.FileToken{0, m.offset, m.fn})
+ return nil
+}
+
+func (m *ManifestFileWriter) goUpload() {
+ var errors []error
+ uploader := m.uploader
+ finish := m.finish
+ for block := range uploader {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
+ }
+ finish <- errors
+}
+
+type ManifestWriter struct {
+ IKeepClient
+ Streams []*ManifestFileWriter
+}
+
+func (m *ManifestWriter) Open(path string) io.WriteCloser {
+ var dir string
+ var fn string
+
+ i := strings.Index(path, "/")
+ if i > -1 {
+ dir = "./" + path[0:i]
+ fn = path[i+1:]
+ } else {
+ dir = "."
+ fn = path
+ }
+
+ fw := &ManifestFileWriter{
+ m.IKeepClient,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ nil,
+ make(chan *Block),
+ make(chan []error),
+ fn}
+ go fw.goUpload()
+
+ m.Streams = append(m.Streams, fw)
+
+ return fw
+}
+
+func (m *ManifestWriter) Finish() error {
+ var errstring string
+ for _, stream := range m.Streams {
+ if stream.uploader == nil {
+ continue
+ }
+ if stream.Block != nil {
+ stream.uploader <- stream.Block
+ }
+ close(stream.uploader)
+ stream.uploader = nil
+
+ errors := <-stream.finish
+ close(stream.finish)
+ stream.finish = nil
+
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
+ }
+}
+
+func (m *ManifestWriter) ManifestText() (mt string, err error) {
+ err = m.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ var buf bytes.Buffer
+
+ for _, v := range m.Streams {
+ k := v.StreamName
+ if k == "." {
+ buf.WriteString(".")
+ } else {
+ k = strings.Replace(k, " ", "\\040", -1)
+ k = strings.Replace(k, "\n", "", -1)
+ buf.WriteString("./" + k)
+ }
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ for _, f := range v.FileTokens {
+ buf.WriteString(" ")
+ f = strings.Replace(f, " ", "\\040", -1)
+ f = strings.Replace(f, "\n", "", -1)
+ buf.WriteString(f)
+ }
+ buf.WriteString("\n")
+ }
+ return buf.String(), nil
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list