[ARVADOS] updated: 680525ea705aeb24a82699c8c57dbc26c797a775
git at public.curoverse.com
git at public.curoverse.com
Tue Dec 29 16:44:00 EST 2015
Summary of changes:
services/crunch-exec/crunchexec.go | 168 +++++++++++++-----
services/crunch-exec/crunchexec_test.go | 292 +++++++++++++++++++++++++++++++-
services/crunch-exec/logging.go | 16 +-
services/crunch-exec/logging_test.go | 52 +-----
4 files changed, 432 insertions(+), 96 deletions(-)
via 680525ea705aeb24a82699c8c57dbc26c797a775 (commit)
from abbee001eff213d194978a67ee745402ab7ccc45 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 680525ea705aeb24a82699c8c57dbc26c797a775
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 29 16:43:56 2015 -0500
7816: Full run tests against synthetic container records.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index 525fa4c..8bc4d8c 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -12,6 +12,7 @@ import (
"os"
"os/signal"
"strings"
+ "sync"
"syscall"
)
@@ -35,7 +36,7 @@ type Collection struct {
type ContainerRecord struct {
Uuid string `json:"uuid"`
Command []string `json:"command"`
- ContainerImage string `json:"container_image`
+ ContainerImage string `json:"container_image"`
Cwd string `json:"cwd"`
Environment map[string]string `json:"environment"`
Mounts map[string]Mount `json:"mounts"`
@@ -54,33 +55,44 @@ type ContainerRunner struct {
ContainerRecord
dockerclient.ContainerConfig
ContainerId string
- ExitCode int
+ ExitCode *int
NewLogWriter
loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout *ThrottledLogger
Stderr *ThrottledLogger
LogCollection *CollectionWriter
- LogsPDH string
+ LogsPDH *string
+ CancelLock sync.Mutex
+ Cancelled bool
+ SigChan chan os.Signal
}
-func (this *ContainerRunner) setupMonitoring() error {
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGTERM)
- signal.Notify(sigChan, syscall.SIGINT)
- signal.Notify(sigChan, syscall.SIGQUIT)
+func (this *ContainerRunner) SetupSignals() error {
+ this.SigChan = make(chan os.Signal, 1)
+ signal.Notify(this.SigChan, syscall.SIGTERM)
+ signal.Notify(this.SigChan, syscall.SIGINT)
+ signal.Notify(this.SigChan, syscall.SIGQUIT)
go func(sig <-chan os.Signal) {
- //for sig := range sig {
- //}
- }(sigChan)
+ for _ = range sig {
+ if !this.Cancelled {
+ this.CancelLock.Lock()
+ this.Cancelled = true
+ if this.ContainerId != "" {
+ this.Docker.StopContainer(this.ContainerId, 10)
+ }
+ this.CancelLock.Unlock()
+ }
+ }
+ }(this.SigChan)
return nil
}
func (this *ContainerRunner) LoadImage() (err error) {
- this.CrunchLog.Print("Fetching Docker image from collection ", this.ContainerRecord.ContainerImage)
+ this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
var collection Collection
err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
@@ -97,7 +109,7 @@ func (this *ContainerRunner) LoadImage() (err error) {
imageId = img[:len(img)-4]
}
- this.CrunchLog.Print("Using Docker image id ", imageId)
+ this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
_, err = this.Docker.InspectImage(imageId)
if err != nil {
@@ -114,7 +126,7 @@ func (this *ContainerRunner) LoadImage() (err error) {
return err
}
} else {
- this.CrunchLog.Print("Docker image is already available on host")
+ this.CrunchLog.Print("Docker image is available")
}
this.ContainerConfig.Image = imageId
@@ -125,14 +137,27 @@ func (this *ContainerRunner) LoadImage() (err error) {
func (this *ContainerRunner) StartContainer() (err error) {
this.CrunchLog.Print("Creating Docker container")
+ this.CancelLock.Lock()
+ defer this.CancelLock.Unlock()
+
+ if this.Cancelled {
+ return errors.New("Cancelled")
+ }
+
this.ContainerConfig.Cmd = this.ContainerRecord.Command
+ if this.ContainerRecord.Cwd != "." {
+ this.ContainerConfig.WorkingDir = this.ContainerRecord.Cwd
+ }
+ for k, v := range this.ContainerRecord.Environment {
+ this.ContainerConfig.Env = append(this.ContainerConfig.Env, k+"="+v)
+ }
this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
if err != nil {
return
}
hostConfig := &dockerclient.HostConfig{}
- this.CrunchLog.Print("Starting Docker container id ", this.ContainerId)
+ this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
err = this.Docker.StartContainer(this.ContainerId, hostConfig)
if err != nil {
return
@@ -158,7 +183,7 @@ func (this *ContainerRunner) AttachLogs() (err error) {
this.loggingDone = make(chan bool)
this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
- this.Stderr = NewThrottledLogger(this.NewLogWriter("stdout"))
+ this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
@@ -171,7 +196,7 @@ func (this *ContainerRunner) WaitFinish() error {
if wr.Error != nil {
return wr.Error
}
- this.ExitCode = wr.ExitCode
+ this.ExitCode = &wr.ExitCode
// drain stdout/stderr
<-this.loggingDone
@@ -184,32 +209,56 @@ func (this *ContainerRunner) WaitFinish() error {
}
func (this *ContainerRunner) CommitLogs() error {
+ if this.Cancelled {
+ this.CrunchLog.Print("Cancelled")
+ } else {
+ this.CrunchLog.Print("Complete")
+ }
+
this.CrunchLog.Stop()
this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
"crunchexec", nil})
mt, err := this.LogCollection.ManifestText()
if err != nil {
- this.CrunchLog.Print(err)
return err
}
- var response map[string]string
+ response := make(map[string]string)
err = this.Api.Create("collections",
arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
- "manifest_text": mt}, &response)
+ "manifest_text": mt},
+ response)
if err != nil {
- this.CrunchLog.Print(err)
return err
}
- this.LogsPDH = response["portable_data_hash"]
+ this.LogsPDH = new(string)
+ *this.LogsPDH = response["portable_data_hash"]
return nil
}
-func (this *ContainerRunner) UpdateContainerRecord() error {
- return nil
+func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+ update := arvadosclient.Dict{"state": "Running"}
+ return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+}
+
+func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+ update := arvadosclient.Dict{}
+ if this.LogsPDH != nil {
+ update["log"] = *this.LogsPDH
+ }
+ if this.ExitCode != nil {
+ update["exit_code"] = *this.ExitCode
+ }
+
+ if this.Cancelled {
+ update["state"] = "Cancelled"
+ } else {
+ update["state"] = "Complete"
+ }
+ return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
}
func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
@@ -217,45 +266,82 @@ func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
}
func (this *ContainerRunner) Run(containerUuid string) (err error) {
+ this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+
+ var runerr, waiterr error
+
+ defer func() {
+ if err != nil {
+ this.CrunchLog.Print(err)
+ }
+
+ // (6) write logs
+ logerr := this.CommitLogs()
+ if logerr != nil {
+ this.CrunchLog.Print(logerr)
+ }
+
+ // (7) update container record with results
+ updateerr := this.UpdateContainerRecordComplete()
+ if updateerr != nil {
+ this.CrunchLog.Print(updateerr)
+ }
+
+ this.CrunchLog.Stop()
+
+ if err == nil {
+ if runerr != nil {
+ err = runerr
+ } else if waiterr != nil {
+ err = runerr
+ } else if logerr != nil {
+ err = logerr
+ } else if updateerr != nil {
+ err = updateerr
+ }
+ }
+ }()
err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
if err != nil {
- this.CrunchLog.Print(err)
return
}
- // (0) start event monitoring goroutines
- err = this.setupMonitoring()
+ // (0) setup signal handling
+ err = this.SetupSignals()
if err != nil {
- this.CrunchLog.Print(err)
return
}
// (1) check for and/or load image
err = this.LoadImage()
if err != nil {
- this.CrunchLog.Print(err)
return
}
// (2) start container
err = this.StartContainer()
if err != nil {
- this.CrunchLog.Print(err)
+ if err.Error() == "Cancelled" {
+ err = nil
+ }
return
}
- // (3) attach container logs
- err = this.AttachLogs()
-
- // (4) wait for container to finish
- err = this.WaitFinish()
+ // (3) update container record state
+ err = this.UpdateContainerRecordRunning()
+ if err != nil {
+ this.CrunchLog.Print(err)
+ }
- // (5) write logs
- err = this.CommitLogs()
+ // (4) attach container logs
+ runerr = this.AttachLogs()
+ if runerr != nil {
+ this.CrunchLog.Print(runerr)
+ }
- // (6) update container record with results
- this.UpdateContainerRecord()
+ // (5) wait for container to finish
+ waiterr = this.WaitFinish()
return
}
@@ -278,12 +364,14 @@ func main() {
if err != nil {
log.Fatal(err)
}
+ api.Retries = 8
- var kc IKeepClient
+ var kc *keepclient.KeepClient
kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
log.Fatal(err)
}
+ kc.Retries = 4
var docker *dockerclient.DockerClient
docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 9112ebb..bab299f 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -2,7 +2,10 @@ package main
import (
"bytes"
+ "crypto/md5"
+ "encoding/json"
"errors"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
@@ -11,7 +14,9 @@ import (
"io"
"os"
"strings"
+ "syscall"
"testing"
+ "time"
)
// Gocheck boilerplate
@@ -25,10 +30,17 @@ type TestSuite struct{}
var _ = Suite(&TestSuite{})
type ArvTestClient struct {
+ Total int64
+ Calls int
+ Content arvadosclient.Dict
+ ContainerRecord
+ Logs map[string]*bytes.Buffer
+ WasSetRunning bool
}
type KeepTestClient struct {
- Called bool
+ Called bool
+ Content []byte
}
var busyboxManifest = ". 59950b5bd8b0854ac44669c5559c4358+1321984+Af83e8fa245ab84f5817064ba4e99aed87060bea5 at 5692cadc 0:1321984:fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar\n"
@@ -38,13 +50,34 @@ var busyboxImageId = "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a4
var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
-func (this ArvTestClient) Create(resourceType string,
+func (this *ArvTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
+
+ this.Calls += 1
+ this.Content = parameters
+
+ if resourceType == "logs" {
+ et := parameters["event_type"].(string)
+ if this.Logs == nil {
+ this.Logs = make(map[string]*bytes.Buffer)
+ }
+ if this.Logs[et] == nil {
+ this.Logs[et] = &bytes.Buffer{}
+ }
+ this.Logs[et].Write([]byte(parameters["properties"].(map[string]string)["text"]))
+ }
+
+ if resourceType == "collections" && output != nil {
+ mt := parameters["manifest_text"].(string)
+ outmap := output.(map[string]string)
+ outmap["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ }
+
return nil
}
-func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
if resourceType == "collections" {
if uuid == busyboxPDH {
output.(*Collection).ManifestText = busyboxManifest
@@ -52,15 +85,27 @@ func (this ArvTestClient) Get(resourceType string, uuid string, parameters arvad
output.(*Collection).ManifestText = otherManifest
}
}
+ if resourceType == "containers" {
+ (*output.(*ContainerRecord)) = this.ContainerRecord
+ }
return nil
}
-func (this ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+
+ this.Content = parameters
+ if resourceType == "containers" {
+ if parameters["state"] == "Running" {
+ this.WasSetRunning = true
+ }
+
+ }
return nil
}
func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, nil
+ this.Content = buf
+ return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
}
type FileWrapper struct {
@@ -84,7 +129,7 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
- cr := NewContainerRunner(ArvTestClient{}, kc, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
_, err = cr.Docker.RemoveImage(busyboxImageId, true)
@@ -236,7 +281,7 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
func (s *TestSuite) TestRunContainer(c *C) {
docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
- cr := NewContainerRunner(ArvTestClient{}, &KeepTestClient{}, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -257,3 +302,236 @@ func (s *TestSuite) TestRunContainer(c *C) {
c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
c.Check(logs.Stderr.String(), Equals, "")
}
+
+func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+ cr.CrunchLog.Print("Hello world!")
+ cr.CrunchLog.Print("Goodbye")
+
+ err := cr.CommitLogs()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunchexec.txt\n")
+ c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+ err := cr.UpdateContainerRecordRunning()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["state"], Equals, "Running")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+ cr.LogsPDH = new(string)
+ *cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
+
+ cr.ExitCode = new(int)
+ *cr.ExitCode = 42
+
+ err := cr.UpdateContainerRecordComplete()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], Equals, *cr.LogsPDH)
+ c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
+ c.Check(api.Content["state"], Equals, "Complete")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.Cancelled = true
+
+ err := cr.UpdateContainerRecordComplete()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], IsNil)
+ c.Check(api.Content["exit_code"], IsNil)
+ c.Check(api.Content["state"], Equals, "Cancelled")
+}
+
+func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+ rec := ContainerRecord{}
+ err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+ c.Check(err, IsNil)
+
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker.RemoveImage(busyboxImageId, true)
+
+ api = &ArvTestClient{ContainerRecord: rec}
+ cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+
+ err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(err, IsNil)
+ c.Check(api.WasSetRunning, Equals, true)
+
+ c.Check(api.Content["log"], NotNil)
+
+ if err != nil {
+ for k, v := range api.Logs {
+ c.Log(k)
+ c.Log(v.String())
+ }
+ }
+
+ return
+}
+
+func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["log"], NotNil)
+ c.Check(api.Content["exit_code"], Equals, 1)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
+ c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["pwd"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["pwd"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestCancel(c *C) {
+ record := `{
+ "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`
+
+ rec := ContainerRecord{}
+ err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+ c.Check(err, IsNil)
+
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker.RemoveImage(busyboxImageId, true)
+
+ api := &ArvTestClient{ContainerRecord: rec}
+ cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+
+ go func() {
+ for cr.ContainerId == "" {
+ time.Sleep(1 * time.Second)
+ }
+ cr.SigChan <- syscall.SIGINT
+ }()
+
+ err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], NotNil)
+
+ if err != nil {
+ for k, v := range api.Logs {
+ c.Log(k)
+ c.Log(v.String())
+ }
+ }
+
+ c.Check(api.Content["state"], Equals, "Cancelled")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
+}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
index d5042be..01b6548 100644
--- a/services/crunch-exec/logging.go
+++ b/services/crunch-exec/logging.go
@@ -25,7 +25,15 @@ type ThrottledLogger struct {
}
func RFC3339Timestamp(now time.Time) string {
- return now.Format(time.RFC3339Nano)
+ // return now.Format(time.RFC3339Nano)
+ // Builtin RFC3339Nano format isn't fixed width so
+ // provide our own.
+
+ return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
+ now.Year(), now.Month(), now.Day(),
+ now.Hour(), now.Minute(), now.Second(),
+ now.Nanosecond())
+
}
func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
@@ -143,7 +151,9 @@ func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
}
func (this *ArvLogWriter) Close() (err error) {
- err = this.WriteCloser.Close()
- this.WriteCloser = nil
+ if this.WriteCloser != nil {
+ err = this.WriteCloser.Close()
+ this.WriteCloser = nil
+ }
return err
}
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index bac36f5..27d066a 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -2,9 +2,6 @@ package main
import (
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
. "gopkg.in/check.v1"
"testing"
"time"
@@ -17,42 +14,6 @@ func Test2(t *testing.T) {
type LoggingTestSuite struct{}
-type LoggingArvTestClient struct {
- Total int64
- Calls int
- Content arvadosclient.Dict
-}
-
-func (this *LoggingArvTestClient) Create(resourceType string,
- parameters arvadosclient.Dict,
- output interface{}) error {
-
- this.Calls += 1
- this.Content = parameters
- return nil
-}
-
-func (this *LoggingArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
- return nil
-}
-
-func (this *LoggingArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
- return nil
-}
-
-type LoggingKeepTestClient struct {
- Content []byte
-}
-
-func (this *LoggingKeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- this.Content = buf
- return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
-}
-
-func (this *LoggingKeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
- return nil, nil
-}
-
type TestTimestamper struct {
count int
}
@@ -66,8 +27,8 @@ func (this *TestTimestamper) Timestamp(t time.Time) string {
var _ = Suite(&LoggingTestSuite{})
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
- api := &LoggingArvTestClient{}
- kc := &LoggingKeepTestClient{}
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
@@ -90,8 +51,8 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
}
func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
- api := &LoggingArvTestClient{}
- kc := &LoggingKeepTestClient{}
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
@@ -110,8 +71,8 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
}
func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
- api := &LoggingArvTestClient{}
- kc := &LoggingKeepTestClient{}
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
ts := &TestTimestamper{}
cr.CrunchLog.Timestamper = ts.Timestamp
@@ -140,5 +101,4 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
c.Check(mt, Equals, ""+
". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunchexec.txt\n"+
". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
-
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list