[ARVADOS] updated: 6b91e8201f24d2b4126532d809abba42a5ab959c

Git user git at public.curoverse.com
Thu Jan 26 17:57:20 EST 2017


Summary of changes:
 services/crunch-run/crunchrun.go      | 133 +++++++++++++++++++++++-
 services/crunch-run/crunchrun_test.go | 184 ++++++++++++++++++++++++++++++++--
 2 files changed, 306 insertions(+), 11 deletions(-)

       via  6b91e8201f24d2b4126532d809abba42a5ab959c (commit)
      from  fbacdaec3dbba425155ab6348c7e6b80ff4e710b (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 6b91e8201f24d2b4126532d809abba42a5ab959c
Author: radhika <radhika at curoverse.com>
Date:   Sun Jan 22 05:50:55 2017 -0500

    9397: pre-populate output with mounts under output_dir (wip)

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index dfe2dfa..f2b4277 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -242,8 +242,15 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 
 var tmpBackedOutputDir = false
 
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+	if runner.ArvMountPoint == "" {
+		runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+	}
+	return
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
-	runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+	err = runner.SetupArvMountPoint("keep")
 	if err != nil {
 		return fmt.Errorf("While creating keep mount temp dir: %v", err)
 	}
@@ -653,6 +660,57 @@ func (runner *ContainerRunner) CaptureOutput() error {
 		manifestText = rec.ManifestText
 	}
 
+	// Pre-populate output from the mounts configured underneath OutputPath (sorted by bind path)
+	var binds []string
+	for bind := range runner.Container.Mounts {
+		binds = append(binds, bind)
+	}
+	sort.Strings(binds)
+
+	for _, bind := range binds {
+		mnt := runner.Container.Mounts[bind]
+
+		bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+		if bindSuffix == bind || bindSuffix == "" {
+			// either doesn't start with OutputPath or is OutputPath itself
+			continue
+		}
+
+		if !strings.HasPrefix(bindSuffix, "/") {
+			return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
+		}
+
+		jsondata, err := json.Marshal(mnt.Content)
+		if err != nil {
+			return fmt.Errorf("While marshal of mount content: %v", err)
+		}
+		var content map[string]interface{}
+		err = json.Unmarshal(jsondata, &content)
+		if err != nil {
+			return fmt.Errorf("While unmarshal of mount content: %v", err)
+		}
+
+		if content["exclude_from_output"] == true {
+			continue
+		}
+
+		idx := strings.Index(mnt.PortableDataHash, "/")
+		if idx > 0 {
+			mnt.Path = mnt.PortableDataHash[idx:]
+			mnt.PortableDataHash = mnt.PortableDataHash[:idx] // end index is exclusive: keeps the whole hash
+		}
+
+		// append the manifest fragment for this mount to manifest_text
+		m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+		if err != nil {
+			return err
+		}
+
+		manifestText = manifestText + m
+	}
+
+	// Save output
 	var response arvados.Collection
 	err = runner.ArvClient.Create("collections",
 		arvadosclient.Dict{
@@ -668,6 +726,79 @@ func (runner *ContainerRunner) CaptureOutput() error {
 	return nil
 }
 
+// getCollectionManifestForPath fetches the collection identified by
+// mnt.PortableDataHash and returns the manifest_text fragment selected by
+// mnt.Path, re-rooted so that it lands at bindSuffix in the output.
+//   - mnt.Path empty: every stream, with the leading "." of its name
+//     replaced by "."+bindSuffix.
+//   - mnt.Path names a stream: that stream, renamed to "."+bindSuffix.
+//   - otherwise mnt.Path is treated as a file within a stream: the stream's
+//     block locators plus only the matching file token(s), with the stream
+//     and file renamed to the directory and base name of bindSuffix.
+// Returns "" when nothing matches mnt.Path, or an error if the fetch fails.
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+	var collection arvados.Collection
+	err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+	if err != nil {
+		return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+	}
+
+	// Split both paths at their last "/" into a directory and a base name.
+	bindSubdir, bindFileName := "", bindSuffix
+	if i := strings.LastIndex(bindSuffix, "/"); i >= 0 {
+		bindSubdir, bindFileName = bindSuffix[:i], bindSuffix[i+1:]
+	}
+	pathSubdir, pathFileName := "", mnt.Path
+	if i := strings.LastIndex(mnt.Path, "/"); i >= 0 {
+		pathSubdir, pathFileName = mnt.Path[:i], mnt.Path[i+1:]
+	}
+
+	manifestText := ""
+	for _, stream := range strings.Split(collection.ManifestText, "\n") {
+		tokens := strings.Split(stream, " ")
+		if !strings.HasPrefix(tokens[0], ".") {
+			// blank line left by the trailing newline, or not a stream
+			continue
+		}
+
+		switch {
+		case mnt.Path == "":
+			// whole collection: re-root every stream under bindSuffix
+			tokens[0] = "." + bindSuffix + tokens[0][1:]
+			manifestText += strings.Join(tokens, " ") + "\n"
+
+		case tokens[0] == "."+mnt.Path:
+			// path names this stream: rename the stream token only, so
+			// locators and file names containing the same text stay intact
+			// NOTE(review): sub-streams (./path/sub) are not included - confirm
+			tokens[0] = "." + bindSuffix
+			manifestText += strings.Join(tokens, " ") + "\n"
+
+		case tokens[0] == "."+pathSubdir:
+			// path may name a file in this stream: keep the block
+			// locators, drop the other files, rename the matching one
+			kept := []string{"." + bindSubdir}
+			found := false
+			for _, token := range tokens[1:] {
+				// file tokens look like position:size:name
+				fields := strings.SplitN(token, ":", 3)
+				if len(fields) < 3 {
+					kept = append(kept, token) // block locator
+				} else if fields[2] == pathFileName {
+					fields[2] = bindFileName
+					kept = append(kept, strings.Join(fields, ":"))
+					found = true
+				}
+			}
+			if found {
+				manifestText += strings.Join(kept, " ") + "\n"
+			}
+		}
+	}
+
+	return manifestText, nil
+}
+
 func (runner *ContainerRunner) loadDiscoveryVars() {
 	tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
 	if err != nil {
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 1cbaa6c..b96b7c7 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -518,7 +518,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
 // dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
+func FullRunHelper(c *C, record string, extraMounts []string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
 	rec := arvados.Container{}
 	err := json.Unmarshal([]byte(record), &rec)
 	c.Check(err, IsNil)
@@ -534,6 +534,15 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
 	am := &ArvMountCmdLine{}
 	cr.RunArvMount = am.ArvMountTest
 
+	if len(extraMounts) > 0 {
+		err := cr.SetupArvMountPoint("keep")
+		c.Check(err, IsNil)
+		// create a by_id/<name> directory under the arv-mount point for each extra mount
+		for _, m := range extraMounts {
+			c.Check(os.MkdirAll(cr.ArvMountPoint+"/by_id/"+m, os.ModePerm), IsNil)
+		}
+	}
+
 	err = cr.Run()
 	c.Check(err, IsNil)
 	c.Check(api.WasSetRunning, Equals, true)
@@ -560,7 +569,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, "hello world\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{}
@@ -582,7 +591,7 @@ func (s *TestSuite) TestCrunchstat(c *C) {
 		"output_path": "/tmp",
 		"priority": 1,
 		"runtime_constraints": {}
-	}`, func(t *TestDockerClient) {
+	}`, nil, func(t *TestDockerClient) {
 		time.Sleep(time.Second)
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{}
@@ -615,7 +624,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, "hello\n"))
 		t.logWriter.Write(dockerLog(2, "world\n"))
 		t.logWriter.Close()
@@ -641,7 +650,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -663,7 +672,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -737,7 +746,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -805,6 +814,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
 		cr.OutputPath = "/tmp"
@@ -819,6 +829,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
 		cr.OutputPath = "/tmp"
@@ -838,6 +849,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/keeptmp": {Kind: "collection", Writable: true},
 		}
@@ -855,6 +867,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
 			"/keepout": {Kind: "collection", Writable: true},
@@ -876,6 +889,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.RuntimeConstraints.KeepCacheRAM = 512
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
@@ -905,6 +919,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		{in: map[string]int{"foo": 123}, out: `{"foo":123}`},
 	} {
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/mnt/test.json": {Kind: "json", Content: test.in},
 		}
@@ -922,6 +937,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 	// Read-only mount points are allowed underneath output_dir mount point
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/tmp":     {Kind: "tmp"},
@@ -942,6 +958,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 	// Writable mount points are not allowed underneath output_dir mount point
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/tmp":     {Kind: "tmp"},
@@ -959,6 +976,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 	// Only mount points of kind 'collection' are allowed underneath output_dir mount point
 	{
 		i = 0
+		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/tmp":     {Kind: "tmp"},
@@ -986,7 +1004,7 @@ func (s *TestSuite) TestStdout(c *C) {
 		"runtime_constraints": {}
 	}`
 
-	api, _ := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
+	api, _ := FullRunHelper(c, helperRecord, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1058,7 +1076,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {"API": true}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1082,7 +1100,7 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {"API": true}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
 		t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
 		t.logWriter.Close()
 		t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1092,3 +1110,149 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
 	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
 	c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
 }
+
+func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "/tmp/foo": {"kind": "collection",
+                     "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54",
+                     "content": {"exclude_from_output": true}
+        },
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
+}
+
+func (s *TestSuite) TestStdoutWithMountPointWithNoPathUnderOutputDir(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	for _, v := range api.Content {
+		if v["collection"] != nil {
+			collection := v["collection"].(arvadosclient.Dict)
+			if strings.Index(collection["name"].(string), "output") == 0 {
+				streams := strings.Split(collection["manifest_text"].(string), "\n")
+				c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+				c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+			}
+		}
+	}
+}
+
+func (s *TestSuite) TestStdoutWithMountPointForFileUnderOutputDir(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54/md5sum.txt"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	for _, v := range api.Content {
+		if v["collection"] != nil {
+			collection := v["collection"].(arvadosclient.Dict)
+			if strings.Index(collection["name"].(string), "output") == 0 {
+				streams := strings.Split(collection["manifest_text"].(string), "\n")
+				c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+				c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+			}
+		}
+	}
+}
+
+func (s *TestSuite) TestStdoutWithMountPointForFileAsPathUnderOutputDir(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54", "path":"/md5sum.txt"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	for _, v := range api.Content {
+		if v["collection"] != nil {
+			collection := v["collection"].(arvadosclient.Dict)
+			if strings.Index(collection["name"].(string), "output") == 0 {
+				streams := strings.Split(collection["manifest_text"].(string), "\n")
+				c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+				c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+			}
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list