[ARVADOS] created: 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554

Git user git at public.curoverse.com
Wed Apr 5 15:57:04 EDT 2017


        at  42e6998c32f8d5553133e4b0b3ea02cd0c6f5554 (commit)


commit 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554
Author: radhika <radhika at curoverse.com>
Date:   Tue Apr 4 16:48:12 2017 -0400

    8465: added stdin redirection for collection

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index a05f61a..52829be 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -49,6 +49,7 @@ var ErrCancelled = errors.New("Cancelled")
 type IKeepClient interface {
 	PutHB(hash string, buf []byte) (string, int, error)
 	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
+	CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -361,6 +362,13 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 			}
 		}
 
+		if bind == "stdin" {
+			// Is it a "collection" mount kind?
+			if mnt.Kind != "collection" {
+				return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind)
+			}
+		}
+
 		if bind == "/etc/arvados/ca-certificates.crt" {
 			needCertMount = false
 		}
@@ -663,8 +671,32 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 
 	runner.CrunchLog.Print("Attaching container streams")
 
+	// If stdin mount is provided, attach it to the docker container
+	var stdinUsed bool
+	var stdinRdr keepclient.Reader
+	if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+		var stdinColl arvados.Collection
+		collId := stdinMnt.UUID
+		if collId == "" {
+			collId = stdinMnt.PortableDataHash
+		}
+		err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+		if err != nil {
+			return fmt.Errorf("While getting stding collection: %v", err)
+		}
+
+		stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+		if os.IsNotExist(err) {
+			return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+		} else if err != nil {
+			return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+		}
+		stdinUsed = true
+		defer stdinRdr.Close()
+	}
+
 	response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-		dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+		dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
 	if err != nil {
 		return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
 	}
@@ -698,6 +730,21 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 	}
 	runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
+	if stdinUsed {
+		copyErrC := make(chan error)
+		go func() {
+			n, err := io.Copy(response.Conn, stdinRdr)
+			runner.CrunchLog.Printf("BYTES READ = %v", n)
+			copyErrC <- err
+			close(copyErrC)
+		}()
+
+		copyErr := <-copyErrC
+		if copyErr != nil {
+			return fmt.Errorf("While writing stdin to docker container %q", copyErr)
+		}
+	}
+
 	go runner.ProcessDockerAttach(response.Reader)
 
 	return nil
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 98462f8..a65d366 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -290,6 +290,15 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 	return nil, nil
 }
 
+func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) {
+	if filename == "/file1_in_main.txt" {
+		rdr := ioutil.NopCloser(&bytes.Buffer{})
+		client.Called = true
+		return FileWrapper{rdr, 1321984}, nil
+	}
+	return nil, nil
+}
+
 func (s *TestSuite) TestLoadImage(c *C) {
 	kc := &KeepTestClient{}
 	docker := NewTestDockerClient(0)
@@ -369,6 +378,10 @@ func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename stri
 	return nil, errors.New("KeepError")
 }
 
+func (KeepErrorTestClient) CollectionFileReader(c map[string]interface{}, f string) (keepclient.Reader, error) {
+	return nil, errors.New("KeepError")
+}
+
 type KeepReadErrorTestClient struct{}
 
 func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
@@ -397,6 +410,10 @@ func (KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
 	return ErrorReader{}, nil
 }
 
+func (KeepReadErrorTestClient) CollectionFileReader(c map[string]interface{}, f string) (keepclient.Reader, error) {
+	return ErrorReader{}, nil
+}
+
 func (s *TestSuite) TestLoadImageArvError(c *C) {
 	// (1) Arvados error
 	cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -1113,6 +1130,22 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.CleanupDirs()
 		checkEmpty()
 	}
+
+	// Only mount point of kind 'collection' is allowed for stdin
+	{
+		i = 0
+		cr.ArvMountPoint = ""
+		cr.Container.Mounts = make(map[string]arvados.Mount)
+		cr.Container.Mounts = map[string]arvados.Mount{
+			"stdin": {Kind: "tmp"},
+		}
+
+		err := cr.SetupMounts()
+		c.Check(err, NotNil)
+		c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`)
+		cr.CleanupDirs()
+		checkEmpty()
+	}
 }
 
 func (s *TestSuite) TestStdout(c *C) {
@@ -1359,3 +1392,43 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
 		}
 	}
 }
+
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	extraMounts := []string{
+		"b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+	}
+
+	api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	for _, v := range api.Content {
+		if v["collection"] != nil {
+			collection := v["collection"].(arvadosclient.Dict)
+			if strings.Index(collection["name"].(string), "output") == 0 {
+				manifest := collection["manifest_text"].(string)
+
+				c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+			}
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list