[ARVADOS] created: 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554
Git user
git at public.curoverse.com
Wed Apr 5 15:57:04 EDT 2017
at 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554 (commit)
commit 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554
Author: radhika <radhika at curoverse.com>
Date: Tue Apr 4 16:48:12 2017 -0400
8465: added stdin redirection for collection
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a05f61a..52829be 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -49,6 +49,7 @@ var ErrCancelled = errors.New("Cancelled")
// IKeepClient lists the Keep client methods declared for use by this
// package. Declaring them as an interface allows other implementations
// to be supplied in place of the real client.
//
// NOTE(review): the descriptions below are read off the signatures
// only; confirm against the keepclient package.
//   - PutHB takes a hash and a byte buffer and returns a string, an
//     int and an error (presumably a locator and a replica count).
//   - ManifestFileReader returns a reader for filename, given a parsed
//     manifest.
//   - CollectionFileReader returns a reader for filename, given a
//     collection record expressed as a generic map.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+ CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
}
// NewLogWriter is a factory function to create a new log writer.
@@ -361,6 +362,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
}
}
+ if bind == "stdin" {
+ // Is it a "collection" mount kind?
+ if mnt.Kind != "collection" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind)
+ }
+ }
+
if bind == "/etc/arvados/ca-certificates.crt" {
needCertMount = false
}
@@ -663,8 +671,32 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
+ // If stdin mount is provided, attach it to the docker container
+ var stdinUsed bool
+ var stdinRdr keepclient.Reader
+ if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stding collection: %v", err)
+ }
+
+ stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+ stdinUsed = true
+ defer stdinRdr.Close()
+ }
+
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+ dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
@@ -698,6 +730,21 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
}
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ if stdinUsed {
+ copyErrC := make(chan error)
+ go func() {
+ n, err := io.Copy(response.Conn, stdinRdr)
+ runner.CrunchLog.Printf("BYTES READ = %v", n)
+ copyErrC <- err
+ close(copyErrC)
+ }()
+
+ copyErr := <-copyErrC
+ if copyErr != nil {
+ return fmt.Errorf("While writing stdin to docker container %q", copyErr)
+ }
+ }
+
go runner.ProcessDockerAttach(response.Reader)
return nil
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 98462f8..a65d366 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -290,6 +290,15 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
return nil, nil
}
+// CollectionFileReader is the KeepTestClient stand-in for reading a
+// file out of a collection. For the filename "/file1_in_main.txt" it
+// sets client.Called and returns an empty reader with a no-op Close,
+// wrapped in a FileWrapper whose second field is 1321984. For any
+// other filename it returns a nil reader and a nil error. The
+// collection argument is not inspected.
+func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) {
+	if filename != "/file1_in_main.txt" {
+		return nil, nil
+	}
+	client.Called = true
+	empty := ioutil.NopCloser(new(bytes.Buffer))
+	return FileWrapper{empty, 1321984}, nil
+}
+
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
docker := NewTestDockerClient(0)
@@ -369,6 +378,10 @@ func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename stri
return nil, errors.New("KeepError")
}
+func (KeepErrorTestClient) CollectionFileReader(c map[string]interface{}, f string) (keepclient.Reader, error) {
+ return nil, errors.New("KeepError")
+}
+
type KeepReadErrorTestClient struct{}
func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
@@ -397,6 +410,10 @@ func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
return ErrorReader{}, nil
}
+// CollectionFileReader ignores both arguments and returns an
+// ErrorReader value with a nil error, so the call itself succeeds.
+// NOTE(review): ErrorReader is defined elsewhere; presumably it fails
+// when read from, so the error surfaces later — confirm.
+func (KeepReadErrorTestClient) CollectionFileReader(c map[string]interface{}, f string) (keepclient.Reader, error) {
+	rdr := ErrorReader{}
+	return rdr, nil
+}
+
func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -1113,6 +1130,22 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.CleanupDirs()
checkEmpty()
}
+
+ // Only mount point of kind 'collection' is allowed for stdin
+ {
+ i = 0
+ cr.ArvMountPoint = ""
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts = map[string]arvados.Mount{
+ "stdin": {Kind: "tmp"},
+ }
+
+ err := cr.SetupMounts()
+ c.Check(err, NotNil)
+ c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`)
+ cr.CleanupDirs()
+ checkEmpty()
+ }
}
func (s *TestSuite) TestStdout(c *C) {
@@ -1359,3 +1392,43 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
}
}
}
+
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ extraMounts := []string{
+ "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+ }
+
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list