[ARVADOS] updated: 856776f2435858d0851f0a1b895f43260f43dee2

git at public.curoverse.com git at public.curoverse.com
Mon Feb 15 15:20:28 EST 2016


Summary of changes:
 .../crunch-dispatch-local/crunch-dispatch-local.go |   2 +
 services/crunch-run/crunchrun.go                   | 140 +++++++++++++------
 services/crunch-run/crunchrun_test.go              | 152 +++++++++++----------
 3 files changed, 180 insertions(+), 114 deletions(-)

       via  856776f2435858d0851f0a1b895f43260f43dee2 (commit)
       via  b719ef57055ba2fd06c7a1377cc0d47ee5df935e (commit)
      from  2033d7f870dd0e4c65b563d689514fe21e478992 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 856776f2435858d0851f0a1b895f43260f43dee2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Feb 15 15:20:20 2016 -0500

    Report when container runner finishes.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index be1fef8..e05c0c5 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -215,4 +215,6 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 			log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
 		}
 	}
+
+	log.Printf("Finished container run for %v", uuid)
 }

commit b719ef57055ba2fd06c7a1377cc0d47ee5df935e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Feb 15 15:20:04 2016 -0500

    8015: Handle Docker logging correctly, interpret frame headers and
    demultiplex stdout/stderr.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 07436f5..775b06d 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -105,16 +105,17 @@ type ContainerRunner struct {
 	LogsPDH       *string
 	RunArvMount
 	MkTempDir
-	ArvMount      *exec.Cmd
-	ArvMountPoint string
-	HostOutputDir string
-	Binds         []string
-	OutputPDH     *string
-	CancelLock    sync.Mutex
-	Cancelled     bool
-	SigChan       chan os.Signal
-	ArvMountExit  chan error
-	finalState    string
+	ArvMount       *exec.Cmd
+	ArvMountPoint  string
+	HostOutputDir  string
+	CleanupTempDir []string
+	Binds          []string
+	OutputPDH      *string
+	CancelLock     sync.Mutex
+	Cancelled      bool
+	SigChan        chan os.Signal
+	ArvMountExit   chan error
+	finalState     string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -238,9 +239,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 		return fmt.Errorf("While creating keep mount temp dir: %v", err)
 	}
 
+	runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
+
 	pdhOnly := true
 	tmpcount := 0
-	arvMountCmd := []string{"--foreground"}
+	arvMountCmd := []string{"--foreground", "--allow-other"}
 	collectionPaths := []string{}
 	runner.Binds = nil
 
@@ -282,7 +285,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 				if err != nil {
 					return fmt.Errorf("While creating mount temp dir: %v", err)
 				}
-
+				st, staterr := os.Stat(runner.HostOutputDir)
+				if staterr != nil {
+					return fmt.Errorf("While Stat on temp dir: %v", staterr)
+				}
+				err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid)
+				if staterr != nil {
+					return fmt.Errorf("While Chmod temp dir: %v", err)
+				}
+				runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
 				runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
 			} else {
 				runner.Binds = append(runner.Binds, bind)
@@ -352,28 +363,65 @@ func (runner *ContainerRunner) StartContainer() (err error) {
 	return nil
 }
 
+func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
+	// Handle docker log protocol
+	// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+
+	header := make([]byte, 8)
+	for {
+		_, readerr := io.ReadAtLeast(containerReader, header, 8)
+
+		if readerr == nil {
+			readsize := int64(header[4]) | (int64(header[5]) << 8) | (int64(header[6]) << 16) | (int64(header[7]) << 24)
+			if header[0] == 1 {
+				// stdout
+				_, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
+			} else {
+				// stderr
+				_, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+			}
+		}
+
+		if readerr != nil {
+			if readerr != io.EOF {
+				runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
+			}
+
+			closeerr := runner.Stdout.Close()
+			if closeerr != nil {
+				runner.CrunchLog.Printf("While closing stdout logs: %v", readerr)
+			}
+
+			closeerr = runner.Stderr.Close()
+			if closeerr != nil {
+				runner.CrunchLog.Printf("While closing stderr logs: %v", readerr)
+			}
+
+			runner.loggingDone <- true
+			close(runner.loggingDone)
+			return
+		}
+	}
+}
+
 // AttachLogs connects the docker container stdout and stderr logs to the
 // Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachLogs() (err error) {
 
 	runner.CrunchLog.Print("Attaching container logs")
 
-	var stderrReader, stdoutReader io.Reader
-	stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
+	var containerReader io.Reader
+	containerReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true, Stderr: true})
 	if err != nil {
-		return fmt.Errorf("While getting container standard error: %v", err)
-	}
-	stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
-	if err != nil {
-		return fmt.Errorf("While getting container standard output: %v", err)
+		return fmt.Errorf("While attaching container logs: %v", err)
 	}
 
 	runner.loggingDone = make(chan bool)
 
 	runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
 	runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
-	go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
-	go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
+
+	go runner.ProcessDockerAttach(containerReader)
 
 	return nil
 }
@@ -388,33 +436,14 @@ func (runner *ContainerRunner) WaitFinish() error {
 	}
 	runner.ExitCode = &wr.ExitCode
 
-	// drain stdout/stderr
-	<-runner.loggingDone
+	// wait for stdout/stderr to complete
 	<-runner.loggingDone
 
-	runner.Stdout.Close()
-	runner.Stderr.Close()
-
 	return nil
 }
 
-// HandleOutput sets the output and unmounts the FUSE mount.
+// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
 func (runner *ContainerRunner) CaptureOutput() error {
-	if runner.ArvMount != nil {
-		defer func() {
-			umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
-			umnterr := umount.Run()
-			if umnterr != nil {
-				runner.CrunchLog.Print("While running fusermount: %v", umnterr)
-			}
-
-			mnterr := <-runner.ArvMountExit
-			if mnterr != nil {
-				runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr)
-			}
-		}()
-	}
-
 	if runner.finalState != "Complete" {
 		return nil
 	}
@@ -471,6 +500,26 @@ func (runner *ContainerRunner) CaptureOutput() error {
 	return nil
 }
 
+func (runner *ContainerRunner) CleanupDirs() {
+	if runner.ArvMount != nil {
+		umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+		umnterr := umount.Run()
+		if umnterr != nil {
+			runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+		}
+
+		mnterr := <-runner.ArvMountExit
+		if mnterr != nil {
+			runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+		}
+	}
+
+	for _, tmpdir := range runner.CleanupTempDir {
+		rmerr := os.RemoveAll(tmpdir)
+		runner.CrunchLog.Printf("While cleaning up temporary directories: %v", rmerr)
+	}
+}
+
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
 	runner.CrunchLog.Print(runner.finalState)
@@ -558,13 +607,16 @@ func (runner *ContainerRunner) Run() (err error) {
 			runner.CrunchLog.Print(outputerr)
 		}
 
-		// (8) write logs
+		// (8) clean up temporary directories
+		runner.CleanupDirs()
+
+		// (9) write logs
 		logerr := runner.CommitLogs()
 		if logerr != nil {
 			runner.CrunchLog.Print(logerr)
 		}
 
-		// (9) update container record with results
+		// (10) update container record with results
 		updateerr := runner.UpdateContainerRecordComplete()
 		if updateerr != nil {
 			runner.CrunchLog.Print(updateerr)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 72e09b1..cfe3108 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -13,8 +13,10 @@ import (
 	. "gopkg.in/check.v1"
 	"io"
 	"io/ioutil"
+	"log"
 	"os"
 	"os/exec"
+	"sort"
 	"strings"
 	"syscall"
 	"testing"
@@ -53,22 +55,19 @@ var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb
 var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
 
 type TestDockerClient struct {
-	imageLoaded  string
-	stdoutReader io.ReadCloser
-	stderrReader io.ReadCloser
-	stdoutWriter io.WriteCloser
-	stderrWriter io.WriteCloser
-	fn           func(t *TestDockerClient)
-	finish       chan dockerclient.WaitResult
-	stop         chan bool
-	cwd          string
-	env          []string
+	imageLoaded string
+	logReader   io.ReadCloser
+	logWriter   io.WriteCloser
+	fn          func(t *TestDockerClient)
+	finish      chan dockerclient.WaitResult
+	stop        chan bool
+	cwd         string
+	env         []string
 }
 
 func NewTestDockerClient() *TestDockerClient {
 	t := &TestDockerClient{}
-	t.stdoutReader, t.stdoutWriter = io.Pipe()
-	t.stderrReader, t.stderrWriter = io.Pipe()
+	t.logReader, t.logWriter = io.Pipe()
 	t.finish = make(chan dockerclient.WaitResult)
 	t.stop = make(chan bool)
 	t.cwd = "/"
@@ -116,13 +115,7 @@ func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostCo
 }
 
 func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
-	if options.Stdout {
-		return t.stdoutReader, nil
-	}
-	if options.Stderr {
-		return t.stderrReader, nil
-	}
-	return nil, nil
+	return t.logReader, nil
 }
 
 func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
@@ -358,12 +351,20 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
 	return nil
 }
 
+func dockerLog(fd byte, msg string) []byte {
+	by := []byte(msg)
+	header := make([]byte, 8+len(by))
+	header[0] = fd
+	header[4] = byte(len(by))
+	copy(header[8:], by)
+	return header
+}
+
 func (s *TestSuite) TestRunContainer(c *C) {
 	docker := NewTestDockerClient()
 	docker.fn = func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte("Hello world\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, "Hello world\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{}
 	}
 	cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -493,9 +494,8 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte("hello world\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, "hello world\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{}
 	})
 
@@ -517,10 +517,9 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte("hello\n"))
-		t.stderrWriter.Write([]byte("world\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, "hello\n"))
+		t.logWriter.Write(dockerLog(2, "world\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 1}
 	})
 
@@ -543,15 +542,16 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte(t.cwd + "\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
 	})
 
 	c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
 	c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
+	log.Print(api.Logs["stdout"].String())
+
 	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
 }
 
@@ -566,9 +566,8 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte(t.cwd + "\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
 	})
 
@@ -597,9 +596,8 @@ func (s *TestSuite) TestCancel(c *C) {
 	docker := NewTestDockerClient()
 	docker.fn = func(t *TestDockerClient) {
 		<-t.stop
-		t.stdoutWriter.Write([]byte("foo\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, "foo\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
 	}
 	docker.RemoveImage(hwImageId, true)
@@ -644,9 +642,8 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-		t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
-		t.stdoutWriter.Close()
-		t.stderrWriter.Close()
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
 	})
 
@@ -675,40 +672,55 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 	i := 0
 	cr.MkTempDir = func(string, string) (string, error) {
 		i += 1
-		return fmt.Sprintf("/tmp/mktmpdir%d", i), nil
+		d := fmt.Sprintf("/tmp/mktmpdir%d", i)
+		os.Mkdir(d, os.ModePerm)
+		return d, nil
 	}
 
-	cr.ContainerRecord.Mounts = make(map[string]Mount)
-	cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
-	cr.OutputPath = "/tmp"
-
-	err := cr.SetupMounts()
-	c.Check(err, IsNil)
-	c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
-	c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
-
-	i = 0
-	cr.ContainerRecord.Mounts = make(map[string]Mount)
-	cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
-	cr.OutputPath = "/keeptmp"
+	{
+		cr.ContainerRecord.Mounts = make(map[string]Mount)
+		cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+		cr.OutputPath = "/tmp"
 
-	os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+		err := cr.SetupMounts()
+		c.Check(err, IsNil)
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+		c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
+		cr.CleanupDirs()
+	}
 
-	err = cr.SetupMounts()
-	c.Check(err, IsNil)
-	c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
-	c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+	{
+		i = 0
+		cr.ContainerRecord.Mounts = make(map[string]Mount)
+		cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+		cr.OutputPath = "/keeptmp"
 
-	i = 0
-	cr.ContainerRecord.Mounts = make(map[string]Mount)
-	cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
-	cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
-	cr.OutputPath = "/keeptmp"
+		os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
 
-	os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+		err := cr.SetupMounts()
+		c.Check(err, IsNil)
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+		c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+		cr.CleanupDirs()
+	}
 
-	err = cr.SetupMounts()
-	c.Check(err, IsNil)
-	c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
-	c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro", "/tmp/mktmpdir1/tmp0:/keeptmp"})
+	{
+		i = 0
+		cr.ContainerRecord.Mounts = make(map[string]Mount)
+		cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+		cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+		cr.OutputPath = "/keepout"
+
+		os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+		os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+		err := cr.SetupMounts()
+		c.Check(err, IsNil)
+		c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+		var ss sort.StringSlice = cr.Binds
+		ss.Sort()
+		c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+			"/tmp/mktmpdir1/tmp0:/keepout"})
+		cr.CleanupDirs()
+	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list