[ARVADOS] updated: c3ae1e2f54d4199a9521bf3d4d515bcbb0711989

Git user git at public.curoverse.com
Wed Apr 5 19:54:02 EDT 2017


Summary of changes:
 services/crunch-run/crunchrun.go      | 67 +++++++++++++++++++++++------------
 services/crunch-run/crunchrun_test.go | 58 +++++++++++++++++++++++++++---
 2 files changed, 98 insertions(+), 27 deletions(-)

       via  c3ae1e2f54d4199a9521bf3d4d515bcbb0711989 (commit)
      from  42e6998c32f8d5553133e4b0b3ea02cd0c6f5554 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c3ae1e2f54d4199a9521bf3d4d515bcbb0711989
Author: radhika <radhika at curoverse.com>
Date:   Wed Apr 5 19:53:28 2017 -0400

    8465: added stdin redirection for json mount.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 52829be..f22680f 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"errors"
@@ -364,8 +365,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
 		if bind == "stdin" {
 			// Is it a "collection" mount kind?
-			if mnt.Kind != "collection" {
-				return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind)
+			if mnt.Kind != "collection" && mnt.Kind != "json" {
+				return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
 			}
 		}
 
@@ -672,29 +673,37 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 	runner.CrunchLog.Print("Attaching container streams")
 
 	// If stdin mount is provided, attach it to the docker container
-	var stdinUsed bool
 	var stdinRdr keepclient.Reader
+	var stdinJson []byte
 	if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
-		var stdinColl arvados.Collection
-		collId := stdinMnt.UUID
-		if collId == "" {
-			collId = stdinMnt.PortableDataHash
-		}
-		err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
-		if err != nil {
-			return fmt.Errorf("While getting stding collection: %v", err)
-		}
+		if stdinMnt.Kind == "collection" {
+			var stdinColl arvados.Collection
+			collId := stdinMnt.UUID
+			if collId == "" {
+				collId = stdinMnt.PortableDataHash
+			}
+			err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+			if err != nil {
+				return fmt.Errorf("While getting stding collection: %v", err)
+			}
 
-		stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
-		if os.IsNotExist(err) {
-			return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
-		} else if err != nil {
-			return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+			stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+			if os.IsNotExist(err) {
+				return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+			} else if err != nil {
+				return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+			}
+
+			defer stdinRdr.Close()
+		} else if stdinMnt.Kind == "json" {
+			stdinJson, err = json.Marshal(stdinMnt.Content)
+			if err != nil {
+				return fmt.Errorf("While encoding stdin json data: %v", err)
+			}
 		}
-		stdinUsed = true
-		defer stdinRdr.Close()
 	}
 
+	stdinUsed := stdinRdr != nil || len(stdinJson) != 0
 	response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
 		dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
 	if err != nil {
@@ -730,18 +739,30 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 	}
 	runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
-	if stdinUsed {
+	if stdinRdr != nil {
+		copyErrC := make(chan error)
+		go func() {
+			_, err := io.Copy(response.Conn, stdinRdr)
+			copyErrC <- err
+			close(copyErrC)
+		}()
+
+		copyErr := <-copyErrC
+		if copyErr != nil {
+			return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
+		}
+	} else if len(stdinJson) != 0 {
 		copyErrC := make(chan error)
 		go func() {
-			n, err := io.Copy(response.Conn, stdinRdr)
-			runner.CrunchLog.Printf("BYTES READ = %v", n)
+			jsonRdr := bytes.NewReader(stdinJson)
+			_, err := io.Copy(response.Conn, jsonRdr)
 			copyErrC <- err
 			close(copyErrC)
 		}()
 
 		copyErr := <-copyErrC
 		if copyErr != nil {
-			return fmt.Errorf("While writing stdin to docker container %q", copyErr)
+			return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
 		}
 	}
 
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index a65d366..8852bff 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"net"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
 	return t
 }
 
+type MockConn struct {
+	net.Conn
+}
+
+func (m *MockConn) Write(b []byte) (int, error) {
+	return len(b), nil
+}
+
+func NewMockConn() *MockConn {
+	c := &MockConn{}
+	return c
+}
+
 func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
-	return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
+	return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
 }
 
 func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
@@ -292,9 +306,9 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 
 func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) {
 	if filename == "/file1_in_main.txt" {
-		rdr := ioutil.NopCloser(&bytes.Buffer{})
+		rdr := ioutil.NopCloser(strings.NewReader("foo")) // ioutil.NopCloser(&bytes.Buffer{})
 		client.Called = true
-		return FileWrapper{rdr, 1321984}, nil
+		return FileWrapper{rdr, 3}, nil
 	}
 	return nil, nil
 }
@@ -1142,7 +1156,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 
 		err := cr.SetupMounts()
 		c.Check(err, NotNil)
-		c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`)
+		c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
 		cr.CleanupDirs()
 		checkEmpty()
 	}
@@ -1432,3 +1446,39 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
 		}
 	}
 }
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+	helperRecord := `{
+		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
+		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+		"cwd": "/bin",
+		"environment": {"FROBIZ": "bilbo"},
+		"mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "json", "content": "foo"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+		"output_path": "/tmp",
+		"priority": 1,
+		"runtime_constraints": {}
+	}`
+
+	api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+		t.logWriter.Close()
+	})
+
+	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+	for _, v := range api.Content {
+		if v["collection"] != nil {
+			collection := v["collection"].(arvadosclient.Dict)
+			if strings.Index(collection["name"].(string), "output") == 0 {
+				manifest := collection["manifest_text"].(string)
+
+				c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+			}
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list