[ARVADOS] updated: 680525ea705aeb24a82699c8c57dbc26c797a775

git at public.curoverse.com git at public.curoverse.com
Tue Dec 29 16:44:00 EST 2015


Summary of changes:
 services/crunch-exec/crunchexec.go      | 168 +++++++++++++-----
 services/crunch-exec/crunchexec_test.go | 292 +++++++++++++++++++++++++++++++-
 services/crunch-exec/logging.go         |  16 +-
 services/crunch-exec/logging_test.go    |  52 +-----
 4 files changed, 432 insertions(+), 96 deletions(-)

       via  680525ea705aeb24a82699c8c57dbc26c797a775 (commit)
      from  abbee001eff213d194978a67ee745402ab7ccc45 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 680525ea705aeb24a82699c8c57dbc26c797a775
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 29 16:43:56 2015 -0500

    7816: Full run tests against synthetic container records.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 525fa4c..8bc4d8c 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 )
 
@@ -35,7 +36,7 @@ type Collection struct {
 type ContainerRecord struct {
 	Uuid               string            `json:"uuid"`
 	Command            []string          `json:"command"`
-	ContainerImage     string            `json:"container_image`
+	ContainerImage     string            `json:"container_image"`
 	Cwd                string            `json:"cwd"`
 	Environment        map[string]string `json:"environment"`
 	Mounts             map[string]Mount  `json:"mounts"`
@@ -54,33 +55,44 @@ type ContainerRunner struct {
 	ContainerRecord
 	dockerclient.ContainerConfig
 	ContainerId string
-	ExitCode    int
+	ExitCode    *int
 	NewLogWriter
 	loggingDone   chan bool
 	CrunchLog     *ThrottledLogger
 	Stdout        *ThrottledLogger
 	Stderr        *ThrottledLogger
 	LogCollection *CollectionWriter
-	LogsPDH       string
+	LogsPDH       *string
+	CancelLock    sync.Mutex
+	Cancelled     bool
+	SigChan       chan os.Signal
 }
 
-func (this *ContainerRunner) setupMonitoring() error {
-	sigChan := make(chan os.Signal, 1)
-	signal.Notify(sigChan, syscall.SIGTERM)
-	signal.Notify(sigChan, syscall.SIGINT)
-	signal.Notify(sigChan, syscall.SIGQUIT)
+func (this *ContainerRunner) SetupSignals() error {
+	this.SigChan = make(chan os.Signal, 1)
+	signal.Notify(this.SigChan, syscall.SIGTERM)
+	signal.Notify(this.SigChan, syscall.SIGINT)
+	signal.Notify(this.SigChan, syscall.SIGQUIT)
 
 	go func(sig <-chan os.Signal) {
-		//for sig := range sig {
-		//}
-	}(sigChan)
+		for _ = range sig {
+			if !this.Cancelled {
+				this.CancelLock.Lock()
+				this.Cancelled = true
+				if this.ContainerId != "" {
+					this.Docker.StopContainer(this.ContainerId, 10)
+				}
+				this.CancelLock.Unlock()
+			}
+		}
+	}(this.SigChan)
 
 	return nil
 }
 
 func (this *ContainerRunner) LoadImage() (err error) {
 
-	this.CrunchLog.Print("Fetching Docker image from collection ", this.ContainerRecord.ContainerImage)
+	this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
 
 	var collection Collection
 	err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
@@ -97,7 +109,7 @@ func (this *ContainerRunner) LoadImage() (err error) {
 		imageId = img[:len(img)-4]
 	}
 
-	this.CrunchLog.Print("Using Docker image id ", imageId)
+	this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
 
 	_, err = this.Docker.InspectImage(imageId)
 	if err != nil {
@@ -114,7 +126,7 @@ func (this *ContainerRunner) LoadImage() (err error) {
 			return err
 		}
 	} else {
-		this.CrunchLog.Print("Docker image is already available on host")
+		this.CrunchLog.Print("Docker image is available")
 	}
 
 	this.ContainerConfig.Image = imageId
@@ -125,14 +137,27 @@ func (this *ContainerRunner) LoadImage() (err error) {
 func (this *ContainerRunner) StartContainer() (err error) {
 	this.CrunchLog.Print("Creating Docker container")
 
+	this.CancelLock.Lock()
+	defer this.CancelLock.Unlock()
+
+	if this.Cancelled {
+		return errors.New("Cancelled")
+	}
+
 	this.ContainerConfig.Cmd = this.ContainerRecord.Command
+	if this.ContainerRecord.Cwd != "." {
+		this.ContainerConfig.WorkingDir = this.ContainerRecord.Cwd
+	}
+	for k, v := range this.ContainerRecord.Environment {
+		this.ContainerConfig.Env = append(this.ContainerConfig.Env, k+"="+v)
+	}
 	this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
 	if err != nil {
 		return
 	}
 	hostConfig := &dockerclient.HostConfig{}
 
-	this.CrunchLog.Print("Starting Docker container id ", this.ContainerId)
+	this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
 	err = this.Docker.StartContainer(this.ContainerId, hostConfig)
 	if err != nil {
 		return
@@ -158,7 +183,7 @@ func (this *ContainerRunner) AttachLogs() (err error) {
 	this.loggingDone = make(chan bool)
 
 	this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
-	this.Stderr = NewThrottledLogger(this.NewLogWriter("stdout"))
+	this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
 	go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
 	go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
 
@@ -171,7 +196,7 @@ func (this *ContainerRunner) WaitFinish() error {
 	if wr.Error != nil {
 		return wr.Error
 	}
-	this.ExitCode = wr.ExitCode
+	this.ExitCode = &wr.ExitCode
 
 	// drain stdout/stderr
 	<-this.loggingDone
@@ -184,32 +209,56 @@ func (this *ContainerRunner) WaitFinish() error {
 }
 
 func (this *ContainerRunner) CommitLogs() error {
+	if this.Cancelled {
+		this.CrunchLog.Print("Cancelled")
+	} else {
+		this.CrunchLog.Print("Complete")
+	}
+
 	this.CrunchLog.Stop()
 	this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
 		"crunchexec", nil})
 
 	mt, err := this.LogCollection.ManifestText()
 	if err != nil {
-		this.CrunchLog.Print(err)
 		return err
 	}
 
-	var response map[string]string
+	response := make(map[string]string)
 	err = this.Api.Create("collections",
 		arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
-			"manifest_text": mt}, &response)
+			"manifest_text": mt},
+		response)
 	if err != nil {
-		this.CrunchLog.Print(err)
 		return err
 	}
 
-	this.LogsPDH = response["portable_data_hash"]
+	this.LogsPDH = new(string)
+	*this.LogsPDH = response["portable_data_hash"]
 
 	return nil
 }
 
-func (this *ContainerRunner) UpdateContainerRecord() error {
-	return nil
+func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+	update := arvadosclient.Dict{"state": "Running"}
+	return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+}
+
+func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+	update := arvadosclient.Dict{}
+	if this.LogsPDH != nil {
+		update["log"] = *this.LogsPDH
+	}
+	if this.ExitCode != nil {
+		update["exit_code"] = *this.ExitCode
+	}
+
+	if this.Cancelled {
+		update["state"] = "Cancelled"
+	} else {
+		update["state"] = "Complete"
+	}
+	return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
 }
 
 func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
@@ -217,45 +266,82 @@ func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 }
 
 func (this *ContainerRunner) Run(containerUuid string) (err error) {
+	this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+
+	var runerr, waiterr error
+
+	defer func() {
+		if err != nil {
+			this.CrunchLog.Print(err)
+		}
+
+		// (6) write logs
+		logerr := this.CommitLogs()
+		if logerr != nil {
+			this.CrunchLog.Print(logerr)
+		}
+
+		// (7) update container record with results
+		updateerr := this.UpdateContainerRecordComplete()
+		if updateerr != nil {
+			this.CrunchLog.Print(updateerr)
+		}
+
+		this.CrunchLog.Stop()
+
+		if err == nil {
+			if runerr != nil {
+				err = runerr
+			} else if waiterr != nil {
+				err = runerr
+			} else if logerr != nil {
+				err = logerr
+			} else if updateerr != nil {
+				err = updateerr
+			}
+		}
+	}()
 
 	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
 	if err != nil {
-		this.CrunchLog.Print(err)
 		return
 	}
 
-	// (0) start event monitoring goroutines
-	err = this.setupMonitoring()
+	// (0) setup signal handling
+	err = this.SetupSignals()
 	if err != nil {
-		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (1) check for and/or load image
 	err = this.LoadImage()
 	if err != nil {
-		this.CrunchLog.Print(err)
 		return
 	}
 
 	// (2) start container
 	err = this.StartContainer()
 	if err != nil {
-		this.CrunchLog.Print(err)
+		if err.Error() == "Cancelled" {
+			err = nil
+		}
 		return
 	}
 
-	// (3) attach container logs
-	err = this.AttachLogs()
-
-	// (4) wait for container to finish
-	err = this.WaitFinish()
+	// (3) update container record state
+	err = this.UpdateContainerRecordRunning()
+	if err != nil {
+		this.CrunchLog.Print(err)
+	}
 
-	// (5) write logs
-	err = this.CommitLogs()
+	// (4) attach container logs
+	runerr = this.AttachLogs()
+	if runerr != nil {
+		this.CrunchLog.Print(runerr)
+	}
 
-	// (6) update container record with results
-	this.UpdateContainerRecord()
+	// (5) wait for container to finish
+	waiterr = this.WaitFinish()
 
 	return
 }
@@ -278,12 +364,14 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
+	api.Retries = 8
 
-	var kc IKeepClient
+	var kc *keepclient.KeepClient
 	kc, err = keepclient.MakeKeepClient(&api)
 	if err != nil {
 		log.Fatal(err)
 	}
+	kc.Retries = 4
 
 	var docker *dockerclient.DockerClient
 	docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 9112ebb..bab299f 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -2,7 +2,10 @@ package main
 
 import (
 	"bytes"
+	"crypto/md5"
+	"encoding/json"
 	"errors"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -11,7 +14,9 @@ import (
 	"io"
 	"os"
 	"strings"
+	"syscall"
 	"testing"
+	"time"
 )
 
 // Gocheck boilerplate
@@ -25,10 +30,17 @@ type TestSuite struct{}
 var _ = Suite(&TestSuite{})
 
 type ArvTestClient struct {
+	Total   int64
+	Calls   int
+	Content arvadosclient.Dict
+	ContainerRecord
+	Logs          map[string]*bytes.Buffer
+	WasSetRunning bool
 }
 
 type KeepTestClient struct {
-	Called bool
+	Called  bool
+	Content []byte
 }
 
 var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5 at 5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
@@ -38,13 +50,34 @@ var busyboxImageId = "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a4
 var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
 var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
 
-func (this ArvTestClient) Create(resourceType string,
+func (this *ArvTestClient) Create(resourceType string,
 	parameters arvadosclient.Dict,
 	output interface{}) error {
+
+	this.Calls += 1
+	this.Content = parameters
+
+	if resourceType == "logs" {
+		et := parameters["event_type"].(string)
+		if this.Logs == nil {
+			this.Logs = make(map[string]*bytes.Buffer)
+		}
+		if this.Logs[et] == nil {
+			this.Logs[et] = &bytes.Buffer{}
+		}
+		this.Logs[et].Write([]byte(parameters["properties"].(map[string]string)["text"]))
+	}
+
+	if resourceType == "collections" && output != nil {
+		mt := parameters["manifest_text"].(string)
+		outmap := output.(map[string]string)
+		outmap["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+	}
+
 	return nil
 }
 
-func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
 	if resourceType == "collections" {
 		if uuid == busyboxPDH {
 			output.(*Collection).ManifestText = busyboxManifest
@@ -52,15 +85,27 @@ func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvad
 			output.(*Collection).ManifestText = otherManifest
 		}
 	}
+	if resourceType == "containers" {
+		(*output.(*ContainerRecord)) = this.ContainerRecord
+	}
 	return nil
 }
 
-func (this ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+
+	this.Content = parameters
+	if resourceType == "containers" {
+		if parameters["state"] == "Running" {
+			this.WasSetRunning = true
+		}
+
+	}
 	return nil
 }
 
 func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
-	return "", 0, nil
+	this.Content = buf
+	return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
 }
 
 type FileWrapper struct {
@@ -84,7 +129,7 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
 func (s *TestSuite) TestLoadImage(c *C) {
 	kc := &KeepTestClient{}
 	docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr := NewContainerRunner(ArvTestClient{}, kc, docker)
+	cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
 
 	_, err = cr.Docker.RemoveImage(busyboxImageId, true)
 
@@ -236,7 +281,7 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
 
 func (s *TestSuite) TestRunContainer(c *C) {
 	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
-	cr := NewContainerRunner(ArvTestClient{}, &KeepTestClient{}, docker)
+	cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
 
 	var logs TestLogs
 	cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -257,3 +302,236 @@ func (s *TestSuite) TestRunContainer(c *C) {
 	c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
 	c.Check(logs.Stderr.String(), Equals, "")
 }
+
+func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	cr.CrunchLog.Print("Goodbye")
+
+	err := cr.CommitLogs()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunchexec.txt\n")
+	c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+	err := cr.UpdateContainerRecordRunning()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["state"], Equals, "Running")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+	cr.LogsPDH = new(string)
+	*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
+
+	cr.ExitCode = new(int)
+	*cr.ExitCode = 42
+
+	err := cr.UpdateContainerRecordComplete()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], Equals, *cr.LogsPDH)
+	c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
+	c.Check(api.Content["state"], Equals, "Complete")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.Cancelled = true
+
+	err := cr.UpdateContainerRecordComplete()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], IsNil)
+	c.Check(api.Content["exit_code"], IsNil)
+	c.Check(api.Content["state"], Equals, "Cancelled")
+}
+
+func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+	rec := ContainerRecord{}
+	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	c.Check(err, IsNil)
+
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker.RemoveImage(busyboxImageId, true)
+
+	api = &ArvTestClient{ContainerRecord: rec}
+	cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+
+	err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	c.Check(err, IsNil)
+	c.Check(api.WasSetRunning, Equals, true)
+
+	c.Check(api.Content["log"], NotNil)
+
+	if err != nil {
+		for k, v := range api.Logs {
+			c.Log(k)
+			c.Log(v.String())
+		}
+	}
+
+	return
+}
+
+func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["log"], NotNil)
+	c.Check(api.Content["exit_code"], Equals, 1)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
+	c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["pwd"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["pwd"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestCancel(c *C) {
+	record := `{
+    "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`
+
+	rec := ContainerRecord{}
+	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	c.Check(err, IsNil)
+
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker.RemoveImage(busyboxImageId, true)
+
+	api := &ArvTestClient{ContainerRecord: rec}
+	cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+
+	go func() {
+		for cr.ContainerId == "" {
+			time.Sleep(1 * time.Second)
+		}
+		cr.SigChan <- syscall.SIGINT
+	}()
+
+	err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], NotNil)
+
+	if err != nil {
+		for k, v := range api.Logs {
+			c.Log(k)
+			c.Log(v.String())
+		}
+	}
+
+	c.Check(api.Content["state"], Equals, "Cancelled")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {"FROBIZ": "bilbo"},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
+}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index d5042be..01b6548 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -25,7 +25,15 @@ type ThrottledLogger struct {
 }
 
 func RFC3339Timestamp(now time.Time) string {
-	return now.Format(time.RFC3339Nano)
+	// return now.Format(time.RFC3339Nano)
+	// Builtin RFC3339Nano format isn't fixed width so
+	// provide our own.
+
+	return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
+		now.Year(), now.Month(), now.Day(),
+		now.Hour(), now.Minute(), now.Second(),
+		now.Nanosecond())
+
 }
 
 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
@@ -143,7 +151,9 @@ func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
 }
 
 func (this *ArvLogWriter) Close() (err error) {
-	err = this.WriteCloser.Close()
-	this.WriteCloser = nil
+	if this.WriteCloser != nil {
+		err = this.WriteCloser.Close()
+		this.WriteCloser = nil
+	}
 	return err
 }
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index bac36f5..27d066a 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -2,9 +2,6 @@ package main
 
 import (
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	. "gopkg.in/check.v1"
 	"testing"
 	"time"
@@ -17,42 +14,6 @@ func Test2(t *testing.T) {
 
 type LoggingTestSuite struct{}
 
-type LoggingArvTestClient struct {
-	Total   int64
-	Calls   int
-	Content arvadosclient.Dict
-}
-
-func (this *LoggingArvTestClient) Create(resourceType string,
-	parameters arvadosclient.Dict,
-	output interface{}) error {
-
-	this.Calls += 1
-	this.Content = parameters
-	return nil
-}
-
-func (this *LoggingArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
-	return nil
-}
-
-func (this *LoggingArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
-	return nil
-}
-
-type LoggingKeepTestClient struct {
-	Content []byte
-}
-
-func (this *LoggingKeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
-	this.Content = buf
-	return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
-}
-
-func (this *LoggingKeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
-	return nil, nil
-}
-
 type TestTimestamper struct {
 	count int
 }
@@ -66,8 +27,8 @@ func (this *TestTimestamper) Timestamp(t time.Time) string {
 var _ = Suite(&LoggingTestSuite{})
 
 func (s *LoggingTestSuite) TestWriteLogs(c *C) {
-	api := &LoggingArvTestClient{}
-	kc := &LoggingKeepTestClient{}
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
 	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
@@ -90,8 +51,8 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
 }
 
 func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
-	api := &LoggingArvTestClient{}
-	kc := &LoggingKeepTestClient{}
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
 	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
@@ -110,8 +71,8 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
 }
 
 func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
-	api := &LoggingArvTestClient{}
-	kc := &LoggingKeepTestClient{}
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
 	ts := &TestTimestamper{}
 	cr.CrunchLog.Timestamper = ts.Timestamp
@@ -140,5 +101,4 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
 	c.Check(mt, Equals, ""+
 		". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunchexec.txt\n"+
 		". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
-
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list