[ARVADOS] updated: abbee001eff213d194978a67ee745402ab7ccc45

git at public.curoverse.com git at public.curoverse.com
Tue Dec 29 12:07:43 EST 2015


Summary of changes:
 services/crunch-exec/crunchexec.go      |  70 +++++++++-------
 services/crunch-exec/crunchexec_test.go |  57 ++++++-------
 services/crunch-exec/logging.go         |  44 +++++++---
 services/crunch-exec/logging_test.go    | 144 ++++++++++++++++++++++++++++++++
 services/crunch-exec/upload.go          |  44 +++++-----
 5 files changed, 267 insertions(+), 92 deletions(-)
 create mode 100644 services/crunch-exec/logging_test.go

       via  abbee001eff213d194978a67ee745402ab7ccc45 (commit)
      from  1648dde3954d95475035217ced41adc058b2da43 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit abbee001eff213d194978a67ee745402ab7ccc45
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 29 12:07:37 2015 -0500

    7816: More work + tests for logging

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 72fe7ec..525fa4c 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -1,9 +1,8 @@
 package main
 
 import (
-	"flag"
-	//"fmt"
 	"errors"
+	"flag"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -11,7 +10,6 @@ import (
 	"io"
 	"log"
 	"os"
-	//"os/exec"
 	"os/signal"
 	"strings"
 	"syscall"
@@ -47,12 +45,7 @@ type ContainerRecord struct {
 	State              string            `json:"state"`
 }
 
-// Callback used to listen to Docker's events
-func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{}) {
-	log.Printf("Received event: %#v\n", *event)
-}
-
-type NewLogWriter func(name string) io.Writer
+type NewLogWriter func(name string) io.WriteCloser
 
 type ContainerRunner struct {
 	Docker *dockerclient.DockerClient
@@ -63,16 +56,15 @@ type ContainerRunner struct {
 	ContainerId string
 	ExitCode    int
 	NewLogWriter
-	loggingDone chan bool
-	CrunchLog   *ThrottledLogger
-	Stdout      *ThrottledLogger
-	Stderr      *ThrottledLogger
-	Logs        *ManifestWriter
+	loggingDone   chan bool
+	CrunchLog     *ThrottledLogger
+	Stdout        *ThrottledLogger
+	Stderr        *ThrottledLogger
+	LogCollection *CollectionWriter
+	LogsPDH       string
 }
 
 func (this *ContainerRunner) setupMonitoring() error {
-	this.Docker.StartMonitorEvents(dockerEvent, nil)
-
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGTERM)
 	signal.Notify(sigChan, syscall.SIGINT)
@@ -192,6 +184,27 @@ func (this *ContainerRunner) WaitFinish() error {
 }
 
 func (this *ContainerRunner) CommitLogs() error {
+	this.CrunchLog.Stop()
+	this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
+		"crunchexec", nil})
+
+	mt, err := this.LogCollection.ManifestText()
+	if err != nil {
+		this.CrunchLog.Print(err)
+		return err
+	}
+
+	var response map[string]string
+	err = this.Api.Create("collections",
+		arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
+			"manifest_text": mt}, &response)
+	if err != nil {
+		this.CrunchLog.Print(err)
+		return err
+	}
+
+	this.LogsPDH = response["portable_data_hash"]
+
 	return nil
 }
 
@@ -199,15 +212,12 @@ func (this *ContainerRunner) UpdateContainerRecord() error {
 	return nil
 }
 
-func (this *ContainerRunner) NewArvLogWriter(name string) io.Writer {
-	return &ArvLogWriter{this, this.ManifestWriter.Open(name + ".txt"), name}
+func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+	return &ArvLogWriter{this.Api, this.ContainerRecord.Uuid, name, this.LogCollection.Open(name + ".txt")}
 }
 
 func (this *ContainerRunner) Run(containerUuid string) (err error) {
 
-	this.NewLogWriter = this.NewArvLogWriter
-	this.CrunchLog = NewThrottledLogger(this.NewLogWriter("crunchexec"))
-
 	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
 	if err != nil {
 		this.CrunchLog.Print(err)
@@ -250,11 +260,14 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 	return
 }
 
-func NewContainerRunner(kc IKeepClient) *ContainerRunner {
-	cr := &ContainerRunner{}
+func NewContainerRunner(api IArvadosClient,
+	kc IKeepClient,
+	docker *dockerclient.DockerClient) *ContainerRunner {
+
+	cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
 	cr.NewLogWriter = cr.NewArvLogWriter
+	cr.LogCollection = &CollectionWriter{kc, nil}
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
-	cr.ManifestWriter = &ManifestWriter{kc, nil}
 	return cr
 }
 
@@ -272,15 +285,14 @@ func main() {
 		log.Fatal(err)
 	}
 
-	cr := NewContainerRunner(kc)
-	cr.Api = api
-	cr.Kc = kc
-
-	cr.Docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	var docker *dockerclient.DockerClient
+	docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
 	if err != nil {
 		log.Fatal(err)
 	}
 
+	cr := NewContainerRunner(api, kc, docker)
+
 	err = cr.Run(flag.Arg(0))
 	if err != nil {
 		log.Fatal(err)
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index ec1a271..9112ebb 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -10,6 +10,7 @@ import (
 	. "gopkg.in/check.v1"
 	"io"
 	"os"
+	"strings"
 	"testing"
 )
 
@@ -81,18 +82,15 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
-	cr := NewContainerRunner()
-	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	kc := &KeepTestClient{}
+	docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(ArvTestClient{}, kc, docker)
 
-	var err error
 	_, err = cr.Docker.RemoveImage(busyboxImageId, true)
 
 	_, err = cr.Docker.InspectImage(busyboxImageId)
 	c.Check(err, NotNil)
 
-	kc := &KeepTestClient{}
-	cr.Api = ArvTestClient{}
-	cr.Kc = kc
 	cr.ContainerRecord.ContainerImage = busyboxPDH
 
 	// (1) Test loading image from keep
@@ -173,9 +171,7 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
 
 func (s *TestSuite) TestLoadImageArvError(c *C) {
 	// (1) Arvados error
-	cr := NewContainerRunner()
-	cr.Api = ArvErrorTestClient{}
-	cr.Kc = &KeepTestClient{}
+	cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil)
 	cr.ContainerRecord.ContainerImage = busyboxPDH
 
 	err := cr.LoadImage()
@@ -184,10 +180,8 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
 	// (2) Keep error
-	cr := NewContainerRunner()
-	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr.Api = &ArvTestClient{}
-	cr.Kc = KeepErrorTestClient{}
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
 	cr.ContainerRecord.ContainerImage = busyboxPDH
 
 	err := cr.LoadImage()
@@ -196,9 +190,7 @@ func (s *TestSuite) TestLoadImageKeepError(c *C) {
 
 func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 	// (3) Collection doesn't contain image
-	cr := NewContainerRunner()
-	cr.Api = &ArvTestClient{}
-	cr.Kc = KeepErrorTestClient{}
+	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil)
 	cr.ContainerRecord.ContainerImage = otherPDH
 
 	err := cr.LoadImage()
@@ -207,22 +199,32 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
 	// (4) Collection doesn't contain image
-	cr := NewContainerRunner()
-	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr.Api = &ArvTestClient{}
-	cr.Kc = KeepReadErrorTestClient{}
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
 	cr.ContainerRecord.ContainerImage = busyboxPDH
 
 	err := cr.LoadImage()
 	c.Check(err, NotNil)
 }
 
+type ClosableBuffer struct {
+	bytes.Buffer
+}
+
 type TestLogs struct {
-	Stdout bytes.Buffer
-	Stderr bytes.Buffer
+	Stdout ClosableBuffer
+	Stderr ClosableBuffer
+}
+
+func (this *ClosableBuffer) Write(p []byte) (n int, err error) {
+	return this.Buffer.Write(p)
 }
 
-func (this *TestLogs) NewTestLoggingWriter(logstr string) io.Writer {
+func (this *ClosableBuffer) Close() error {
+	return nil
+}
+
+func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
 	if logstr == "stdout" {
 		return &this.Stdout
 	}
@@ -233,10 +235,9 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.Writer {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-	cr := NewContainerRunner()
-	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr.Api = ArvTestClient{}
-	cr.Kc = &KeepTestClient{}
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(ArvTestClient{}, &KeepTestClient{}, docker)
+
 	var logs TestLogs
 	cr.NewLogWriter = logs.NewTestLoggingWriter
 	cr.ContainerRecord.ContainerImage = busyboxPDH
@@ -253,6 +254,6 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	err = cr.WaitFinish()
 	c.Check(err, IsNil)
 
-	c.Check(logs.Stdout.String()[39:], Equals, "Hello world\n")
+	c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
 	c.Check(logs.Stderr.String(), Equals, "")
 }
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index 52e343b..d5042be 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -3,6 +3,8 @@ package main
 import (
 	"bufio"
 	"bytes"
+	"errors"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"io"
 	"log"
@@ -10,6 +12,8 @@ import (
 	"time"
 )
 
+type Timestamper func(t time.Time) string
+
 type ThrottledLogger struct {
 	*log.Logger
 	buf *bytes.Buffer
@@ -17,6 +21,11 @@ type ThrottledLogger struct {
 	writer      io.WriteCloser
 	stop        bool
 	flusherDone chan bool
+	Timestamper
+}
+
+func RFC3339Timestamp(now time.Time) string {
+	return now.Format(time.RFC3339Nano)
 }
 
 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
@@ -25,8 +34,10 @@ func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
 		this.buf = &bytes.Buffer{}
 	}
 	defer this.Mutex.Unlock()
-	// XXX write timestamp + p
-	return this.buf.Write(p)
+
+	now := time.Now().UTC()
+	_, err = fmt.Fprintf(this.buf, "%s %s", this.Timestamper(now), p)
+	return len(p), err
 }
 
 func (this *ThrottledLogger) Stop() {
@@ -66,7 +77,7 @@ func (this *ThrottledLogger) flusher() {
 }
 
 const (
-	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+	MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
 )
 
 func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
@@ -99,29 +110,40 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 	alw.flusherDone = make(chan bool)
 	alw.writer = writer
 	alw.Logger = log.New(alw, "", 0)
+	alw.Timestamper = RFC3339Timestamp
 	go alw.flusher()
 	return alw
 }
 
 type ArvLogWriter struct {
-	*ContainerRunner
-	io.WriteCloser
+	Api           IArvadosClient
+	Uuid          string
 	loggingStream string
+	io.WriteCloser
 }
 
 func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+	var err1 error
+	if this.WriteCloser != nil {
+		_, err1 = this.WriteCloser.Write(p)
+	}
+
 	// write to API
-	lr := arvadosclient.Dict{"object_uuid": this.ContainerRecord.Uuid,
+	lr := arvadosclient.Dict{"object_uuid": this.Uuid,
 		"event_type": this.loggingStream,
 		"properties": map[string]string{"text": string(p)}}
-	err = this.Api.Create("logs", lr, nil)
+	err2 := this.Api.Create("logs", lr, nil)
 
-	// write to Keep
-	err = this.WriteCloser.Write(p)
+	if err1 != nil || err2 != nil {
+		return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
+	} else {
+		return len(p), nil
+	}
 
-	return len(p), nil
 }
 
 func (this *ArvLogWriter) Close() (err error) {
-	this.WriteCloser.Close()
+	err = this.WriteCloser.Close()
+	this.WriteCloser = nil
+	return err
 }
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
new file mode 100644
index 0000000..bac36f5
--- /dev/null
+++ b/services/crunch-exec/logging_test.go
@@ -0,0 +1,144 @@
+package main
+
+import (
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	. "gopkg.in/check.v1"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func Test2(t *testing.T) {
+	TestingT(t)
+}
+
+type LoggingTestSuite struct{}
+
+type LoggingArvTestClient struct {
+	Total   int64
+	Calls   int
+	Content arvadosclient.Dict
+}
+
+func (this *LoggingArvTestClient) Create(resourceType string,
+	parameters arvadosclient.Dict,
+	output interface{}) error {
+
+	this.Calls += 1
+	this.Content = parameters
+	return nil
+}
+
+func (this *LoggingArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return nil
+}
+
+func (this *LoggingArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+type LoggingKeepTestClient struct {
+	Content []byte
+}
+
+func (this *LoggingKeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	this.Content = buf
+	return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+}
+
+func (this *LoggingKeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	return nil, nil
+}
+
+type TestTimestamper struct {
+	count int
+}
+
+func (this *TestTimestamper) Timestamp(t time.Time) string {
+	this.count += 1
+	return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&LoggingTestSuite{})
+
+func (s *LoggingTestSuite) TestWriteLogs(c *C) {
+	api := &LoggingArvTestClient{}
+	kc := &LoggingKeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	cr.CrunchLog.Print("Goodbye")
+	cr.CrunchLog.Stop()
+
+	c.Check(api.Calls, Equals, 1)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunchexec.txt\n")
+
+	logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+		"2015-12-29T15:51:45.000000002Z Goodbye\n"
+
+	c.Check(api.Content["event_type"], Equals, "crunchexec")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext)
+	c.Check(string(kc.Content), Equals, logtext)
+}
+
+func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
+	api := &LoggingArvTestClient{}
+	kc := &LoggingKeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	for i := 0; i < 2000000; i += 1 {
+		cr.CrunchLog.Printf("Hello %d", i)
+	}
+	cr.CrunchLog.Print("Goodbye")
+	cr.CrunchLog.Stop()
+
+	c.Check(api.Calls > 1, Equals, true)
+	c.Check(api.Calls < 2000000, Equals, true)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunchexec.txt\n")
+}
+
+func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
+	api := &LoggingArvTestClient{}
+	kc := &LoggingKeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	ts := &TestTimestamper{}
+	cr.CrunchLog.Timestamper = ts.Timestamp
+	stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+	stdout.Timestamper = ts.Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	stdout.Print("Doing stuff")
+	cr.CrunchLog.Print("Goodbye")
+	stdout.Print("Blurb")
+
+	cr.CrunchLog.Stop()
+	logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+		"2015-12-29T15:51:45.000000003Z Goodbye\n"
+	c.Check(api.Content["event_type"], Equals, "crunchexec")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
+
+	stdout.Stop()
+	logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
+		"2015-12-29T15:51:45.000000004Z Blurb\n"
+	c.Check(api.Content["event_type"], Equals, "stdout")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext2)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ""+
+		". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunchexec.txt\n"+
+		". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+
+}
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
index 44c7213..daf45bb 100644
--- a/services/crunch-exec/upload.go
+++ b/services/crunch-exec/upload.go
@@ -8,10 +8,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"io"
-	"log"
-	"os"
-	"path/filepath"
-	"sort"
 	"strings"
 )
 
@@ -20,26 +16,23 @@ type Block struct {
 	offset int64
 }
 
-type ManifestFileWriter struct {
+type CollectionFileWriter struct {
 	IKeepClient
 	*manifest.ManifestStream
-	offset int64
+	offset uint64
+	length uint64
 	*Block
 	uploader chan *Block
 	finish   chan []error
 	fn       string
 }
 
-type IKeepClient interface {
-	PutHB(hash string, buf []byte) (string, int, error)
-}
-
-func (m *ManifestFileWriter) Write(p []byte) (int, error) {
+func (m *CollectionFileWriter) Write(p []byte) (int, error) {
 	n, err := m.ReadFrom(bytes.NewReader(p))
 	return int(n), err
 }
 
-func (m *ManifestFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
+func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 	var total int64
 	var count int
 
@@ -56,6 +49,8 @@ func (m *ManifestFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 		}
 	}
 
+	m.length += uint64(total)
+
 	if err == io.EOF {
 		return total, nil
 	} else {
@@ -63,13 +58,13 @@ func (m *ManifestFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
 	}
 }
 
-func (m *ManifestFileWriter) Close() error {
+func (m *CollectionFileWriter) Close() error {
 	m.ManifestStream.FileTokens = append(m.ManifestStream.FileTokens,
-		manifest.FileToken{0, m.offset, m.fn})
+		manifest.FileToken{m.offset, m.length, m.fn})
 	return nil
 }
 
-func (m *ManifestFileWriter) goUpload() {
+func (m *CollectionFileWriter) goUpload() {
 	var errors []error
 	uploader := m.uploader
 	finish := m.finish
@@ -85,12 +80,12 @@ func (m *ManifestFileWriter) goUpload() {
 	finish <- errors
 }
 
-type ManifestWriter struct {
+type CollectionWriter struct {
 	IKeepClient
-	Streams []*ManifestFileWriter
+	Streams []*CollectionFileWriter
 }
 
-func (m *ManifestWriter) Open(path string) io.WriteCloser {
+func (m *CollectionWriter) Open(path string) io.WriteCloser {
 	var dir string
 	var fn string
 
@@ -103,10 +98,11 @@ func (m *ManifestWriter) Open(path string) io.WriteCloser {
 		fn = path
 	}
 
-	fw := &ManifestFileWriter{
+	fw := &CollectionFileWriter{
 		m.IKeepClient,
 		&manifest.ManifestStream{StreamName: dir},
 		0,
+		0,
 		nil,
 		make(chan *Block),
 		make(chan []error),
@@ -118,7 +114,7 @@ func (m *ManifestWriter) Open(path string) io.WriteCloser {
 	return fw
 }
 
-func (m *ManifestWriter) Finish() error {
+func (m *CollectionWriter) Finish() error {
 	var errstring string
 	for _, stream := range m.Streams {
 		if stream.uploader == nil {
@@ -145,7 +141,7 @@ func (m *ManifestWriter) Finish() error {
 	}
 }
 
-func (m *ManifestWriter) ManifestText() (mt string, err error) {
+func (m *CollectionWriter) ManifestText() (mt string, err error) {
 	err = m.Finish()
 	if err != nil {
 		return "", err
@@ -168,9 +164,9 @@ func (m *ManifestWriter) ManifestText() (mt string, err error) {
 		}
 		for _, f := range v.FileTokens {
 			buf.WriteString(" ")
-			f = strings.Replace(f, " ", "\\040", -1)
-			f = strings.Replace(f, "\n", "", -1)
-			buf.WriteString(f)
+			name := strings.Replace(f.Name, " ", "\\040", -1)
+			name = strings.Replace(name, "\n", "", -1)
+			buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
 		}
 		buf.WriteString("\n")
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list