[ARVADOS] updated: a89b2b253f5afb3866e7c9b4d60aa6df8a30d10e

git at public.curoverse.com git at public.curoverse.com
Mon Dec 28 00:09:44 EST 2015


Summary of changes:
 services/crunch-exec/crunchexec.go      | 101 +++++++++++++---
 services/crunch-exec/crunchexec_test.go | 207 +++++++++++++++++++++++++++++---
 2 files changed, 273 insertions(+), 35 deletions(-)

       via  a89b2b253f5afb3866e7c9b4d60aa6df8a30d10e (commit)
      from  af4342c93873955249ce8f775cb77cd1454b06bc (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a89b2b253f5afb3866e7c9b4d60aa6df8a30d10e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 28 00:09:41 2015 -0500

    7816: Successfully runs container and gets logs from docker daemon, still needs
    to put those logs in the right place.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 1c9c5dd..b3f6051 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -12,6 +12,7 @@ import (
 	"log"
 	"os"
 	//"os/exec"
+	"bufio"
 	"os/signal"
 	"strings"
 	"syscall"
@@ -34,7 +35,7 @@ type Collection struct {
 	ManifestText string `json:"manifest_text"`
 }
 
-type Container struct {
+type ContainerRecord struct {
 	Uuid               string            `json:"uuid"`
 	Command            []string          `json:"command"`
 	ContainerImage     string            `json:"container_image`
@@ -52,12 +53,18 @@ func dockerEvent(event *dockerclient.Event, ec chan error, args ...interface{})
 	log.Printf("Received event: %#v\n", *event)
 }
 
+type NewLogWriter func(logstr string) io.Writer
+
 type ContainerRunner struct {
 	Docker *dockerclient.DockerClient
 	Api    IArvadosClient
 	Kc     IKeepClient
-	Container
+	ContainerRecord
 	dockerclient.ContainerConfig
+	ContainerId string
+	ExitCode    int
+	NewLogWriter
+	FinishChan chan bool
 }
 
 func (this *ContainerRunner) setupMonitoring() error {
@@ -78,7 +85,7 @@ func (this *ContainerRunner) setupMonitoring() error {
 
 func (this *ContainerRunner) LoadImage() (err error) {
 	var collection Collection
-	err = this.Api.Get("collections", this.Container.ContainerImage, nil, &collection)
+	err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
 	if err != nil {
 		return err
 	}
@@ -89,7 +96,7 @@ func (this *ContainerRunner) LoadImage() (err error) {
 		if !strings.HasSuffix(img, ".tar") {
 			return errors.New("First file in the collection does not end in .tar")
 		}
-		imageId = img[len(img)-4:]
+		imageId = img[:len(img)-4]
 	}
 
 	_, err = this.Docker.InspectImage(imageId)
@@ -111,25 +118,89 @@ func (this *ContainerRunner) LoadImage() (err error) {
 	return nil
 }
 
-func (this *ContainerRunner) startContainer() error {
-	containerId, err := this.Docker.CreateContainer(&this.ContainerConfig, "foobar", nil)
+func (this *ContainerRunner) StartContainer() (err error) {
+	this.ContainerConfig.Cmd = this.ContainerRecord.Command
+	this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
 	if err != nil {
-		return err
+		return
 	}
 	hostConfig := &dockerclient.HostConfig{}
-	err = this.Docker.StartContainer(containerId, hostConfig)
+	err = this.Docker.StartContainer(this.ContainerId, hostConfig)
 	if err != nil {
-		return err
+		return
 	}
 
 	return nil
 }
 
-func (this *ContainerRunner) getLogs() error {
+type ArvLoggingWriter struct {
+	Api IArvadosClient
+	Kc  IKeepClient
+}
+
+func (this ArvLoggingWriter) Write(p []byte) (n int, err error) {
+	return len(p), nil
+}
+
+const (
+	MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
+func CopyLog(in io.Reader, out *log.Logger, done chan<- bool) {
+	reader := bufio.NewReaderSize(in, MaxLogLine)
+	var prefix string
+	for {
+		line, isPrefix, err := reader.ReadLine()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			out.Fatal("error reading container log:", err)
+		}
+		var suffix string
+		if isPrefix {
+			suffix = "[...]"
+		}
+		out.Print(prefix, string(line), suffix)
+		// Set up prefix for following line
+		if isPrefix {
+			prefix = "[...]"
+		} else {
+			prefix = ""
+		}
+	}
+	done <- true
+}
+
+func (this *ContainerRunner) GetLogs() (err error) {
+	var stderrReader, stdoutReader io.Reader
+	stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true, Timestamps: true})
+	if err != nil {
+		return
+	}
+	stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true, Timestamps: true})
+	if err != nil {
+		return
+	}
+
+	this.FinishChan = make(chan bool)
+
+	go CopyLog(stderrReader, log.New(this.NewLogWriter("stderr"), "", 0), this.FinishChan)
+	go CopyLog(stdoutReader, log.New(this.NewLogWriter("stdout"), "", 0), this.FinishChan)
+
 	return nil
 }
 
-func (this *ContainerRunner) waitFinish() error {
+func (this *ContainerRunner) WaitFinish() error {
+	result := this.Docker.Wait(this.ContainerId)
+	wr := <-result
+	if wr.Error != nil {
+		return wr.Error
+	}
+	this.ExitCode = wr.ExitCode
+
+	<-this.FinishChan
+	<-this.FinishChan
+
 	return nil
 }
 
@@ -143,7 +214,7 @@ func (this *ContainerRunner) updateContainer() error {
 
 func (this *ContainerRunner) Run(containerUuid string) (err error) {
 
-	err = this.Api.Get("containers", containerUuid, nil, &this.Container)
+	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
 	if err != nil {
 		return
 	}
@@ -161,16 +232,16 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 	}
 
 	// (2) start container
-	err = this.startContainer()
+	err = this.StartContainer()
 	if err != nil {
 		return
 	}
 
 	// (3) attach container logs
-	err = this.getLogs()
+	err = this.GetLogs()
 
 	// (4) wait for container to finish
-	err = this.waitFinish()
+	err = this.WaitFinish()
 
 	// (5) write logs
 	err = this.writeLogs()
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index c2cb506..c24a74e 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -1,18 +1,16 @@
 package main
 
 import (
+	"bytes"
+	"errors"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"github.com/curoverse/dockerclient"
 	. "gopkg.in/check.v1"
 	"io"
-	//"io/ioutil"
-	//"log"
 	"os"
-	//"syscall"
 	"testing"
-	//"time"
 )
 
 // Gocheck boilerplate
@@ -26,31 +24,31 @@ type TestSuite struct{}
 var _ = Suite(&TestSuite{})
 
 type ArvTestClient struct {
-	c        *C
-	manifest string
-	success  bool
 }
 
 type KeepTestClient struct {
-	c        *C
-	manifest string
-	success  bool
+	Called bool
 }
 
+var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5@5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
+var busyboxPDH = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+var busyboxImageId = "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7"
+
+var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d@5693216f 0:46:md5sum.txt\n"
+var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
+
 func (this ArvTestClient) Create(resourceType string,
 	parameters arvadosclient.Dict,
 	output interface{}) error {
 	return nil
 }
 
-var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5@5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
-var busyboxPDH = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
-
 func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
 	if resourceType == "collections" {
 		if uuid == busyboxPDH {
 			output.(*Collection).ManifestText = busyboxManifest
-
+		} else if uuid == otherPDH {
+			output.(*Collection).ManifestText = otherManifest
 		}
 	}
 	return nil
@@ -60,7 +58,7 @@ func (this ArvTestClient) Update(resourceType string, uuid string, parameters ar
 	return nil
 }
 
-func (this KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
 	return "", 0, nil
 }
 
@@ -73,9 +71,10 @@ func (this FileWrapper) Len() uint64 {
 	return this.len
 }
 
-func (this KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
 	if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
 		rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+		this.Called = true
 		return FileWrapper{rdr, 1321984}, err
 	}
 	return nil, nil
@@ -84,8 +83,176 @@ func (this KeepTestClient) ManifestFileReader(m manifest.Manifest, filename stri
 func (s *TestSuite) TestLoadImage(c *C) {
 	cr := ContainerRunner{}
 	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr.Api = ArvTestClient{manifest: busyboxManifest}
-	cr.Kc = KeepTestClient{}
-	cr.Container.ContainerImage = busyboxPDH
-	cr.LoadImage()
+
+	var err error
+	_, err = cr.Docker.RemoveImage(busyboxImageId, true)
+
+	_, err = cr.Docker.InspectImage(busyboxImageId)
+	c.Check(err, NotNil)
+
+	kc := &KeepTestClient{}
+	cr.Api = ArvTestClient{}
+	cr.Kc = kc
+	cr.ContainerRecord.ContainerImage = busyboxPDH
+
+	// (1) Test loading image from keep
+	c.Check(kc.Called, Equals, false)
+	c.Check(cr.ContainerConfig.Image, Equals, "")
+
+	err = cr.LoadImage()
+
+	c.Check(err, IsNil)
+	defer func() {
+		cr.Docker.RemoveImage(busyboxImageId, true)
+	}()
+
+	c.Check(kc.Called, Equals, true)
+	c.Check(cr.ContainerConfig.Image, Equals, busyboxImageId)
+
+	_, err = cr.Docker.InspectImage(busyboxImageId)
+	c.Check(err, IsNil)
+
+	// (2) Test using image that's already loaded
+	kc.Called = false
+	cr.ContainerConfig.Image = ""
+
+	err = cr.LoadImage()
+	c.Check(err, IsNil)
+	c.Check(kc.Called, Equals, false)
+	c.Check(cr.ContainerConfig.Image, Equals, busyboxImageId)
+
+}
+
+type ArvErrorTestClient struct{}
+type KeepErrorTestClient struct{}
+type KeepReadErrorTestClient struct{}
+
+func (this ArvErrorTestClient) Create(resourceType string,
+	parameters arvadosclient.Dict,
+	output interface{}) error {
+	return nil
+}
+
+func (this ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return errors.New("ArvError")
+}
+
+func (this ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	return nil, errors.New("KeepError")
+}
+
+func (this KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+type ErrorReader struct{}
+
+func (this ErrorReader) Read(p []byte) (n int, err error) {
+	return 0, errors.New("ErrorReader")
+}
+
+func (this ErrorReader) Close() error {
+	return nil
+}
+
+func (this ErrorReader) Len() uint64 {
+	return 0
+}
+
+func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	return ErrorReader{}, nil
+}
+
+func (s *TestSuite) TestLoadImageArvError(c *C) {
+	// (1) Arvados error
+	cr := ContainerRunner{}
+	cr.Api = ArvErrorTestClient{}
+	cr.Kc = &KeepTestClient{}
+	cr.ContainerRecord.ContainerImage = busyboxPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "ArvError")
+}
+
+func (s *TestSuite) TestLoadImageKeepError(c *C) {
+	// (2) Keep error
+	cr := ContainerRunner{}
+	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr.Api = &ArvTestClient{}
+	cr.Kc = KeepErrorTestClient{}
+	cr.ContainerRecord.ContainerImage = busyboxPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "KeepError")
+}
+
+func (s *TestSuite) TestLoadImageCollectionError(c *C) {
+	// (3) Collection doesn't contain image
+	cr := ContainerRunner{}
+	cr.Api = &ArvTestClient{}
+	cr.Kc = KeepErrorTestClient{}
+	cr.ContainerRecord.ContainerImage = otherPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "First file in the collection does not end in .tar")
+}
+
+func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
+	// (4) Error while reading the image data from Keep
+	cr := ContainerRunner{}
+	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr.Api = &ArvTestClient{}
+	cr.Kc = KeepReadErrorTestClient{}
+	cr.ContainerRecord.ContainerImage = busyboxPDH
+
+	err := cr.LoadImage()
+	c.Check(err, NotNil)
+}
+
+type TestLogs struct {
+	Stdout bytes.Buffer
+	Stderr bytes.Buffer
+}
+
+func (this *TestLogs) NewTestLoggingWriter(logstr string) io.Writer {
+	if logstr == "stdout" {
+		return &this.Stdout
+	}
+	if logstr == "stderr" {
+		return &this.Stderr
+	}
+	return nil
+}
+
+func (s *TestSuite) TestRunContainer(c *C) {
+	cr := ContainerRunner{}
+	cr.Docker, _ = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr.Api = ArvTestClient{}
+	cr.Kc = &KeepTestClient{}
+	var logs TestLogs
+	cr.NewLogWriter = logs.NewTestLoggingWriter
+	cr.ContainerRecord.ContainerImage = busyboxPDH
+	cr.ContainerRecord.Command = []string{"echo", "Hello world"}
+	err := cr.LoadImage()
+	c.Check(err, IsNil)
+
+	err = cr.StartContainer()
+	c.Check(err, IsNil)
+
+	err = cr.GetLogs()
+	c.Check(err, IsNil)
+
+	err = cr.WaitFinish()
+	c.Check(err, IsNil)
+
+	c.Check(logs.Stdout.String()[39:], Equals, "Hello world\n")
+	c.Check(logs.Stderr.String(), Equals, "")
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list