[ARVADOS] created: c217491f9fdc78bd1c665618137c053a852599ac

Git user git at public.curoverse.com
Thu May 5 17:53:09 EDT 2016


        at  c217491f9fdc78bd1c665618137c053a852599ac (commit)


commit c217491f9fdc78bd1c665618137c053a852599ac
Author: radhika <radhika at curoverse.com>
Date:   Thu May 5 17:51:47 2016 -0400

    8464: Add stdout redirection in crunch2.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 01edb0a..acaa5a2 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -44,6 +44,7 @@ type Mount struct {
 	PortableDataHash string `json:"portable_data_hash"`
 	UUID             string `json:"uuid"`
 	DeviceType       string `json:"device_type"`
+	Path             string `json:"path"`
 }
 
 // Collection record returned by the API server.
@@ -99,7 +100,7 @@ type ContainerRunner struct {
 	NewLogWriter
 	loggingDone   chan bool
 	CrunchLog     *ThrottledLogger
-	Stdout        *ThrottledLogger
+	Stdout        io.WriteCloser
 	Stderr        *ThrottledLogger
 	LogCollection *CollectionWriter
 	LogsPDH       *string
@@ -246,6 +247,22 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 	runner.Binds = nil
 
 	for bind, mnt := range runner.ContainerRecord.Mounts {
+		if bind == "stdout" {
+			// Is it a "file" mount kind?
+			if mnt.Kind != "file" {
+				return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+			}
+
+			// Does path start with OutputPath?
+			prefix := runner.ContainerRecord.OutputPath
+			if !strings.HasSuffix(prefix, "/") {
+				prefix += "/"
+			}
+			if !strings.HasPrefix(mnt.Path, prefix) {
+				return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+			}
+		}
+
 		if mnt.Kind == "collection" {
 			var src string
 			if mnt.UUID != "" && mnt.PortableDataHash != "" {
@@ -296,6 +313,15 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 			} else {
 				runner.Binds = append(runner.Binds, bind)
 			}
+		} else if mnt.Kind == "file" {
+			runner.HostOutputDir = runner.ContainerRecord.OutputPath
+			st, staterr := os.Stat(runner.HostOutputDir)
+			if staterr != nil {
+				return fmt.Errorf("While getting stat on output_path %v: %v", runner.HostOutputDir, staterr)
+			}
+			if st.IsDir() != true {
+				return fmt.Errorf("Given output_path '%v' is not a directory", runner.HostOutputDir)
+			}
 		} else {
 			return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
 		}
@@ -383,7 +409,39 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
 	runner.loggingDone = make(chan bool)
 
-	runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+	var stdoutMnt Mount
+	for bind, mnt := range runner.ContainerRecord.Mounts {
+		if bind == "stdout" {
+			stdoutMnt = mnt
+			break
+		}
+	}
+	if stdoutMnt.Path != "" {
+		stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):]
+		index := strings.LastIndex(stdoutPath, "/")
+		if index > 0 {
+			stdoutSubdirs := stdoutPath[:index]
+			if stdoutSubdirs != "" {
+				st, err := os.Stat(runner.HostOutputDir)
+				if err != nil {
+					return fmt.Errorf("While Stat on temp dir: %v", err)
+				}
+				path := runner.HostOutputDir + stdoutSubdirs
+				err = os.MkdirAll(path, st.Mode()|os.ModeSetgid|0777)
+				if err != nil {
+					return fmt.Errorf("While MkdirAll %q: %v", path, err)
+				}
+				st, err = os.Stat(path)
+			}
+		}
+		stdoutFile, err := os.Create(runner.HostOutputDir + "/" + stdoutPath)
+		if err != nil {
+			return fmt.Errorf("While creating stdout file: %v", err)
+		}
+		runner.Stdout = stdoutFile
+	} else {
+		runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+	}
 	runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
 	go runner.ProcessDockerAttach(containerReader)
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 5ee879d..a58fae0 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -732,3 +732,105 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.CleanupDirs()
 	}
 }
+
+// TestStdout checks that a "file" stdout mount located under output_path receives the container's stdout.
+func (s *TestSuite) TestStdout(c *C) {
+	tmpdir, err := ioutil.TempDir("", "test-stdout")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	helperRecord := `{`
+	helperRecord += `"command": ["/bin/sh", "-c", "echo $FROBIZ"],`
+	helperRecord += `"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",`
+	helperRecord += `"cwd": "/bin",`
+	helperRecord += `"environment": {"FROBIZ": "bilbo"},`
+	helperRecord += `"mounts": {"stdout": {"kind": "file", "path": "` + tmpdir + `/a/b/c.out"} },`
+	helperRecord += `"output_path": "` + tmpdir + `",`
+	helperRecord += `"priority": 1,`
+	helperRecord += `"runtime_constraints": {}`
+	helperRecord += `}`
+
+	api, cr := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) // skips 7 bytes; presumably the "FROBIZ=" prefix of env[0]
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+	c.Check(api.Calls, Equals, 6)
+	c.Check(api.Content[5]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+	c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+	stdout := cr.HostOutputDir + "/a/b/c.out"
+	data, err := ioutil.ReadFile(stdout)
+	c.Check(err, IsNil)
+	c.Check(string(data), Equals, "bilbo\n") // obtained value first, expected value last
+}
+
+// StdoutErrorRunHelper decodes record into a ContainerRecord, runs it through a ContainerRunner
+// built on the test clients, and returns the API stub, the runner and the error from Run.
+func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+	rec := ContainerRecord{}
+	err = json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	c.Assert(err, IsNil) // a malformed fixture must stop here instead of running an empty record
+
+	docker := NewTestDockerClient()
+	docker.fn = fn
+	docker.RemoveImage(hwImageId, true)
+
+	api = &ArvTestClient{ContainerRecord: rec}
+	cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	am := &ArvMountCmdLine{}
+	cr.RunArvMount = am.ArvMountTest
+
+	return api, cr, cr.Run()
+}
+
+// TestStdoutWithWrongPath checks that Run fails when the stdout path is not located under output_path.
+func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+    "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
+    "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	c.Check(err, ErrorMatches, `(?s).*Stdout path does not start with OutputPath.*`) // fails cleanly on a nil error
+}
+
+// TestStdoutWithWrongKindTmp checks that Run fails when the stdout mount has kind "tmp" instead of "file".
+func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+    "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
+    "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	c.Check(err, ErrorMatches, `(?s).*Unsupported mount kind 'tmp' for stdout.*`) // fails cleanly on a nil error
+}
+
+// TestStdoutWithWrongKindCollection checks that Run fails when the stdout mount has kind "collection".
+func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
+	_, _, err := StdoutErrorRunHelper(c, `{
+    "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
+    "output_path": "/tmp"
+}`, func(t *TestDockerClient) {})
+
+	c.Check(err, ErrorMatches, `(?s).*Unsupported mount kind 'collection' for stdout.*`) // fails cleanly on a nil error
+}
+
+// TestStdoutNoSuchDir checks that Run fails when output_path names a directory
+// that does not exist (a never-created subdirectory of a fresh temp dir).
+func (s *TestSuite) TestStdoutNoSuchDir(c *C) {
+	tmpdir, err := ioutil.TempDir("", "test-stdout")
+	c.Assert(err, IsNil)
+	defer os.RemoveAll(tmpdir)
+
+	helperRecord := `{`
+	helperRecord += `"command": ["/bin/sh", "-c", "echo $FROBIZ"],`
+	helperRecord += `"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",`
+	helperRecord += `"cwd": "/bin",`
+	helperRecord += `"environment": {"FROBIZ": "bilbo"},`
+	helperRecord += `"mounts": {"stdout": {"kind": "file", "path": "` + tmpdir + `/nosuchsubdir/a/b/c.out"} },`
+	helperRecord += `"output_path": "` + tmpdir + `/nosuchsubdir",`
+	helperRecord += `"priority": 1,`
+	helperRecord += `"runtime_constraints": {}`
+	helperRecord += `}`
+
+	_, _, err = StdoutErrorRunHelper(c, helperRecord, func(t *TestDockerClient) {})
+	c.Check(err, ErrorMatches, `(?s).*/nosuchsubdir: no such file or directory.*`) // fails cleanly on a nil error
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list