[ARVADOS] created: 2.1.0-1053-g487925638
Git user
git at public.arvados.org
Fri Jul 16 14:25:21 UTC 2021
at 4879256386a5be9566f31f2c266b682993029e14 (commit)
commit 4879256386a5be9566f31f2c266b682993029e14
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 16 10:24:42 2021 -0400
17816: Close stdin/stdout/stderr from main instead of executor.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3c9c38161..08e4aa389 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -77,7 +77,10 @@ type PsProcess interface {
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- executor containerExecutor
+ executor containerExecutor
+ executorStdin io.Closer
+ executorStdout io.Closer
+ executorStderr io.Closer
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
@@ -106,8 +109,6 @@ type ContainerRunner struct {
ExitCode *int
NewLogWriter NewLogWriter
CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
@@ -877,7 +878,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
- var stdin io.ReadCloser
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
if mnt, ok := runner.Container.Mounts["stdin"]; ok {
switch mnt.Kind {
case "collection":
@@ -954,6 +955,9 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
if !runner.enableMemoryLimit {
ram = 0
}
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
Image: imageID,
VCPUs: runner.Container.RuntimeConstraints.VCPUs,
@@ -1018,6 +1022,27 @@ func (runner *ContainerRunner) WaitFinish() error {
}
runner.ExitCode = &exitcode
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+
if runner.statReporter != nil {
runner.statReporter.Stop()
err = runner.statLogger.Close()
@@ -1025,7 +1050,7 @@ func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
- return nil
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 4b1bf8425..22d65334e 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -120,8 +120,6 @@ func (e *stubExecutor) CgroupID() string { return "cgroupid" }
func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
func (e *stubExecutor) Close() { e.closed = true }
func (e *stubExecutor) Wait(context.Context) (int, error) {
- defer e.created.Stdout.Close()
- defer e.created.Stderr.Close()
return <-e.exit, e.waitErr
}
@@ -522,8 +520,6 @@ func dockerLog(fd byte, msg string) []byte {
func (s *TestSuite) TestRunContainer(c *C) {
s.executor.runFunc = func() {
fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
- s.executor.created.Stdout.Close()
- s.executor.created.Stderr.Close()
s.executor.exit <- 0
}
diff --git a/lib/crunchrun/docker.go b/lib/crunchrun/docker.go
index a39b754b3..861f8c8c1 100644
--- a/lib/crunchrun/docker.go
+++ b/lib/crunchrun/docker.go
@@ -186,7 +186,7 @@ func (e *dockerExecutor) Wait(ctx context.Context) (int, error) {
}
}
-func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
+func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
Stream: true,
Stdin: stdin != nil,
@@ -213,8 +213,7 @@ func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteClo
return nil
}
-func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
- defer stdin.Close()
+func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
defer closeConn()
_, err := io.Copy(conn, stdin)
if err != nil {
@@ -225,7 +224,7 @@ func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeC
// Handle docker log protocol; see
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
-func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
+func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
header := make([]byte, 8)
var err error
for err == nil {
@@ -247,14 +246,6 @@ func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reade
if err != nil {
return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
}
- err = stdout.Close()
- if err != nil {
- return fmt.Errorf("error writing stdout: close: %v", err)
- }
- err = stderr.Close()
- if err != nil {
- return fmt.Errorf("error writing stderr: close: %v", err)
- }
return nil
}
diff --git a/lib/crunchrun/executor.go b/lib/crunchrun/executor.go
index c773febe9..f4feaa06c 100644
--- a/lib/crunchrun/executor.go
+++ b/lib/crunchrun/executor.go
@@ -25,9 +25,9 @@ type containerSpec struct {
EnableNetwork bool
NetworkMode string // docker network mode, normally "default"
CgroupParent string
- Stdin io.ReadCloser
- Stdout io.WriteCloser
- Stderr io.WriteCloser
+ Stdin io.Reader
+ Stdout io.Writer
+ Stderr io.Writer
}
// containerExecutor is an interface to a container runtime
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list