[ARVADOS] updated: 6b91e8201f24d2b4126532d809abba42a5ab959c
Git user
git at public.curoverse.com
Thu Jan 26 17:57:20 EST 2017
Summary of changes:
services/crunch-run/crunchrun.go | 133 +++++++++++++++++++++++-
services/crunch-run/crunchrun_test.go | 184 ++++++++++++++++++++++++++++++++--
2 files changed, 306 insertions(+), 11 deletions(-)
via 6b91e8201f24d2b4126532d809abba42a5ab959c (commit)
from fbacdaec3dbba425155ab6348c7e6b80ff4e710b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 6b91e8201f24d2b4126532d809abba42a5ab959c
Author: radhika <radhika at curoverse.com>
Date: Sun Jan 22 05:50:55 2017 -0500
9397: pre-populate output with mounts under output_dir (wip)
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index dfe2dfa..f2b4277 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -242,8 +242,15 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
var tmpBackedOutputDir = false
+// SetupArvMountPoint makes sure runner.ArvMountPoint is set. If it is still
+// empty, a temporary directory is created through runner.MkTempDir using
+// the given name prefix and recorded as the mount point; if it is already
+// set, it is left untouched and nil is returned.
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+	if runner.ArvMountPoint != "" {
+		// A mount point was set up earlier; keep it.
+		return nil
+	}
+	runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+	return err
+}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
- runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+ err = runner.SetupArvMountPoint("keep")
if err != nil {
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
@@ -653,6 +660,57 @@ func (runner *ContainerRunner) CaptureOutput() error {
manifestText = rec.ManifestText
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, _ := range runner.Container.Mounts {
+ binds = append(binds, bind)
+ }
+ sort.Strings(binds)
+
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+
+ bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+ if bindSuffix == bind || len(bindSuffix) <= 0 {
+ // either doesn't start with OutputPath or is OutputPath itself
+ continue
+ }
+
+ if strings.Index(bindSuffix, "/") != 0 {
+ return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
+ }
+
+ jsondata, err := json.Marshal(mnt.Content)
+ if err != nil {
+ return fmt.Errorf("While marshal of mount content: %v", err)
+ }
+ var content map[string]interface{}
+ err = json.Unmarshal(jsondata, &content)
+ if err != nil {
+ return fmt.Errorf("While unmarshal of mount content: %v", err)
+ }
+
+ if content["exclude_from_output"] == true {
+ continue
+ }
+
+ idx := strings.Index(mnt.PortableDataHash, "/")
+ if idx > 0 {
+ mnt.Path = mnt.PortableDataHash[idx:]
+ mnt.PortableDataHash = mnt.PortableDataHash[0 : idx-1]
+ }
+
+ // append to manifest_text
+ m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+ if err != nil {
+ return err
+ }
+
+ manifestText = manifestText + m
+ }
+
+ // Save output
var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
@@ -668,6 +726,79 @@ func (runner *ContainerRunner) CaptureOutput() error {
return nil
}
+// getCollectionManifestForPath fetches the collection identified by
+// mnt.PortableDataHash and returns the manifest_text fragment selected by
+// mnt.Path, relocated so that it appears at bindSuffix. bindSuffix is the
+// mount's bind path relative to the output directory and is expected to
+// start with "/" (e.g. "/foo").
+//
+// If mnt.Path is not specified, the entire manifest_text is returned with
+// bindSuffix inserted into every stream name ("." becomes "./foo", "./sub"
+// becomes "./foo/sub").
+// If mnt.Path names a stream, the manifest_text for that stream is returned
+// with the stream name replaced by "." + bindSuffix.
+// Otherwise mnt.Path is taken to name a file in a stream. The manifest_text
+// for that stream is returned with all other file tokens discarded, and with
+// the stream name and file name replaced by the directory and file
+// components of bindSuffix.
+//
+// An empty string is returned when nothing in the collection matches
+// mnt.Path. An error is returned only if the collection cannot be fetched.
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+	var collection arvados.Collection
+	err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+	if err != nil {
+		return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+	}
+
+	return rebindManifestText(collection.ManifestText, mnt.Path, bindSuffix), nil
+}
+
+// rebindManifestText selects the part of manifestText addressed by path and
+// rewrites its stream (and, for a single file, file) names so that the
+// selection appears at bindSuffix. Every returned stream ends with "\n".
+func rebindManifestText(manifestText, path, bindSuffix string) string {
+	// Normalize the path: "/" means the collection root, and a missing
+	// leading slash is tolerated.
+	path = strings.TrimSuffix(path, "/")
+	if path != "" && !strings.HasPrefix(path, "/") {
+		path = "/" + path
+	}
+
+	pathDir, pathFile := splitLastSlash(path)
+	bindDir, bindFile := splitLastSlash(bindSuffix)
+
+	var out []string
+	for _, stream := range strings.Split(manifestText, "\n") {
+		tokens := strings.Split(stream, " ")
+		name := tokens[0]
+		if name != "." && !strings.HasPrefix(name, "./") {
+			// blank line (e.g. after the final "\n") or not a stream
+			continue
+		}
+
+		// Only the stream name token is rewritten; block locators and
+		// file tokens are never touched by a textual search/replace.
+		switch {
+		case path == "":
+			// no path specified; keep the stream, moved under bindSuffix
+			tokens[0] = "." + bindSuffix + name[1:]
+		case name == "."+path:
+			// path refers to this stream
+			// NOTE(review): streams nested below path are not included.
+			tokens[0] = "." + bindSuffix
+		case name == "."+pathDir:
+			// path may refer to a file in this stream
+			tokens = rebindFileTokens(tokens, pathFile, "."+bindDir, bindFile)
+		default:
+			tokens = nil
+		}
+
+		if len(tokens) > 0 {
+			out = append(out, strings.Join(tokens, " ")+"\n")
+		}
+	}
+
+	return strings.Join(out, "")
+}
+
+// rebindFileTokens builds the tokens of a stream named newStream holding all
+// block locators of the given stream tokens plus only those file tokens
+// whose name is exactly fileName, renamed to newFileName. It returns nil if
+// the stream has no such file.
+func rebindFileTokens(tokens []string, fileName, newStream, newFileName string) []string {
+	kept := []string{newStream}
+	found := false
+	for _, tok := range tokens[1:] {
+		// File tokens have the form position:size:name; anything else
+		// after the stream name is a block locator.
+		parts := strings.SplitN(tok, ":", 3)
+		if len(parts) < 3 {
+			kept = append(kept, tok)
+			continue
+		}
+		if parts[2] == fileName {
+			// A file may span several segments; keep every one of them.
+			kept = append(kept, parts[0]+":"+parts[1]+":"+newFileName)
+			found = true
+		}
+	}
+	if !found {
+		return nil
+	}
+	return kept
+}
+
+// splitLastSlash splits p around its last "/" into a directory part and a
+// final component. Without any "/" the whole of p is the final component.
+func splitLastSlash(p string) (dir, last string) {
+	if i := strings.LastIndex(p, "/"); i >= 0 {
+		return p[:i], p[i+1:]
+	}
+	return "", p
+}
+
+
func (runner *ContainerRunner) loadDiscoveryVars() {
tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
if err != nil {
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 1cbaa6c..b96b7c7 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -518,7 +518,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
+func FullRunHelper(c *C, record string, extraMounts []string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
@@ -534,6 +534,15 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
+ if extraMounts != nil && len(extraMounts) > 0 {
+ err := cr.SetupArvMountPoint("keep")
+ c.Check(err, IsNil)
+
+ for _, m := range extraMounts {
+ os.MkdirAll(cr.ArvMountPoint+"/by_id/"+m, os.ModePerm)
+ }
+ }
+
err = cr.Run()
c.Check(err, IsNil)
c.Check(api.WasSetRunning, Equals, true)
@@ -560,7 +569,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello world\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
@@ -582,7 +591,7 @@ func (s *TestSuite) TestCrunchstat(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, func(t *TestDockerClient) {
+ }`, nil, func(t *TestDockerClient) {
time.Sleep(time.Second)
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
@@ -615,7 +624,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello\n"))
t.logWriter.Write(dockerLog(2, "world\n"))
t.logWriter.Close()
@@ -641,7 +650,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -663,7 +672,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -737,7 +746,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -805,6 +814,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
cr.OutputPath = "/tmp"
@@ -819,6 +829,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
cr.OutputPath = "/tmp"
@@ -838,6 +849,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = map[string]arvados.Mount{
"/keeptmp": {Kind: "collection", Writable: true},
}
@@ -855,6 +867,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = map[string]arvados.Mount{
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
@@ -876,6 +889,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.RuntimeConstraints.KeepCacheRAM = 512
cr.Container.Mounts = map[string]arvados.Mount{
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
@@ -905,6 +919,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{in: map[string]int{"foo": 123}, out: `{"foo":123}`},
} {
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = map[string]arvados.Mount{
"/mnt/test.json": {Kind: "json", Content: test.in},
}
@@ -922,6 +937,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
// Read-only mount points are allowed underneath output_dir mount point
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
"/tmp": {Kind: "tmp"},
@@ -942,6 +958,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
// Writable mount points are not allowed underneath output_dir mount point
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
"/tmp": {Kind: "tmp"},
@@ -959,6 +976,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
// Only mount points of kind 'collection' are allowed underneath output_dir mount point
{
i = 0
+ cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
"/tmp": {Kind: "tmp"},
@@ -986,7 +1004,7 @@ func (s *TestSuite) TestStdout(c *C) {
"runtime_constraints": {}
}`
- api, _ := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
+ api, _ := FullRunHelper(c, helperRecord, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1058,7 +1076,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1082,7 +1100,7 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, func(t *TestDockerClient) {
+}`, nil, func(t *TestDockerClient) {
t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
@@ -1092,3 +1110,149 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
}
+
+// TestStdoutWithExcludeFromOutputMountPointUnderOutputDir does a full run
+// with a collection mounted at /tmp/foo (under output_path /tmp) whose
+// content sets exclude_from_output, and checks that a collection was
+// recorded whose manifest_text is exactly the captured stdout stream.
+func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
+	containerRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "/tmp/foo": {"kind": "collection",
+ "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54",
+ "content": {"exclude_from_output": true}
+ },
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+	// Stand-in for the container process: log the first environment entry
+	// minus its 7-character prefix, close the log and exit with code 0.
+	fakeContainer := func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	}
+
+	api, _ := FullRunHelper(c, containerRecord, []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}, fakeContainer)
+
+	for _, expected := range []struct {
+		key   string
+		value interface{}
+	}{
+		{"container.exit_code", 0},
+		{"container.state", "Complete"},
+		{"collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"},
+	} {
+		c.Check(api.CalledWith(expected.key, expected.value), NotNil)
+	}
+}
+
+// TestStdoutWithMountPointWithNoPathUnderOutputDir does a full run with a
+// whole collection (no path) mounted at /tmp/foo under output_path /tmp and
+// checks that the saved output manifest holds the captured stdout stream
+// followed by a stream under ./foo that contains md5sum.txt.
+func (s *TestSuite) TestStdoutWithMountPointWithNoPathUnderOutputDir(c *C) {
+	helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+	// Track whether an output collection was seen at all, so the test
+	// fails instead of passing vacuously when none was recorded.
+	outputSeen := false
+	for _, v := range api.Content {
+		collection, ok := v["collection"].(arvadosclient.Dict)
+		if !ok {
+			continue
+		}
+		name, _ := collection["name"].(string)
+		if !strings.HasPrefix(name, "output") {
+			continue
+		}
+		outputSeen = true
+
+		manifestText, _ := collection["manifest_text"].(string)
+		streams := strings.Split(manifestText, "\n")
+		// Guard the index below: report a failure rather than panic
+		// when the manifest has fewer than two lines.
+		if !c.Check(len(streams) >= 2, Equals, true) {
+			continue
+		}
+		c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+		c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+	}
+	c.Check(outputSeen, Equals, true)
+}
+
+// TestStdoutWithMountPointForFileUnderOutputDir does a full run with a
+// single file mounted at /tmp/foo under output_path /tmp, the file being
+// given as a "/md5sum.txt" suffix on portable_data_hash, and checks the
+// saved output manifest.
+//
+// NOTE(review): getCollectionManifestForPath documents that a file mount is
+// renamed using the bind path components, which for /tmp/foo would give a
+// file named "foo" in stream "."; confirm that the second pattern below
+// (stream ./foo containing md5sum.txt) is really the intended result.
+func (s *TestSuite) TestStdoutWithMountPointForFileUnderOutputDir(c *C) {
+	helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54/md5sum.txt"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+	// Track whether an output collection was seen at all, so the test
+	// fails instead of passing vacuously when none was recorded.
+	outputSeen := false
+	for _, v := range api.Content {
+		collection, ok := v["collection"].(arvadosclient.Dict)
+		if !ok {
+			continue
+		}
+		name, _ := collection["name"].(string)
+		if !strings.HasPrefix(name, "output") {
+			continue
+		}
+		outputSeen = true
+
+		manifestText, _ := collection["manifest_text"].(string)
+		streams := strings.Split(manifestText, "\n")
+		// Guard the index below: report a failure rather than panic
+		// when the manifest has fewer than two lines.
+		if !c.Check(len(streams) >= 2, Equals, true) {
+			continue
+		}
+		c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+		c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+	}
+	c.Check(outputSeen, Equals, true)
+}
+
+// TestStdoutWithMountPointForFileAsPathUnderOutputDir does a full run with
+// a single file mounted at /tmp/foo under output_path /tmp, the file being
+// given through the mount's "path" attribute ("/md5sum.txt"), and checks
+// the saved output manifest.
+//
+// NOTE(review): getCollectionManifestForPath documents that a file mount is
+// renamed using the bind path components, which for /tmp/foo would give a
+// file named "foo" in stream "."; confirm that the second pattern below
+// (stream ./foo containing md5sum.txt) is really the intended result.
+func (s *TestSuite) TestStdoutWithMountPointForFileAsPathUnderOutputDir(c *C) {
+	helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "/tmp/foo": {"kind": "collection", "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54", "path":"/md5sum.txt"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+	extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
+
+	api, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+	// Track whether an output collection was seen at all, so the test
+	// fails instead of passing vacuously when none was recorded.
+	outputSeen := false
+	for _, v := range api.Content {
+		collection, ok := v["collection"].(arvadosclient.Dict)
+		if !ok {
+			continue
+		}
+		name, _ := collection["name"].(string)
+		if !strings.HasPrefix(name, "output") {
+			continue
+		}
+		outputSeen = true
+
+		manifestText, _ := collection["manifest_text"].(string)
+		streams := strings.Split(manifestText, "\n")
+		// Guard the index below: report a failure rather than panic
+		// when the manifest has fewer than two lines.
+		if !c.Check(len(streams) >= 2, Equals, true) {
+			continue
+		}
+		c.Check(streams[0], Equals, "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out")
+		c.Check(streams[1], Matches, `\.\/foo.*md5sum\.txt`)
+	}
+	c.Check(outputSeen, Equals, true)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list