[ARVADOS] updated: c3ae1e2f54d4199a9521bf3d4d515bcbb0711989
Git user
git at public.curoverse.com
Wed Apr 5 19:54:02 EDT 2017
Summary of changes:
services/crunch-run/crunchrun.go | 67 +++++++++++++++++++++++------------
services/crunch-run/crunchrun_test.go | 58 +++++++++++++++++++++++++++---
2 files changed, 98 insertions(+), 27 deletions(-)
via c3ae1e2f54d4199a9521bf3d4d515bcbb0711989 (commit)
from 42e6998c32f8d5553133e4b0b3ea02cd0c6f5554 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c3ae1e2f54d4199a9521bf3d4d515bcbb0711989
Author: radhika <radhika at curoverse.com>
Date: Wed Apr 5 19:53:28 2017 -0400
8465: added stdin redirection for json mount.
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 52829be..f22680f 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1,6 +1,7 @@
package main
import (
+ "bytes"
"context"
"encoding/json"
"errors"
@@ -364,8 +365,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
if bind == "stdin" {
// Is it a "collection" mount kind?
- if mnt.Kind != "collection" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' is supported.", mnt.Kind)
+ if mnt.Kind != "collection" && mnt.Kind != "json" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
}
}
@@ -672,29 +673,37 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
// If stdin mount is provided, attach it to the docker container
- var stdinUsed bool
var stdinRdr keepclient.Reader
+ var stdinJson []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
- var stdinColl arvados.Collection
- collId := stdinMnt.UUID
- if collId == "" {
- collId = stdinMnt.PortableDataHash
- }
- err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
- if err != nil {
- return fmt.Errorf("While getting stding collection: %v", err)
- }
+ if stdinMnt.Kind == "collection" {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stding collection: %v", err)
+ }
- stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
- if os.IsNotExist(err) {
- return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
- } else if err != nil {
- return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ stdinRdr, err = runner.Kc.CollectionFileReader(map[string]interface{}{"manifest_text": stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+
+ defer stdinRdr.Close()
+ } else if stdinMnt.Kind == "json" {
+ stdinJson, err = json.Marshal(stdinMnt.Content)
+ if err != nil {
+ return fmt.Errorf("While encoding stdin json data: %v", err)
+ }
}
- stdinUsed = true
- defer stdinRdr.Close()
}
+ stdinUsed := stdinRdr != nil || len(stdinJson) != 0
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
@@ -730,18 +739,30 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
}
runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- if stdinUsed {
+ if stdinRdr != nil {
+ copyErrC := make(chan error)
+ go func() {
+ _, err := io.Copy(response.Conn, stdinRdr)
+ copyErrC <- err
+ close(copyErrC)
+ }()
+
+ copyErr := <-copyErrC
+ if copyErr != nil {
+ return fmt.Errorf("While writing stdin collection to docker container %q", copyErr)
+ }
+ } else if len(stdinJson) != 0 {
copyErrC := make(chan error)
go func() {
- n, err := io.Copy(response.Conn, stdinRdr)
- runner.CrunchLog.Printf("BYTES READ = %v", n)
+ jsonRdr := bytes.NewReader(stdinJson)
+ _, err := io.Copy(response.Conn, jsonRdr)
copyErrC <- err
close(copyErrC)
}()
copyErr := <-copyErrC
if copyErr != nil {
- return fmt.Errorf("While writing stdin to docker container %q", copyErr)
+ return fmt.Errorf("While writing stdin json to docker container %q", copyErr)
}
}
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index a65d366..8852bff 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"os"
"os/exec"
"path/filepath"
@@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
return t
}
+type MockConn struct {
+ net.Conn
+}
+
+func (m *MockConn) Write(b []byte) (int, error) {
+ return len(b), nil
+}
+
+func NewMockConn() *MockConn {
+ c := &MockConn{}
+ return c
+}
+
func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
+ return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
}
func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
@@ -292,9 +306,9 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
func (client *KeepTestClient) CollectionFileReader(collection map[string]interface{}, filename string) (keepclient.Reader, error) {
if filename == "/file1_in_main.txt" {
- rdr := ioutil.NopCloser(&bytes.Buffer{})
+ rdr := ioutil.NopCloser(strings.NewReader("foo")) // ioutil.NopCloser(&bytes.Buffer{})
client.Called = true
- return FileWrapper{rdr, 1321984}, nil
+ return FileWrapper{rdr, 3}, nil
}
return nil, nil
}
@@ -1142,7 +1156,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
err := cr.SetupMounts()
c.Check(err, NotNil)
- c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin. Only 'collection' is supported.*`)
+ c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
cr.CleanupDirs()
checkEmpty()
}
@@ -1432,3 +1446,39 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
}
}
}
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "json", "content": "foo"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list