[ARVADOS] created: f7115688cc3a63b4721ca59259bc8276685bac9a
git at public.curoverse.com
git at public.curoverse.com
Wed Jan 6 16:39:08 EST 2016
at f7115688cc3a63b4721ca59259bc8276685bac9a (commit)
commit f7115688cc3a63b4721ca59259bc8276685bac9a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 6 16:38:58 2016 -0500
7816: Converted testing to use stubbed out ThinDockerClient instead of real Docker.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index d24073b..9e6a12f 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -48,8 +48,19 @@ type ContainerRecord struct {
type NewLogWriter func(name string) io.WriteCloser
+type ThinDockerClient interface {
+ StopContainer(id string, timeout int) error
+ InspectImage(id string) (*dockerclient.ImageInfo, error)
+ LoadImage(reader io.Reader) error
+ CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
+ StartContainer(id string, config *dockerclient.HostConfig) error
+ ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
+ Wait(id string) <-chan dockerclient.WaitResult
+ RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+}
+
type ContainerRunner struct {
- Docker *dockerclient.DockerClient
+ Docker ThinDockerClient
Api IArvadosClient
Kc IKeepClient
ContainerRecord
@@ -348,7 +359,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,
- docker *dockerclient.DockerClient) *ContainerRunner {
+ docker ThinDockerClient) *ContainerRunner {
cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
cr.NewLogWriter = cr.NewArvLogWriter
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 0c8f2c1..fcd5f80 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -12,7 +12,7 @@ import (
"github.com/curoverse/dockerclient"
. "gopkg.in/check.v1"
"io"
- "os"
+ "io/ioutil"
"strings"
"syscall"
"testing"
@@ -20,7 +20,7 @@ import (
)
// Gocheck boilerplate
-func Test(t *testing.T) {
+func TestCrunchExec(t *testing.T) {
TestingT(t)
}
@@ -45,11 +45,92 @@ type KeepTestClient struct {
var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0 at 569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120"
-var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar"
+var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7"
var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
+type TestDockerClient struct {
+ imageLoaded string
+ stdoutReader io.ReadCloser
+ stderrReader io.ReadCloser
+ stdoutWriter io.WriteCloser
+ stderrWriter io.WriteCloser
+ fn func(t *TestDockerClient)
+ finish chan dockerclient.WaitResult
+ stop chan bool
+ cwd string
+ env []string
+}
+
+func NewTestDockerClient() *TestDockerClient {
+ t := &TestDockerClient{}
+ t.stdoutReader, t.stdoutWriter = io.Pipe()
+ t.stderrReader, t.stderrWriter = io.Pipe()
+ t.finish = make(chan dockerclient.WaitResult)
+ t.stop = make(chan bool)
+ t.cwd = "/"
+ return t
+}
+
+func (t *TestDockerClient) StopContainer(id string, timeout int) error {
+ t.stop <- true
+ return nil
+}
+
+func (t *TestDockerClient) InspectImage(id string) (*dockerclient.ImageInfo, error) {
+ if t.imageLoaded == id {
+ return &dockerclient.ImageInfo{}, nil
+ } else {
+ return nil, errors.New("")
+ }
+}
+
+func (t *TestDockerClient) LoadImage(reader io.Reader) error {
+ _, err := io.Copy(ioutil.Discard, reader)
+ if err != nil {
+ return err
+ } else {
+ t.imageLoaded = hwImageId
+ return nil
+ }
+}
+
+func (t *TestDockerClient) CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) {
+ if config.WorkingDir != "" {
+ t.cwd = config.WorkingDir
+ }
+ t.env = config.Env
+ return "abcde", nil
+}
+
+func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostConfig) error {
+ if id == "abcde" {
+ go t.fn(t)
+ return nil
+ } else {
+ return errors.New("Invalid container id")
+ }
+}
+
+func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
+ if options.Stdout {
+ return t.stdoutReader, nil
+ }
+ if options.Stderr {
+ return t.stderrReader, nil
+ }
+ return nil, nil
+}
+
+func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
+ return t.finish
+}
+
+func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) {
+ return nil, nil
+}
+
func (this *ArvTestClient) Create(resourceType string,
parameters arvadosclient.Dict,
output interface{}) error {
@@ -118,20 +199,20 @@ func (this FileWrapper) Len() uint64 {
}
func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
- if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
- rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+ if filename == hwImageId+".tar" {
+ rdr := ioutil.NopCloser(&bytes.Buffer{})
this.Called = true
- return FileWrapper{rdr, 1321984}, err
+ return FileWrapper{rdr, 1321984}, nil
}
return nil, nil
}
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
- docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
- _, err = cr.Docker.RemoveImage(hwImageId, true)
+ _, err := cr.Docker.RemoveImage(hwImageId, true)
_, err = cr.Docker.InspectImage(hwImageId)
c.Check(err, NotNil)
@@ -225,7 +306,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
cr.ContainerRecord.ContainerImage = hwPDH
@@ -244,7 +325,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
cr.ContainerRecord.ContainerImage = hwPDH
@@ -280,13 +361,19 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
}
func (s *TestSuite) TestRunContainer(c *C) {
- docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
+ docker.fn = func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte("Hello world\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{}
+ }
cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
cr.ContainerRecord.ContainerImage = hwPDH
-d cr.ContainerRecord.Command = []string{"./hw"}
+ cr.ContainerRecord.Command = []string{"./hw"}
err := cr.LoadImage()
c.Check(err, IsNil)
@@ -303,7 +390,7 @@ d cr.ContainerRecord.Command = []string{"./hw"}
c.Check(logs.Stderr.String(), Equals, "")
}
-func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+func (s *TestSuite) TestCommitLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
@@ -321,7 +408,7 @@ func (s *LoggingTestSuite) TestCommitLogs(c *C) {
c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
}
-func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
@@ -333,7 +420,7 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
c.Check(api.Content["state"], Equals, "Running")
}
-func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
@@ -353,7 +440,7 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
c.Check(api.Content["state"], Equals, "Complete")
}
-func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil)
@@ -368,12 +455,13 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
c.Check(api.Content["state"], Equals, "Cancelled")
}
-func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
rec := ContainerRecord{}
err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
c.Check(err, IsNil)
- docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
+ docker.fn = fn
docker.RemoveImage(hwImageId, true)
api = &ArvTestClient{ContainerRecord: rec}
@@ -395,7 +483,7 @@ func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner
return
}
-func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+func (s *TestSuite) TestFullRunHello(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -405,7 +493,12 @@ func (s *LoggingTestSuite) TestFullRunHello(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte("hello world\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{}
+ })
c.Check(api.Content["exit_code"], Equals, 0)
c.Check(api.Content["state"], Equals, "Complete")
@@ -414,7 +507,7 @@ func (s *LoggingTestSuite) TestFullRunHello(c *C) {
}
-func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+func (s *TestSuite) TestFullRunStderr(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -424,7 +517,13 @@ func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte("hello\n"))
+ t.stderrWriter.Write([]byte("world\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 1}
+ })
c.Check(api.Content["log"], NotNil)
c.Check(api.Content["exit_code"], Equals, 1)
@@ -434,7 +533,7 @@ func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
}
-func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -444,7 +543,12 @@ func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte(t.cwd + "\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
c.Check(api.Content["exit_code"], Equals, 0)
c.Check(api.Content["state"], Equals, "Complete")
@@ -452,7 +556,7 @@ func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
}
-func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+func (s *TestSuite) TestFullRunSetCwd(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -462,7 +566,12 @@ func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte(t.cwd + "\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
c.Check(api.Content["exit_code"], Equals, 0)
c.Check(api.Content["state"], Equals, "Complete")
@@ -470,7 +579,7 @@ func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
}
-func (s *LoggingTestSuite) TestCancel(c *C) {
+func (s *TestSuite) TestCancel(c *C) {
record := `{
"command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -486,7 +595,14 @@ func (s *LoggingTestSuite) TestCancel(c *C) {
err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
c.Check(err, IsNil)
- docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker := NewTestDockerClient()
+ docker.fn = func(t *TestDockerClient) {
+ <-t.stop
+ t.stdoutWriter.Write([]byte("foo\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ }
docker.RemoveImage(hwImageId, true)
api := &ArvTestClient{ContainerRecord: rec}
@@ -518,7 +634,7 @@ func (s *LoggingTestSuite) TestCancel(c *C) {
}
-func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+func (s *TestSuite) TestFullRunSetEnv(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -528,7 +644,12 @@ func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+ t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
+ t.stdoutWriter.Close()
+ t.stderrWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
c.Check(api.Content["exit_code"], Equals, 0)
c.Check(api.Content["state"], Equals, "Complete")
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index 27d066a..c86d918 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -3,15 +3,9 @@ package main
import (
"fmt"
. "gopkg.in/check.v1"
- "testing"
"time"
)
-// Gocheck boilerplate
-func Test2(t *testing.T) {
- TestingT(t)
-}
-
type LoggingTestSuite struct{}
type TestTimestamper struct {
commit c152ad446aaa61efde38cba91307cc8c29c4667f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 6 10:52:07 2016 -0500
7816: Crunch2 executor prototype work in progress.
diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
new file mode 100644
index 0000000..d24073b
--- /dev/null
+++ b/services/crunch-exec/crunchexec.go
@@ -0,0 +1,389 @@
+package main
+
+import (
+ "errors"
+ "flag"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
+ "io"
+ "log"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+)
+
+type IArvadosClient interface {
+ Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+ Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+ Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+}
+
+type IKeepClient interface {
+ PutHB(hash string, buf []byte) (string, int, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+}
+
+type Mount struct{}
+
+type Collection struct {
+ ManifestText string `json:"manifest_text"`
+}
+
+type ContainerRecord struct {
+ Uuid string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ Mounts map[string]Mount `json:"mounts"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints map[string]string `json:"runtime_constraints"`
+ State string `json:"state"`
+}
+
+type NewLogWriter func(name string) io.WriteCloser
+
+type ContainerRunner struct {
+ Docker *dockerclient.DockerClient
+ Api IArvadosClient
+ Kc IKeepClient
+ ContainerRecord
+ dockerclient.ContainerConfig
+ ContainerId string
+ ExitCode *int
+ NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout *ThrottledLogger
+ Stderr *ThrottledLogger
+ LogCollection *CollectionWriter
+ LogsPDH *string
+ CancelLock sync.Mutex
+ Cancelled bool
+ SigChan chan os.Signal
+}
+
+func (this *ContainerRunner) SetupSignals() error {
+ this.SigChan = make(chan os.Signal, 1)
+ signal.Notify(this.SigChan, syscall.SIGTERM)
+ signal.Notify(this.SigChan, syscall.SIGINT)
+ signal.Notify(this.SigChan, syscall.SIGQUIT)
+
+ go func(sig <-chan os.Signal) {
+ for _ = range sig {
+ if !this.Cancelled {
+ this.CancelLock.Lock()
+ this.Cancelled = true
+ if this.ContainerId != "" {
+ this.Docker.StopContainer(this.ContainerId, 10)
+ }
+ this.CancelLock.Unlock()
+ }
+ }
+ }(this.SigChan)
+
+ return nil
+}
+
+func (this *ContainerRunner) LoadImage() (err error) {
+
+ this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
+
+ var collection Collection
+ err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
+ if err != nil {
+ return err
+ }
+ manifest := manifest.Manifest{Text: collection.ManifestText}
+ var img, imageId string
+ for ms := range manifest.StreamIter() {
+ img = ms.FileStreamSegments[0].Name
+ if !strings.HasSuffix(img, ".tar") {
+ return errors.New("First file in the collection does not end in .tar")
+ }
+ imageId = img[:len(img)-4]
+ }
+
+ this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
+
+ _, err = this.Docker.InspectImage(imageId)
+ if err != nil {
+ this.CrunchLog.Print("Loading Docker image from keep")
+
+ var readCloser io.ReadCloser
+ readCloser, err = this.Kc.ManifestFileReader(manifest, img)
+ if err != nil {
+ return err
+ }
+
+ err = this.Docker.LoadImage(readCloser)
+ if err != nil {
+ return err
+ }
+ } else {
+ this.CrunchLog.Print("Docker image is available")
+ }
+
+ this.ContainerConfig.Image = imageId
+
+ return nil
+}
+
+func (this *ContainerRunner) StartContainer() (err error) {
+ this.CrunchLog.Print("Creating Docker container")
+
+ this.CancelLock.Lock()
+ defer this.CancelLock.Unlock()
+
+ if this.Cancelled {
+ return errors.New("Cancelled")
+ }
+
+ this.ContainerConfig.Cmd = this.ContainerRecord.Command
+ if this.ContainerRecord.Cwd != "." {
+ this.ContainerConfig.WorkingDir = this.ContainerRecord.Cwd
+ }
+ for k, v := range this.ContainerRecord.Environment {
+ this.ContainerConfig.Env = append(this.ContainerConfig.Env, k+"="+v)
+ }
+ this.ContainerId, err = this.Docker.CreateContainer(&this.ContainerConfig, "", nil)
+ if err != nil {
+ return
+ }
+ hostConfig := &dockerclient.HostConfig{}
+
+ this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
+ err = this.Docker.StartContainer(this.ContainerId, hostConfig)
+ if err != nil {
+ return
+ }
+
+ return nil
+}
+
+func (this *ContainerRunner) AttachLogs() (err error) {
+
+ this.CrunchLog.Print("Attaching container logs")
+
+ var stderrReader, stdoutReader io.Reader
+ stderrReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stderr: true})
+ if err != nil {
+ return
+ }
+ stdoutReader, err = this.Docker.ContainerLogs(this.ContainerId, &dockerclient.LogOptions{Follow: true, Stdout: true})
+ if err != nil {
+ return
+ }
+
+ this.loggingDone = make(chan bool)
+
+ this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
+ this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
+ go CopyReaderToLog(stdoutReader, this.Stdout.Logger, this.loggingDone)
+ go CopyReaderToLog(stderrReader, this.Stderr.Logger, this.loggingDone)
+
+ return nil
+}
+
+func (this *ContainerRunner) WaitFinish() error {
+ result := this.Docker.Wait(this.ContainerId)
+ wr := <-result
+ if wr.Error != nil {
+ return wr.Error
+ }
+ this.ExitCode = &wr.ExitCode
+
+ // drain stdout/stderr
+ <-this.loggingDone
+ <-this.loggingDone
+
+ this.Stdout.Stop()
+ this.Stderr.Stop()
+
+ return nil
+}
+
+func (this *ContainerRunner) CommitLogs() error {
+ if this.Cancelled {
+ this.CrunchLog.Print("Cancelled")
+ } else {
+ this.CrunchLog.Print("Complete")
+ }
+
+ this.CrunchLog.Stop()
+ this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
+ "crunchexec", nil})
+
+ mt, err := this.LogCollection.ManifestText()
+ if err != nil {
+ return err
+ }
+
+ response := make(map[string]string)
+ err = this.Api.Create("collections",
+ arvadosclient.Dict{"name": "logs for " + this.ContainerRecord.Uuid,
+ "manifest_text": mt},
+ response)
+ if err != nil {
+ return err
+ }
+
+ this.LogsPDH = new(string)
+ *this.LogsPDH = response["portable_data_hash"]
+
+ return nil
+}
+
+func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+ update := arvadosclient.Dict{"state": "Running"}
+ return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+}
+
+func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+ update := arvadosclient.Dict{}
+ if this.LogsPDH != nil {
+ update["log"] = *this.LogsPDH
+ }
+ if this.ExitCode != nil {
+ update["exit_code"] = *this.ExitCode
+ }
+
+ if this.Cancelled {
+ update["state"] = "Cancelled"
+ } else {
+ update["state"] = "Complete"
+ }
+ return this.Api.Update("containers", this.ContainerRecord.Uuid, update, nil)
+}
+
+func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+ return &ArvLogWriter{this.Api, this.ContainerRecord.Uuid, name, this.LogCollection.Open(name + ".txt")}
+}
+
+func (this *ContainerRunner) Run(containerUuid string) (err error) {
+ this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+
+ var runerr, waiterr error
+
+ defer func() {
+ if err != nil {
+ this.CrunchLog.Print(err)
+ }
+
+ // (6) write logs
+ logerr := this.CommitLogs()
+ if logerr != nil {
+ this.CrunchLog.Print(logerr)
+ }
+
+ // (7) update container record with results
+ updateerr := this.UpdateContainerRecordComplete()
+ if updateerr != nil {
+ this.CrunchLog.Print(updateerr)
+ }
+
+ this.CrunchLog.Stop()
+
+ if err == nil {
+ if runerr != nil {
+ err = runerr
+ } else if waiterr != nil {
+ err = runerr
+ } else if logerr != nil {
+ err = logerr
+ } else if updateerr != nil {
+ err = updateerr
+ }
+ }
+ }()
+
+ err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
+ if err != nil {
+ return
+ }
+
+ // (0) setup signal handling
+ err = this.SetupSignals()
+ if err != nil {
+ return
+ }
+
+ // (1) check for and/or load image
+ err = this.LoadImage()
+ if err != nil {
+ return
+ }
+
+ // (2) start container
+ err = this.StartContainer()
+ if err != nil {
+ if err.Error() == "Cancelled" {
+ err = nil
+ }
+ return
+ }
+
+ // (3) update container record state
+ err = this.UpdateContainerRecordRunning()
+ if err != nil {
+ this.CrunchLog.Print(err)
+ }
+
+ // (4) attach container logs
+ runerr = this.AttachLogs()
+ if runerr != nil {
+ this.CrunchLog.Print(runerr)
+ }
+
+ // (5) wait for container to finish
+ waiterr = this.WaitFinish()
+
+ return
+}
+
+func NewContainerRunner(api IArvadosClient,
+ kc IKeepClient,
+ docker *dockerclient.DockerClient) *ContainerRunner {
+
+ cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
+ cr.NewLogWriter = cr.NewArvLogWriter
+ cr.LogCollection = &CollectionWriter{kc, nil}
+ cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+ return cr
+}
+
+func main() {
+ flag.Parse()
+
+ api, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatal(err)
+ }
+ api.Retries = 8
+
+ var kc *keepclient.KeepClient
+ kc, err = keepclient.MakeKeepClient(&api)
+ if err != nil {
+ log.Fatal(err)
+ }
+ kc.Retries = 4
+
+ var docker *dockerclient.DockerClient
+ docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ cr := NewContainerRunner(api, kc, docker)
+
+ err = cr.Run(flag.Arg(0))
+ if err != nil {
+ log.Fatal(err)
+ }
+
+}
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
new file mode 100644
index 0000000..0c8f2c1
--- /dev/null
+++ b/services/crunch-exec/crunchexec_test.go
@@ -0,0 +1,537 @@
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/curoverse/dockerclient"
+ . "gopkg.in/check.v1"
+ "io"
+ "os"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+ Total int64
+ Calls int
+ Content arvadosclient.Dict
+ ContainerRecord
+ Logs map[string]*bytes.Buffer
+ WasSetRunning bool
+}
+
+type KeepTestClient struct {
+ Called bool
+ Content []byte
+}
+
+var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0 at 569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
+var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120"
+var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar"
+
+var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
+var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
+
+func (this *ArvTestClient) Create(resourceType string,
+ parameters arvadosclient.Dict,
+ output interface{}) error {
+
+ this.Calls += 1
+ this.Content = parameters
+
+ if resourceType == "logs" {
+ et := parameters["event_type"].(string)
+ if this.Logs == nil {
+ this.Logs = make(map[string]*bytes.Buffer)
+ }
+ if this.Logs[et] == nil {
+ this.Logs[et] = &bytes.Buffer{}
+ }
+ this.Logs[et].Write([]byte(parameters["properties"].(map[string]string)["text"]))
+ }
+
+ if resourceType == "collections" && output != nil {
+ mt := parameters["manifest_text"].(string)
+ outmap := output.(map[string]string)
+ outmap["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ }
+
+ return nil
+}
+
+func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+ if resourceType == "collections" {
+ if uuid == hwPDH {
+ output.(*Collection).ManifestText = hwManifest
+ } else if uuid == otherPDH {
+ output.(*Collection).ManifestText = otherManifest
+ }
+ }
+ if resourceType == "containers" {
+ (*output.(*ContainerRecord)) = this.ContainerRecord
+ }
+ return nil
+}
+
+func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+
+ this.Content = parameters
+ if resourceType == "containers" {
+ if parameters["state"] == "Running" {
+ this.WasSetRunning = true
+ }
+
+ }
+ return nil
+}
+
+func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ this.Content = buf
+ return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+}
+
+type FileWrapper struct {
+ io.ReadCloser
+ len uint64
+}
+
+func (this FileWrapper) Len() uint64 {
+ return this.len
+}
+
+func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+ if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
+ rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+ this.Called = true
+ return FileWrapper{rdr, 1321984}, err
+ }
+ return nil, nil
+}
+
+func (s *TestSuite) TestLoadImage(c *C) {
+ kc := &KeepTestClient{}
+ docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
+
+ _, err = cr.Docker.RemoveImage(hwImageId, true)
+
+ _, err = cr.Docker.InspectImage(hwImageId)
+ c.Check(err, NotNil)
+
+ cr.ContainerRecord.ContainerImage = hwPDH
+
+ // (1) Test loading image from keep
+ c.Check(kc.Called, Equals, false)
+ c.Check(cr.ContainerConfig.Image, Equals, "")
+
+ err = cr.LoadImage()
+
+ c.Check(err, IsNil)
+ defer func() {
+ cr.Docker.RemoveImage(hwImageId, true)
+ }()
+
+ c.Check(kc.Called, Equals, true)
+ c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
+
+ _, err = cr.Docker.InspectImage(hwImageId)
+ c.Check(err, IsNil)
+
+ // (2) Test using image that's already loaded
+ kc.Called = false
+ cr.ContainerConfig.Image = ""
+
+ err = cr.LoadImage()
+ c.Check(err, IsNil)
+ c.Check(kc.Called, Equals, false)
+ c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
+
+}
+
+type ArvErrorTestClient struct{}
+type KeepErrorTestClient struct{}
+type KeepReadErrorTestClient struct{}
+
+func (this ArvErrorTestClient) Create(resourceType string,
+ parameters arvadosclient.Dict,
+ output interface{}) error {
+ return nil
+}
+
+func (this ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+ return errors.New("ArvError")
+}
+
+func (this ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return "", 0, nil
+}
+
+func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+ return nil, errors.New("KeepError")
+}
+
+func (this KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return "", 0, nil
+}
+
+type ErrorReader struct{}
+
+func (this ErrorReader) Read(p []byte) (n int, err error) {
+ return 0, errors.New("ErrorReader")
+}
+
+func (this ErrorReader) Close() error {
+ return nil
+}
+
+func (this ErrorReader) Len() uint64 {
+ return 0
+}
+
+func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+ return ErrorReader{}, nil
+}
+
+func (s *TestSuite) TestLoadImageArvError(c *C) {
+ // (1) Arvados error
+ cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil)
+ cr.ContainerRecord.ContainerImage = hwPDH
+
+ err := cr.LoadImage()
+ c.Check(err.Error(), Equals, "ArvError")
+}
+
+func (s *TestSuite) TestLoadImageKeepError(c *C) {
+ // (2) Keep error
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
+ cr.ContainerRecord.ContainerImage = hwPDH
+
+ err := cr.LoadImage()
+ c.Check(err.Error(), Equals, "KeepError")
+}
+
+func (s *TestSuite) TestLoadImageCollectionError(c *C) {
+ // (3) Collection doesn't contain image
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil)
+ cr.ContainerRecord.ContainerImage = otherPDH
+
+ err := cr.LoadImage()
+ c.Check(err.Error(), Equals, "First file in the collection does not end in .tar")
+}
+
+func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
+ // (4) Collection doesn't contain image
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
+ cr.ContainerRecord.ContainerImage = hwPDH
+
+ err := cr.LoadImage()
+ c.Check(err, NotNil)
+}
+
+type ClosableBuffer struct {
+ bytes.Buffer
+}
+
+type TestLogs struct {
+ Stdout ClosableBuffer
+ Stderr ClosableBuffer
+}
+
+func (this *ClosableBuffer) Write(p []byte) (n int, err error) {
+ return this.Buffer.Write(p)
+}
+
+func (this *ClosableBuffer) Close() error {
+ return nil
+}
+
+func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+ if logstr == "stdout" {
+ return &this.Stdout
+ }
+ if logstr == "stderr" {
+ return &this.Stderr
+ }
+ return nil
+}
+
+func (s *TestSuite) TestRunContainer(c *C) {
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
+
+ var logs TestLogs
+ cr.NewLogWriter = logs.NewTestLoggingWriter
+ cr.ContainerRecord.ContainerImage = hwPDH
+d cr.ContainerRecord.Command = []string{"./hw"}
+ err := cr.LoadImage()
+ c.Check(err, IsNil)
+
+ err = cr.StartContainer()
+ c.Check(err, IsNil)
+
+ err = cr.AttachLogs()
+ c.Check(err, IsNil)
+
+ err = cr.WaitFinish()
+ c.Check(err, IsNil)
+
+ c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
+ c.Check(logs.Stderr.String(), Equals, "")
+}
+
+func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+ cr.CrunchLog.Print("Hello world!")
+ cr.CrunchLog.Print("Goodbye")
+
+ err := cr.CommitLogs()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunchexec.txt\n")
+ c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+ err := cr.UpdateContainerRecordRunning()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["state"], Equals, "Running")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+ cr.LogsPDH = new(string)
+ *cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
+
+ cr.ExitCode = new(int)
+ *cr.ExitCode = 42
+
+ err := cr.UpdateContainerRecordComplete()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], Equals, *cr.LogsPDH)
+ c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
+ c.Check(api.Content["state"], Equals, "Complete")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr.Cancelled = true
+
+ err := cr.UpdateContainerRecordComplete()
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], IsNil)
+ c.Check(api.Content["exit_code"], IsNil)
+ c.Check(api.Content["state"], Equals, "Cancelled")
+}
+
+func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+ rec := ContainerRecord{}
+ err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+ c.Check(err, IsNil)
+
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker.RemoveImage(hwImageId, true)
+
+ api = &ArvTestClient{ContainerRecord: rec}
+ cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+
+ err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(err, IsNil)
+ c.Check(api.WasSetRunning, Equals, true)
+
+ c.Check(api.Content["log"], NotNil)
+
+ if err != nil {
+ for k, v := range api.Logs {
+ c.Log(k)
+ c.Log(v.String())
+ }
+ }
+
+ return
+}
+
+func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["log"], NotNil)
+ c.Check(api.Content["exit_code"], Equals, 1)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
+ c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["pwd"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["pwd"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestCancel(c *C) {
+ record := `{
+ "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`
+
+ rec := ContainerRecord{}
+ err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+ c.Check(err, IsNil)
+
+ docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ docker.RemoveImage(hwImageId, true)
+
+ api := &ArvTestClient{ContainerRecord: rec}
+ cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+
+ go func() {
+ for cr.ContainerId == "" {
+ time.Sleep(1 * time.Second)
+ }
+ cr.SigChan <- syscall.SIGINT
+ }()
+
+ err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+
+ c.Check(err, IsNil)
+
+ c.Check(api.Content["log"], NotNil)
+
+ if err != nil {
+ for k, v := range api.Logs {
+ c.Log(k)
+ c.Log(v.String())
+ }
+ }
+
+ c.Check(api.Content["state"], Equals, "Cancelled")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`)
+
+ c.Check(api.Content["exit_code"], Equals, 0)
+ c.Check(api.Content["state"], Equals, "Complete")
+
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
+}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
new file mode 100644
index 0000000..01b6548
--- /dev/null
+++ b/services/crunch-exec/logging.go
@@ -0,0 +1,159 @@
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "io"
+ "log"
+ "sync"
+ "time"
+)
+
+type Timestamper func(t time.Time) string
+
+type ThrottledLogger struct {
+ *log.Logger
+ buf *bytes.Buffer
+ sync.Mutex
+ writer io.WriteCloser
+ stop bool
+ flusherDone chan bool
+ Timestamper
+}
+
+func RFC3339Timestamp(now time.Time) string {
+ // return now.Format(time.RFC3339Nano)
+ // Builtin RFC3339Nano format isn't fixed width so
+ // provide our own.
+
+ return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
+ now.Year(), now.Month(), now.Day(),
+ now.Hour(), now.Minute(), now.Second(),
+ now.Nanosecond())
+
+}
+
+func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
+ this.Mutex.Lock()
+ if this.buf == nil {
+ this.buf = &bytes.Buffer{}
+ }
+ defer this.Mutex.Unlock()
+
+ now := time.Now().UTC()
+ _, err = fmt.Fprintf(this.buf, "%s %s", this.Timestamper(now), p)
+ return len(p), err
+}
+
+func (this *ThrottledLogger) Stop() {
+ this.stop = true
+ <-this.flusherDone
+ this.writer.Close()
+}
+
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+ for b := range c {
+ writer.Write(b.Bytes())
+ }
+ t <- true
+}
+
+func (this *ThrottledLogger) flusher() {
+ bufchan := make(chan *bytes.Buffer)
+ bufterm := make(chan bool)
+ go goWriter(this.writer, bufchan, bufterm)
+ for {
+ if !this.stop {
+ time.Sleep(1 * time.Second)
+ }
+ this.Mutex.Lock()
+ if this.buf != nil && this.buf.Len() > 0 {
+ bufchan <- this.buf
+ this.buf = nil
+ } else if this.stop {
+ this.Mutex.Unlock()
+ break
+ }
+ this.Mutex.Unlock()
+ }
+ close(bufchan)
+ <-bufterm
+ this.flusherDone <- true
+}
+
+const (
+ MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
+)
+
+func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
+ reader := bufio.NewReaderSize(in, MaxLogLine)
+ var prefix string
+ for {
+ line, isPrefix, err := reader.ReadLine()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ logger.Print("error reading container log:", err)
+ }
+ var suffix string
+ if isPrefix {
+ suffix = "[...]"
+ }
+ logger.Print(prefix, string(line), suffix)
+ // Set up prefix for following line
+ if isPrefix {
+ prefix = "[...]"
+ } else {
+ prefix = ""
+ }
+ }
+ done <- true
+}
+
+func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
+ alw := &ThrottledLogger{}
+ alw.flusherDone = make(chan bool)
+ alw.writer = writer
+ alw.Logger = log.New(alw, "", 0)
+ alw.Timestamper = RFC3339Timestamp
+ go alw.flusher()
+ return alw
+}
+
+type ArvLogWriter struct {
+ Api IArvadosClient
+ Uuid string
+ loggingStream string
+ io.WriteCloser
+}
+
+func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+ var err1 error
+ if this.WriteCloser != nil {
+ _, err1 = this.WriteCloser.Write(p)
+ }
+
+ // write to API
+ lr := arvadosclient.Dict{"object_uuid": this.Uuid,
+ "event_type": this.loggingStream,
+ "properties": map[string]string{"text": string(p)}}
+ err2 := this.Api.Create("logs", lr, nil)
+
+ if err1 != nil || err2 != nil {
+ return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
+ } else {
+ return len(p), nil
+ }
+
+}
+
+func (this *ArvLogWriter) Close() (err error) {
+ if this.WriteCloser != nil {
+ err = this.WriteCloser.Close()
+ this.WriteCloser = nil
+ }
+ return err
+}
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
new file mode 100644
index 0000000..27d066a
--- /dev/null
+++ b/services/crunch-exec/logging_test.go
@@ -0,0 +1,104 @@
+package main
+
+import (
+ "fmt"
+ . "gopkg.in/check.v1"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func Test2(t *testing.T) {
+ TestingT(t)
+}
+
+type LoggingTestSuite struct{}
+
+type TestTimestamper struct {
+ count int
+}
+
+func (this *TestTimestamper) Timestamp(t time.Time) string {
+ this.count += 1
+ return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&LoggingTestSuite{})
+
+func (s *LoggingTestSuite) TestWriteLogs(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+ cr.CrunchLog.Print("Hello world!")
+ cr.CrunchLog.Print("Goodbye")
+ cr.CrunchLog.Stop()
+
+ c.Check(api.Calls, Equals, 1)
+
+ mt, err := cr.LogCollection.ManifestText()
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunchexec.txt\n")
+
+ logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+ "2015-12-29T15:51:45.000000002Z Goodbye\n"
+
+ c.Check(api.Content["event_type"], Equals, "crunchexec")
+ c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext)
+ c.Check(string(kc.Content), Equals, logtext)
+}
+
+func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+ for i := 0; i < 2000000; i += 1 {
+ cr.CrunchLog.Printf("Hello %d", i)
+ }
+ cr.CrunchLog.Print("Goodbye")
+ cr.CrunchLog.Stop()
+
+ c.Check(api.Calls > 1, Equals, true)
+ c.Check(api.Calls < 2000000, Equals, true)
+
+ mt, err := cr.LogCollection.ManifestText()
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunchexec.txt\n")
+}
+
+func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil)
+ ts := &TestTimestamper{}
+ cr.CrunchLog.Timestamper = ts.Timestamp
+ stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+ stdout.Timestamper = ts.Timestamp
+
+ cr.CrunchLog.Print("Hello world!")
+ stdout.Print("Doing stuff")
+ cr.CrunchLog.Print("Goodbye")
+ stdout.Print("Blurb")
+
+ cr.CrunchLog.Stop()
+ logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+ "2015-12-29T15:51:45.000000003Z Goodbye\n"
+ c.Check(api.Content["event_type"], Equals, "crunchexec")
+ c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
+
+ stdout.Stop()
+ logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
+ "2015-12-29T15:51:45.000000004Z Blurb\n"
+ c.Check(api.Content["event_type"], Equals, "stdout")
+ c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext2)
+
+ mt, err := cr.LogCollection.ManifestText()
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ""+
+ ". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunchexec.txt\n"+
+ ". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+}
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
new file mode 100644
index 0000000..e85d636
--- /dev/null
+++ b/services/crunch-exec/upload.go
@@ -0,0 +1,174 @@
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "io"
+ "strings"
+)
+
+type Block struct {
+ data []byte
+ offset int64
+}
+
+type CollectionFileWriter struct {
+ IKeepClient
+ *manifest.ManifestStream
+ offset uint64
+ length uint64
+ *Block
+ uploader chan *Block
+ finish chan []error
+ fn string
+}
+
+func (m *CollectionFileWriter) Write(p []byte) (int, error) {
+ n, err := m.ReadFrom(bytes.NewReader(p))
+ return int(n), err
+}
+
+func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
+ var total int64
+ var count int
+
+ for err == nil {
+ if m.Block == nil {
+ m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+ }
+ count, err = r.Read(m.Block.data[m.Block.offset:])
+ total += int64(count)
+ m.Block.offset += int64(count)
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
+ }
+ }
+
+ m.length += uint64(total)
+
+ if err == io.EOF {
+ return total, nil
+ } else {
+ return total, err
+ }
+}
+
+func (m *CollectionFileWriter) Close() error {
+ m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
+ manifest.FileStreamSegment{m.offset, m.length, m.fn})
+ return nil
+}
+
+func (m *CollectionFileWriter) goUpload() {
+ var errors []error
+ uploader := m.uploader
+ finish := m.finish
+ for block := range uploader {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
+ }
+ finish <- errors
+}
+
+type CollectionWriter struct {
+ IKeepClient
+ Streams []*CollectionFileWriter
+}
+
+func (m *CollectionWriter) Open(path string) io.WriteCloser {
+ var dir string
+ var fn string
+
+ i := strings.Index(path, "/")
+ if i > -1 {
+ dir = "./" + path[0:i]
+ fn = path[i+1:]
+ } else {
+ dir = "."
+ fn = path
+ }
+
+ fw := &CollectionFileWriter{
+ m.IKeepClient,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ 0,
+ nil,
+ make(chan *Block),
+ make(chan []error),
+ fn}
+ go fw.goUpload()
+
+ m.Streams = append(m.Streams, fw)
+
+ return fw
+}
+
+func (m *CollectionWriter) Finish() error {
+ var errstring string
+ for _, stream := range m.Streams {
+ if stream.uploader == nil {
+ continue
+ }
+ if stream.Block != nil {
+ stream.uploader <- stream.Block
+ }
+ close(stream.uploader)
+ stream.uploader = nil
+
+ errors := <-stream.finish
+ close(stream.finish)
+ stream.finish = nil
+
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
+ }
+}
+
+func (m *CollectionWriter) ManifestText() (mt string, err error) {
+ err = m.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ var buf bytes.Buffer
+
+ for _, v := range m.Streams {
+ k := v.StreamName
+ if k == "." {
+ buf.WriteString(".")
+ } else {
+ k = strings.Replace(k, " ", "\\040", -1)
+ k = strings.Replace(k, "\n", "", -1)
+ buf.WriteString("./" + k)
+ }
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ for _, f := range v.FileStreamSegments {
+ buf.WriteString(" ")
+ name := strings.Replace(f.Name, " ", "\\040", -1)
+ name = strings.Replace(name, "\n", "", -1)
+ buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
+ }
+ buf.WriteString("\n")
+ }
+ return buf.String(), nil
+}
commit 0a46b75ac1e96a6f5ce3ae797eb4306b352fab36
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jan 6 10:51:28 2016 -0500
7816: Update Go SDK to use correct block locator pattern. Change FileToken
string in manifest to FileStreamSegment struct.
diff --git a/sdk/go/blockdigest/blockdigest.go b/sdk/go/blockdigest/blockdigest.go
index a5a668f..cfc510c 100644
--- a/sdk/go/blockdigest/blockdigest.go
+++ b/sdk/go/blockdigest/blockdigest.go
@@ -9,7 +9,7 @@ import (
)
var LocatorPattern = regexp.MustCompile(
- "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9 at _-]+)*$")
+ "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9 at _-]*)*$")
// Stores a Block Locator Digest compactly, up to 128 bits.
// Can be used as a map key.
diff --git a/sdk/go/blockdigest/blockdigest_test.go b/sdk/go/blockdigest/blockdigest_test.go
index e520dee..395e2f8 100644
--- a/sdk/go/blockdigest/blockdigest_test.go
+++ b/sdk/go/blockdigest/blockdigest_test.go
@@ -143,6 +143,9 @@ func TestLocatorPatternBasic(t *testing.T) {
"12345678901234567890123456789012+12345+A1+B123wxyz at _-")
expectLocatorPatternMatch(t,
"12345678901234567890123456789012+12345+A1+B123wxyz at _-+C@")
+ expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A")
+ expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A1+B")
+ expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A+B2")
expectLocatorPatternFail(t, "12345678901234567890123456789012")
expectLocatorPatternFail(t, "12345678901234567890123456789012+")
@@ -153,11 +156,9 @@ func TestLocatorPatternBasic(t *testing.T) {
expectLocatorPatternFail(t, "12345678901234567890123456789012+12345 ")
expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+1")
expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+1A")
- expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A")
expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+a1")
expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A1+")
- expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A1+B")
- expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A+B2")
+
}
func TestParseBlockLocatorSimple(t *testing.T) {
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 06a6678..a3dc3d5 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -130,8 +130,8 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
stream.offset += count
- stream.ManifestStream.FileTokens = append(stream.ManifestStream.FileTokens,
- fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+ stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+ manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
return nil
}
@@ -189,11 +189,11 @@ func (m *ManifestWriter) ManifestText() string {
buf.WriteString(" ")
buf.WriteString(b)
}
- for _, f := range v.FileTokens {
+ for _, f := range v.FileStreamSegments {
buf.WriteString(" ")
- f = strings.Replace(f, " ", "\\040", -1)
- f = strings.Replace(f, "\n", "", -1)
- buf.WriteString(f)
+ name := strings.Replace(f.Name, " ", "\\040", -1)
+ name = strings.Replace(name, "\n", "", -1)
+ buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
}
buf.WriteString("\n")
}
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index b532a16..d2c171d 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -40,6 +40,10 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
return nil, ErrNoManifest
}
m := manifest.Manifest{Text: mText}
+ return kc.ManifestFileReader(m, filename)
+}
+
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
rdrChan := make(chan *cfReader)
go kc.queueSegmentsToGet(m, filename, rdrChan)
r, ok := <-rdrChan
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index d4b9830..cf0ae85 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -44,12 +44,19 @@ type FileSegment struct {
Len int
}
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+ SegPos uint64
+ SegLen uint64
+ Name string
+}
+
// Represents a single line from a manifest.
type ManifestStream struct {
- StreamName string
- Blocks []string
- FileTokens []string
- Err error
+ StreamName string
+ Blocks []string
+ FileStreamSegments []FileStreamSegment
+ Err error
}
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
@@ -97,21 +104,21 @@ func ParseBlockLocator(s string) (b BlockLocator, err error) {
return
}
-func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) {
+func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
parts := strings.SplitN(tok, ":", 3)
if len(parts) != 3 {
err = ErrInvalidToken
return
}
- segPos, err = strconv.ParseUint(parts[0], 10, 64)
+ ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return
}
- segLen, err = strconv.ParseUint(parts[1], 10, 64)
+ ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return
}
- name = UnescapeName(parts[2])
+ ft.Name = UnescapeName(parts[2])
return
}
@@ -128,12 +135,11 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
target := "./" + filepath
- for _, fTok := range s.FileTokens {
- wantPos, wantLen, name, err := parseFileToken(fTok)
- if err != nil {
- // Skip (!) invalid file tokens.
- continue
- }
+ for _, fTok := range s.FileStreamSegments {
+ wantPos := fTok.SegPos
+ wantLen := fTok.SegLen
+ name := fTok.Name
+
if s.StreamName+"/"+name != target {
continue
}
@@ -199,24 +205,25 @@ func parseManifestStream(s string) (m ManifestStream) {
}
}
m.Blocks = tokens[:i]
- m.FileTokens = tokens[i:]
+ fileTokens := tokens[i:]
if len(m.Blocks) == 0 {
m.Err = fmt.Errorf("No block locators found")
return
}
- if len(m.FileTokens) == 0 {
+ if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
return
}
- for _, ft := range m.FileTokens {
- _, _, _, err := parseFileToken(ft)
+ for _, ft := range fileTokens {
+ pft, err := parseFileStreamSegment(ft)
if err != nil {
m.Err = fmt.Errorf("Invalid file token: %s", ft)
break
}
+ m.FileStreamSegments = append(m.FileStreamSegments, pft)
}
return
diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go
index 43c3bd3..f52d564 100644
--- a/sdk/go/manifest/manifest_test.go
+++ b/sdk/go/manifest/manifest_test.go
@@ -61,10 +61,21 @@ func expectStringSlicesEqual(t *testing.T, actual []string, expected []string) {
}
}
+func expectFileStreamSegmentsEqual(t *testing.T, actual []FileStreamSegment, expected []FileStreamSegment) {
+ if len(actual) != len(expected) {
+ t.Fatalf("Expected %v (length %d), but received %v (length %d) instead. %s", expected, len(expected), actual, len(actual), getStackTrace())
+ }
+ for i := range actual {
+ if actual[i] != expected[i] {
+ t.Fatalf("Expected %v but received %v instead (first disagreement at position %d). %s", expected, actual, i, getStackTrace())
+ }
+ }
+}
+
func expectManifestStream(t *testing.T, actual ManifestStream, expected ManifestStream) {
expectEqual(t, actual.StreamName, expected.StreamName)
expectStringSlicesEqual(t, actual.Blocks, expected.Blocks)
- expectStringSlicesEqual(t, actual.FileTokens, expected.FileTokens)
+ expectFileStreamSegmentsEqual(t, actual.FileStreamSegments, expected.FileStreamSegments)
}
func expectBlockLocator(t *testing.T, actual blockdigest.BlockLocator, expected blockdigest.BlockLocator) {
@@ -76,8 +87,8 @@ func expectBlockLocator(t *testing.T, actual blockdigest.BlockLocator, expected
func TestParseManifestStreamSimple(t *testing.T) {
m := parseManifestStream(". 365f83f5f808896ec834c8b595288735+2310+K at qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf 0:2310:qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt")
expectManifestStream(t, m, ManifestStream{StreamName: ".",
- Blocks: []string{"365f83f5f808896ec834c8b595288735+2310+K at qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf"},
- FileTokens: []string{"0:2310:qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt"}})
+ Blocks: []string{"365f83f5f808896ec834c8b595288735+2310+K at qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf"},
+ FileStreamSegments: []FileStreamSegment{{0, 2310, "qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt"}}})
}
func TestParseBlockLocatorSimple(t *testing.T) {
@@ -108,8 +119,8 @@ func TestStreamIterShortManifestWithBlankStreams(t *testing.T) {
expectManifestStream(t,
firstStream,
ManifestStream{StreamName: ".",
- Blocks: []string{"b746e3d2104645f2f64cd3cc69dd895d+15693477+E2866e643690156651c03d876e638e674dcd79475 at 5441920c"},
- FileTokens: []string{"0:15893477:chr10_band0_s0_e3000000.fj"}})
+ Blocks: []string{"b746e3d2104645f2f64cd3cc69dd895d+15693477+E2866e643690156651c03d876e638e674dcd79475 at 5441920c"},
+ FileStreamSegments: []FileStreamSegment{{0, 15893477, "chr10_band0_s0_e3000000.fj"}}})
received, ok := <-streamIter
if ok {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list