[ARVADOS] updated: 856776f2435858d0851f0a1b895f43260f43dee2
git at public.curoverse.com
git at public.curoverse.com
Mon Feb 15 15:20:28 EST 2016
Summary of changes:
.../crunch-dispatch-local/crunch-dispatch-local.go | 2 +
services/crunch-run/crunchrun.go | 140 +++++++++++++------
services/crunch-run/crunchrun_test.go | 152 +++++++++++----------
3 files changed, 180 insertions(+), 114 deletions(-)
via 856776f2435858d0851f0a1b895f43260f43dee2 (commit)
via b719ef57055ba2fd06c7a1377cc0d47ee5df935e (commit)
from 2033d7f870dd0e4c65b563d689514fe21e478992 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 856776f2435858d0851f0a1b895f43260f43dee2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 15 15:20:20 2016 -0500
Report when container runner finishes.
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index be1fef8..e05c0c5 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -215,4 +215,6 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
}
}
+
+ log.Printf("Finished container run for %v", uuid)
}
commit b719ef57055ba2fd06c7a1377cc0d47ee5df935e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 15 15:20:04 2016 -0500
8015: Handle Docker logging correctly, interpret frame headers and
demultiplex stdout/stderr.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 07436f5..775b06d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -105,16 +105,17 @@ type ContainerRunner struct {
LogsPDH *string
RunArvMount
MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- OutputPDH *string
- CancelLock sync.Mutex
- Cancelled bool
- SigChan chan os.Signal
- ArvMountExit chan error
- finalState string
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ CleanupTempDir []string
+ Binds []string
+ OutputPDH *string
+ CancelLock sync.Mutex
+ Cancelled bool
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ finalState string
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -238,9 +239,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
+ runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
+
pdhOnly := true
tmpcount := 0
- arvMountCmd := []string{"--foreground"}
+ arvMountCmd := []string{"--foreground", "--allow-other"}
collectionPaths := []string{}
runner.Binds = nil
@@ -282,7 +285,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
}
-
+ st, staterr := os.Stat(runner.HostOutputDir)
+ if staterr != nil {
+ return fmt.Errorf("While Stat on temp dir: %v", staterr)
+ }
+ err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid)
+ if staterr != nil {
+ return fmt.Errorf("While Chmod temp dir: %v", err)
+ }
+ runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
} else {
runner.Binds = append(runner.Binds, bind)
@@ -352,28 +363,65 @@ func (runner *ContainerRunner) StartContainer() (err error) {
return nil
}
+func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
+ // Handle docker log protocol
+ // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+
+ header := make([]byte, 8)
+ for {
+ _, readerr := io.ReadAtLeast(containerReader, header, 8)
+
+ if readerr == nil {
+ readsize := int64(header[4]) | (int64(header[5]) << 8) | (int64(header[6]) << 16) | (int64(header[7]) << 24)
+ if header[0] == 1 {
+ // stdout
+ _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
+ } else {
+ // stderr
+ _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+ }
+ }
+
+ if readerr != nil {
+ if readerr != io.EOF {
+ runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
+ }
+
+ closeerr := runner.Stdout.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing stdout logs: %v", readerr)
+ }
+
+ closeerr = runner.Stderr.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing stderr logs: %v", readerr)
+ }
+
+ runner.loggingDone <- true
+ close(runner.loggingDone)
+ return
+ }
+ }
+}
+
// AttachLogs connects the docker container stdout and stderr logs to the
// Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachLogs() (err error) {
runner.CrunchLog.Print("Attaching container logs")
- var stderrReader, stdoutReader io.Reader
- stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
+ var containerReader io.Reader
+ containerReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true, Stderr: true})
if err != nil {
- return fmt.Errorf("While getting container standard error: %v", err)
- }
- stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
- if err != nil {
- return fmt.Errorf("While getting container standard output: %v", err)
+ return fmt.Errorf("While attaching container logs: %v", err)
}
runner.loggingDone = make(chan bool)
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
- go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
+
+ go runner.ProcessDockerAttach(containerReader)
return nil
}
@@ -388,33 +436,14 @@ func (runner *ContainerRunner) WaitFinish() error {
}
runner.ExitCode = &wr.ExitCode
- // drain stdout/stderr
- <-runner.loggingDone
+ // wait for stdout/stderr to complete
<-runner.loggingDone
- runner.Stdout.Close()
- runner.Stderr.Close()
-
return nil
}
-// HandleOutput sets the output and unmounts the FUSE mount.
+// CaptureOutput sets the output. Unmounting the FUSE mount and deleting temporary directories are handled by CleanupDirs.
func (runner *ContainerRunner) CaptureOutput() error {
- if runner.ArvMount != nil {
- defer func() {
- umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
- umnterr := umount.Run()
- if umnterr != nil {
- runner.CrunchLog.Print("While running fusermount: %v", umnterr)
- }
-
- mnterr := <-runner.ArvMountExit
- if mnterr != nil {
- runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr)
- }
- }()
- }
-
if runner.finalState != "Complete" {
return nil
}
@@ -471,6 +500,26 @@ func (runner *ContainerRunner) CaptureOutput() error {
return nil
}
+func (runner *ContainerRunner) CleanupDirs() {
+ if runner.ArvMount != nil {
+ umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+ umnterr := umount.Run()
+ if umnterr != nil {
+ runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+ }
+
+ mnterr := <-runner.ArvMountExit
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ }
+ }
+
+ for _, tmpdir := range runner.CleanupTempDir {
+ rmerr := os.RemoveAll(tmpdir)
+ runner.CrunchLog.Printf("While cleaning up temporary directories: %v", rmerr)
+ }
+}
+
// CommitLogs posts the collection containing the final container logs.
func (runner *ContainerRunner) CommitLogs() error {
runner.CrunchLog.Print(runner.finalState)
@@ -558,13 +607,16 @@ func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Print(outputerr)
}
- // (8) write logs
+ // (8) clean up temporary directories
+ runner.CleanupDirs()
+
+ // (9) write logs
logerr := runner.CommitLogs()
if logerr != nil {
runner.CrunchLog.Print(logerr)
}
- // (9) update container record with results
+ // (10) update container record with results
updateerr := runner.UpdateContainerRecordComplete()
if updateerr != nil {
runner.CrunchLog.Print(updateerr)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 72e09b1..cfe3108 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -13,8 +13,10 @@ import (
. "gopkg.in/check.v1"
"io"
"io/ioutil"
+ "log"
"os"
"os/exec"
+ "sort"
"strings"
"syscall"
"testing"
@@ -53,22 +55,19 @@ var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb
var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
type TestDockerClient struct {
- imageLoaded string
- stdoutReader io.ReadCloser
- stderrReader io.ReadCloser
- stdoutWriter io.WriteCloser
- stderrWriter io.WriteCloser
- fn func(t *TestDockerClient)
- finish chan dockerclient.WaitResult
- stop chan bool
- cwd string
- env []string
+ imageLoaded string
+ logReader io.ReadCloser
+ logWriter io.WriteCloser
+ fn func(t *TestDockerClient)
+ finish chan dockerclient.WaitResult
+ stop chan bool
+ cwd string
+ env []string
}
func NewTestDockerClient() *TestDockerClient {
t := &TestDockerClient{}
- t.stdoutReader, t.stdoutWriter = io.Pipe()
- t.stderrReader, t.stderrWriter = io.Pipe()
+ t.logReader, t.logWriter = io.Pipe()
t.finish = make(chan dockerclient.WaitResult)
t.stop = make(chan bool)
t.cwd = "/"
@@ -116,13 +115,7 @@ func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostCo
}
func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
- if options.Stdout {
- return t.stdoutReader, nil
- }
- if options.Stderr {
- return t.stderrReader, nil
- }
- return nil, nil
+ return t.logReader, nil
}
func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
@@ -358,12 +351,20 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
return nil
}
+func dockerLog(fd byte, msg string) []byte {
+ by := []byte(msg)
+ header := make([]byte, 8+len(by))
+ header[0] = fd
+ header[4] = byte(len(by))
+ copy(header[8:], by)
+ return header
+}
+
func (s *TestSuite) TestRunContainer(c *C) {
docker := NewTestDockerClient()
docker.fn = func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("Hello world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "Hello world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
}
cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -493,9 +494,8 @@ func (s *TestSuite) TestFullRunHello(c *C) {
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("hello world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
})
@@ -517,10 +517,9 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("hello\n"))
- t.stderrWriter.Write([]byte("world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "hello\n"))
+ t.logWriter.Write(dockerLog(2, "world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
@@ -543,15 +542,16 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.cwd + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+ log.Print(api.Logs["stdout"].String())
+
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
}
@@ -566,9 +566,8 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.cwd + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
@@ -597,9 +596,8 @@ func (s *TestSuite) TestCancel(c *C) {
docker := NewTestDockerClient()
docker.fn = func(t *TestDockerClient) {
<-t.stop
- t.stdoutWriter.Write([]byte("foo\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "foo\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
}
docker.RemoveImage(hwImageId, true)
@@ -644,9 +642,8 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
@@ -675,40 +672,55 @@ func (s *TestSuite) TestSetupMounts(c *C) {
i := 0
cr.MkTempDir = func(string, string) (string, error) {
i += 1
- return fmt.Sprintf("/tmp/mktmpdir%d", i), nil
+ d := fmt.Sprintf("/tmp/mktmpdir%d", i)
+ os.Mkdir(d, os.ModePerm)
+ return d, nil
}
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
-
- err := cr.SetupMounts()
- c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
- c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
-
- i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
- cr.OutputPath = "/keeptmp"
+ {
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+ cr.OutputPath = "/tmp"
- os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
+ cr.CleanupDirs()
+ }
- err = cr.SetupMounts()
- c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
- c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+ {
+ i = 0
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+ cr.OutputPath = "/keeptmp"
- i = 0
- cr.ContainerRecord.Mounts = make(map[string]Mount)
- cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
- cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
- cr.OutputPath = "/keeptmp"
+ os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
- os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+ cr.CleanupDirs()
+ }
- err = cr.SetupMounts()
- c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
- c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro", "/tmp/mktmpdir1/tmp0:/keeptmp"})
+ {
+ i = 0
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+ cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+ cr.OutputPath = "/keepout"
+
+ os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ var ss sort.StringSlice = cr.Binds
+ ss.Sort()
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+ "/tmp/mktmpdir1/tmp0:/keepout"})
+ cr.CleanupDirs()
+ }
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list