[ARVADOS] created: f7115688cc3a63b4721ca59259bc8276685bac9a

git at public.curoverse.com git at public.curoverse.com
Wed Jan 6 16:39:08 EST 2016


        at  f7115688cc3a63b4721ca59259bc8276685bac9a (commit)


commit f7115688cc3a63b4721ca59259bc8276685bac9a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jan 6 16:38:58 2016 -0500

    7816: Converted testing to use stubbed out ThinDockerClient instead of real Docker.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
index d24073b..9e6a12f 100644
--- a/services/crunch-exec/crunchexec.go
+++ b/services/crunch-exec/crunchexec.go
@@ -48,8 +48,19 @@ type ContainerRecord struct {
 
 type NewLogWriter func(name string) io.WriteCloser
 
+// ThinDockerClient lists the Docker client operations that this package
+// calls. ContainerRunner holds its Docker handle as this interface rather
+// than as a concrete *dockerclient.DockerClient, so a stub implementation
+// can be supplied in tests.
+type ThinDockerClient interface {
+	StopContainer(id string, timeout int) error
+	InspectImage(id string) (*dockerclient.ImageInfo, error)
+	LoadImage(reader io.Reader) error
+	CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
+	StartContainer(id string, config *dockerclient.HostConfig) error
+	ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
+	Wait(id string) <-chan dockerclient.WaitResult
+	RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+}
+
 type ContainerRunner struct {
-	Docker *dockerclient.DockerClient
+	Docker ThinDockerClient
 	Api    IArvadosClient
 	Kc     IKeepClient
 	ContainerRecord
@@ -348,7 +359,7 @@ func (this *ContainerRunner) Run(containerUuid string) (err error) {
 
 func NewContainerRunner(api IArvadosClient,
 	kc IKeepClient,
-	docker *dockerclient.DockerClient) *ContainerRunner {
+	docker ThinDockerClient) *ContainerRunner {
 
 	cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
 	cr.NewLogWriter = cr.NewArvLogWriter
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
index 0c8f2c1..fcd5f80 100644
--- a/services/crunch-exec/crunchexec_test.go
+++ b/services/crunch-exec/crunchexec_test.go
@@ -12,7 +12,7 @@ import (
 	"github.com/curoverse/dockerclient"
 	. "gopkg.in/check.v1"
 	"io"
-	"os"
+	"io/ioutil"
 	"strings"
 	"syscall"
 	"testing"
@@ -20,7 +20,7 @@ import (
 )
 
 // Gocheck boilerplate
-func Test(t *testing.T) {
+func TestCrunchExec(t *testing.T) {
 	TestingT(t)
 }
 
@@ -45,11 +45,92 @@ type KeepTestClient struct {
 
 var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0 at 569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
 var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120"
-var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar"
+var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7"
 
 var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
 var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
 
+// TestDockerClient is an in-memory stand-in for a Docker client, used by
+// the tests in place of a connection to a real Docker daemon.
+type TestDockerClient struct {
+	// imageLoaded is the image id that InspectImage will report as present.
+	imageLoaded  string
+	// stdoutReader/stderrReader are handed out by ContainerLogs; the
+	// matching writers are what a test's fn writes "container output" to.
+	stdoutReader io.ReadCloser
+	stderrReader io.ReadCloser
+	stdoutWriter io.WriteCloser
+	stderrWriter io.WriteCloser
+	// fn simulates the container process; StartContainer runs it in a goroutine.
+	fn           func(t *TestDockerClient)
+	// finish is the channel returned by Wait.
+	finish       chan dockerclient.WaitResult
+	// stop receives a value each time StopContainer is called.
+	stop         chan bool
+	// cwd and env record the working directory and environment passed to
+	// CreateContainer.
+	cwd          string
+	env          []string
+}
+
+// NewTestDockerClient builds a stub Docker client with fresh stdout and
+// stderr pipes, unbuffered finish/stop channels and "/" as the working
+// directory. The caller is expected to set fn before starting a container.
+func NewTestDockerClient() *TestDockerClient {
+	outR, outW := io.Pipe()
+	errR, errW := io.Pipe()
+	return &TestDockerClient{
+		stdoutReader: outR,
+		stdoutWriter: outW,
+		stderrReader: errR,
+		stderrWriter: errW,
+		finish:       make(chan dockerclient.WaitResult),
+		stop:         make(chan bool),
+		cwd:          "/",
+	}
+}
+
+// StopContainer signals the simulated container by sending on t.stop and
+// always returns nil. The id and timeout arguments are ignored. Because
+// t.stop is created unbuffered, this call blocks until something receives
+// from it.
+func (t *TestDockerClient) StopContainer(id string, timeout int) error {
+	t.stop <- true
+	return nil
+}
+
+// InspectImage reports whether id is the image currently recorded as
+// loaded. It returns an empty ImageInfo when id equals t.imageLoaded and a
+// descriptive error otherwise (previously the error text was empty, which
+// made failures impossible to read in logs).
+func (t *TestDockerClient) InspectImage(id string) (*dockerclient.ImageInfo, error) {
+	if t.imageLoaded != id {
+		return nil, errors.New("no such image: " + id)
+	}
+	return &dockerclient.ImageInfo{}, nil
+}
+
+// LoadImage drains reader and, if that succeeds, records hwImageId as the
+// loaded image. A read error is returned unchanged and leaves the recorded
+// image untouched.
+func (t *TestDockerClient) LoadImage(reader io.Reader) error {
+	if _, copyErr := io.Copy(ioutil.Discard, reader); copyErr != nil {
+		return copyErr
+	}
+	t.imageLoaded = hwImageId
+	return nil
+}
+
+// CreateContainer records the requested environment and, when one is set,
+// the working directory, then returns the fixed container id "abcde".
+// The name and authConfig arguments are ignored.
+func (t *TestDockerClient) CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) {
+	t.env = config.Env
+	if dir := config.WorkingDir; dir != "" {
+		t.cwd = dir
+	}
+	return "abcde", nil
+}
+
+// StartContainer launches the simulated container: for the id handed out
+// by CreateContainer ("abcde") it runs t.fn in a new goroutine. Any other
+// id is rejected. A missing t.fn is reported as an error here; calling a
+// nil func inside the goroutine would otherwise panic and take down the
+// whole test binary.
+func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostConfig) error {
+	if id != "abcde" {
+		return errors.New("Invalid container id")
+	}
+	if t.fn == nil {
+		return errors.New("test docker client has no container function set")
+	}
+	go t.fn(t)
+	return nil
+}
+
+// ContainerLogs returns the read side of the simulated stdout pipe when
+// options.Stdout is set, otherwise the stderr pipe when options.Stderr is
+// set. Stdout takes precedence if both are set. When neither stream is
+// requested an error is returned; the previous (nil, nil) result gave
+// callers a nil reader with no indication that anything was wrong.
+func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
+	switch {
+	case options.Stdout:
+		return t.stdoutReader, nil
+	case options.Stderr:
+		return t.stderrReader, nil
+	}
+	return nil, errors.New("no log stream requested")
+}
+
+// Wait returns the channel on which the simulated container function is
+// expected to send its result. The id argument is ignored.
+func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
+	return t.finish
+}
+
+// RemoveImage is a no-op in the stub: it does not change which image is
+// recorded as loaded and always returns (nil, nil).
+func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) {
+	return nil, nil
+}
+
 func (this *ArvTestClient) Create(resourceType string,
 	parameters arvadosclient.Dict,
 	output interface{}) error {
@@ -118,20 +199,20 @@ func (this FileWrapper) Len() uint64 {
 }
 
 func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
-	if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
-		rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+	if filename == hwImageId+".tar" {
+		rdr := ioutil.NopCloser(&bytes.Buffer{})
 		this.Called = true
-		return FileWrapper{rdr, 1321984}, err
+		return FileWrapper{rdr, 1321984}, nil
 	}
 	return nil, nil
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
 	kc := &KeepTestClient{}
-	docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
 	cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
 
-	_, err = cr.Docker.RemoveImage(hwImageId, true)
+	_, err := cr.Docker.RemoveImage(hwImageId, true)
 
 	_, err = cr.Docker.InspectImage(hwImageId)
 	c.Check(err, NotNil)
@@ -225,7 +306,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
 	// (2) Keep error
-	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
 	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
 	cr.ContainerRecord.ContainerImage = hwPDH
 
@@ -244,7 +325,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
 	// (4) Collection doesn't contain image
-	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
 	cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
 	cr.ContainerRecord.ContainerImage = hwPDH
 
@@ -280,13 +361,19 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
+	docker.fn = func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte("Hello world\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{}
+	}
 	cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
 
 	var logs TestLogs
 	cr.NewLogWriter = logs.NewTestLoggingWriter
 	cr.ContainerRecord.ContainerImage = hwPDH
-d	cr.ContainerRecord.Command = []string{"./hw"}
+	cr.ContainerRecord.Command = []string{"./hw"}
 	err := cr.LoadImage()
 	c.Check(err, IsNil)
 
@@ -303,7 +390,7 @@ d	cr.ContainerRecord.Command = []string{"./hw"}
 	c.Check(logs.Stderr.String(), Equals, "")
 }
 
-func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+func (s *TestSuite) TestCommitLogs(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
@@ -321,7 +408,7 @@ func (s *LoggingTestSuite) TestCommitLogs(c *C) {
 	c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
 }
 
-func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
@@ -333,7 +420,7 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
 	c.Check(api.Content["state"], Equals, "Running")
 }
 
-func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
@@ -353,7 +440,7 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
 	c.Check(api.Content["state"], Equals, "Complete")
 }
 
-func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
 	api := &ArvTestClient{}
 	kc := &KeepTestClient{}
 	cr := NewContainerRunner(api, kc, nil)
@@ -368,12 +455,13 @@ func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
 	c.Check(api.Content["state"], Equals, "Cancelled")
 }
 
-func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
 	rec := ContainerRecord{}
 	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
 	c.Check(err, IsNil)
 
-	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
+	docker.fn = fn
 	docker.RemoveImage(hwImageId, true)
 
 	api = &ArvTestClient{ContainerRecord: rec}
@@ -395,7 +483,7 @@ func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner
 	return
 }
 
-func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+func (s *TestSuite) TestFullRunHello(c *C) {
 	api, _ := FullRunHelper(c, `{
     "command": ["echo", "hello world"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -405,7 +493,12 @@ func (s *LoggingTestSuite) TestFullRunHello(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte("hello world\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{}
+	})
 
 	c.Check(api.Content["exit_code"], Equals, 0)
 	c.Check(api.Content["state"], Equals, "Complete")
@@ -414,7 +507,7 @@ func (s *LoggingTestSuite) TestFullRunHello(c *C) {
 
 }
 
-func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+func (s *TestSuite) TestFullRunStderr(c *C) {
 	api, _ := FullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -424,7 +517,13 @@ func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte("hello\n"))
+		t.stderrWriter.Write([]byte("world\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 1}
+	})
 
 	c.Check(api.Content["log"], NotNil)
 	c.Check(api.Content["exit_code"], Equals, 1)
@@ -434,7 +533,7 @@ func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
 	c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
 }
 
-func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 	api, _ := FullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -444,7 +543,12 @@ func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte(t.cwd + "\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
 
 	c.Check(api.Content["exit_code"], Equals, 0)
 	c.Check(api.Content["state"], Equals, "Complete")
@@ -452,7 +556,7 @@ func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
 	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
 }
 
-func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+func (s *TestSuite) TestFullRunSetCwd(c *C) {
 	api, _ := FullRunHelper(c, `{
     "command": ["pwd"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -462,7 +566,12 @@ func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte(t.cwd + "\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
 
 	c.Check(api.Content["exit_code"], Equals, 0)
 	c.Check(api.Content["state"], Equals, "Complete")
@@ -470,7 +579,7 @@ func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
 	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
 }
 
-func (s *LoggingTestSuite) TestCancel(c *C) {
+func (s *TestSuite) TestCancel(c *C) {
 	record := `{
     "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -486,7 +595,14 @@ func (s *LoggingTestSuite) TestCancel(c *C) {
 	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
 	c.Check(err, IsNil)
 
-	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker := NewTestDockerClient()
+	docker.fn = func(t *TestDockerClient) {
+		<-t.stop
+		t.stdoutWriter.Write([]byte("foo\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	}
 	docker.RemoveImage(hwImageId, true)
 
 	api := &ArvTestClient{ContainerRecord: rec}
@@ -518,7 +634,7 @@ func (s *LoggingTestSuite) TestCancel(c *C) {
 
 }
 
-func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+func (s *TestSuite) TestFullRunSetEnv(c *C) {
 	api, _ := FullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -528,7 +644,12 @@ func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`)
+}`, func(t *TestDockerClient) {
+		t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
+		t.stdoutWriter.Close()
+		t.stderrWriter.Close()
+		t.finish <- dockerclient.WaitResult{ExitCode: 0}
+	})
 
 	c.Check(api.Content["exit_code"], Equals, 0)
 	c.Check(api.Content["state"], Equals, "Complete")
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
index 27d066a..c86d918 100644
--- a/services/crunch-exec/logging_test.go
+++ b/services/crunch-exec/logging_test.go
@@ -3,15 +3,9 @@ package main
 import (
 	"fmt"
 	. "gopkg.in/check.v1"
-	"testing"
 	"time"
 )
 
-// Gocheck boilerplate
-func Test2(t *testing.T) {
-	TestingT(t)
-}
-
 type LoggingTestSuite struct{}
 
 type TestTimestamper struct {

commit c152ad446aaa61efde38cba91307cc8c29c4667f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jan 6 10:52:07 2016 -0500

    7816: Crunch2 executor prototype work in progress.

diff --git a/services/crunch-exec/crunchexec.go b/services/crunch-exec/crunchexec.go
new file mode 100644
index 0000000..d24073b
--- /dev/null
+++ b/services/crunch-exec/crunchexec.go
@@ -0,0 +1,389 @@
+package main
+
+import (
+	"errors"
+	"flag"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
+	"io"
+	"log"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+	"syscall"
+)
+
+// IArvadosClient is the set of API client calls used by this package
+// (create, fetch and update a resource), expressed as an interface so a
+// different implementation can be substituted.
+type IArvadosClient interface {
+	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
+	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+}
+
+// IKeepClient is the set of Keep client calls used by this package:
+// writing a block and opening a reader for a file named in a manifest.
+type IKeepClient interface {
+	PutHB(hash string, buf []byte) (string, int, error)
+	ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+}
+
+// Mount is the value type of ContainerRecord.Mounts. It currently has no
+// fields.
+type Mount struct{}
+
+// Collection holds the part of a collection record that this package
+// reads: its manifest text.
+type Collection struct {
+	ManifestText string `json:"manifest_text"`
+}
+
+// ContainerRecord is the JSON representation of a container request as
+// fetched from the API server: what to run (Command, ContainerImage), how
+// to run it (Cwd, Environment, Mounts, RuntimeConstraints) and its State.
+type ContainerRecord struct {
+	Uuid               string            `json:"uuid"`
+	Command            []string          `json:"command"`
+	ContainerImage     string            `json:"container_image"`
+	Cwd                string            `json:"cwd"`
+	Environment        map[string]string `json:"environment"`
+	Mounts             map[string]Mount  `json:"mounts"`
+	OutputPath         string            `json:"output_path"`
+	Priority           int               `json:"priority"`
+	RuntimeConstraints map[string]string `json:"runtime_constraints"`
+	State              string            `json:"state"`
+}
+
+// NewLogWriter is a factory that returns the destination writer for the
+// log stream with the given name (for example "stdout" or "stderr").
+type NewLogWriter func(name string) io.WriteCloser
+
+// ContainerRunner carries everything needed to execute one container: the
+// Docker, API and Keep clients, the container record and Docker config,
+// the log writers, and cancellation state.
+type ContainerRunner struct {
+	Docker *dockerclient.DockerClient
+	Api    IArvadosClient
+	Kc     IKeepClient
+	ContainerRecord
+	dockerclient.ContainerConfig
+	// ContainerId is set by StartContainer once the container is created.
+	ContainerId string
+	// ExitCode is nil until WaitFinish has observed the container exit.
+	ExitCode    *int
+	NewLogWriter
+	// loggingDone is created by AttachLogs; WaitFinish receives from it
+	// twice (stdout and stderr) before stopping the loggers.
+	loggingDone   chan bool
+	CrunchLog     *ThrottledLogger
+	Stdout        *ThrottledLogger
+	Stderr        *ThrottledLogger
+	LogCollection *CollectionWriter
+	// LogsPDH is nil until CommitLogs has saved the log collection.
+	LogsPDH       *string
+	// CancelLock is held while the signal handler cancels the run and
+	// while StartContainer creates and starts the container.
+	CancelLock    sync.Mutex
+	Cancelled     bool
+	SigChan       chan os.Signal
+}
+
+// SetupSignals installs a handler for SIGTERM, SIGINT and SIGQUIT. The
+// first signal received marks the run as cancelled and, if a container
+// has already been created, asks Docker to stop it. Later signals are
+// ignored. It always returns nil.
+//
+// Cancelled is now read and written only while holding CancelLock; the
+// earlier version tested the flag before taking the lock, which raced
+// with StartContainer.
+func (this *ContainerRunner) SetupSignals() error {
+	this.SigChan = make(chan os.Signal, 1)
+	signal.Notify(this.SigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
+
+	go func(sig <-chan os.Signal) {
+		for range sig {
+			this.CancelLock.Lock()
+			if !this.Cancelled {
+				this.Cancelled = true
+				if this.ContainerId != "" {
+					// Best effort: there is nothing more this
+					// handler can do if the stop request fails.
+					this.Docker.StopContainer(this.ContainerId, 10)
+				}
+			}
+			this.CancelLock.Unlock()
+		}
+	}(this.SigChan)
+
+	return nil
+}
+
+// LoadImage makes sure the Docker image named by the container record is
+// available to Docker, and stores its id in ContainerConfig.Image.
+//
+// The image collection is fetched from the API server; the first file of
+// a stream must be a ".tar" file whose base name is the image id. If
+// Docker does not already know that id, the tarball is read from Keep and
+// loaded into Docker.
+//
+// Compared with the earlier version this closes the Keep reader when done,
+// skips streams that contain no files instead of indexing into an empty
+// list, and reports an error when Keep returns no reader and no error.
+func (this *ContainerRunner) LoadImage() (err error) {
+
+	this.CrunchLog.Printf("Fetching Docker image from collection '%s'", this.ContainerRecord.ContainerImage)
+
+	var collection Collection
+	err = this.Api.Get("collections", this.ContainerRecord.ContainerImage, nil, &collection)
+	if err != nil {
+		return err
+	}
+	// Named mfst so the local does not shadow the manifest package.
+	mfst := manifest.Manifest{Text: collection.ManifestText}
+	var img, imageId string
+	for ms := range mfst.StreamIter() {
+		if len(ms.FileStreamSegments) == 0 {
+			continue
+		}
+		img = ms.FileStreamSegments[0].Name
+		if !strings.HasSuffix(img, ".tar") {
+			return errors.New("First file in the collection does not end in .tar")
+		}
+		imageId = strings.TrimSuffix(img, ".tar")
+	}
+
+	this.CrunchLog.Printf("Using Docker image id '%s'", imageId)
+
+	_, err = this.Docker.InspectImage(imageId)
+	if err != nil {
+		this.CrunchLog.Print("Loading Docker image from keep")
+
+		var readCloser io.ReadCloser
+		readCloser, err = this.Kc.ManifestFileReader(mfst, img)
+		if err != nil {
+			return err
+		}
+		if readCloser == nil {
+			return errors.New("Keep returned no reader for Docker image file")
+		}
+		defer readCloser.Close()
+
+		err = this.Docker.LoadImage(readCloser)
+		if err != nil {
+			return err
+		}
+	} else {
+		this.CrunchLog.Print("Docker image is available")
+	}
+
+	this.ContainerConfig.Image = imageId
+
+	return nil
+}
+
+// StartContainer creates and starts the Docker container described by the
+// container record. It holds CancelLock for the whole operation and
+// returns an error with the text "Cancelled" if the run was cancelled
+// before the container could be created.
+func (this *ContainerRunner) StartContainer() (err error) {
+	this.CrunchLog.Print("Creating Docker container")
+
+	this.CancelLock.Lock()
+	defer this.CancelLock.Unlock()
+
+	if this.Cancelled {
+		return errors.New("Cancelled")
+	}
+
+	// Translate the container record into Docker's container config.
+	rec := &this.ContainerRecord
+	cfg := &this.ContainerConfig
+	cfg.Cmd = rec.Command
+	if cwd := rec.Cwd; cwd != "." {
+		cfg.WorkingDir = cwd
+	}
+	for name, value := range rec.Environment {
+		cfg.Env = append(cfg.Env, name+"="+value)
+	}
+
+	if this.ContainerId, err = this.Docker.CreateContainer(cfg, "", nil); err != nil {
+		return err
+	}
+
+	this.CrunchLog.Printf("Starting Docker container id '%s'", this.ContainerId)
+	return this.Docker.StartContainer(this.ContainerId, &dockerclient.HostConfig{})
+}
+
+// AttachLogs opens the container's stderr and stdout log streams and
+// starts one goroutine per stream that copies it into a throttled logger.
+// Each copier is handed loggingDone, which WaitFinish receives from once
+// per stream.
+func (this *ContainerRunner) AttachLogs() (err error) {
+
+	this.CrunchLog.Print("Attaching container logs")
+
+	var errStream, outStream io.Reader
+
+	errStream, err = this.Docker.ContainerLogs(this.ContainerId,
+		&dockerclient.LogOptions{Follow: true, Stderr: true})
+	if err != nil {
+		return err
+	}
+
+	outStream, err = this.Docker.ContainerLogs(this.ContainerId,
+		&dockerclient.LogOptions{Follow: true, Stdout: true})
+	if err != nil {
+		return err
+	}
+
+	done := make(chan bool)
+	this.loggingDone = done
+
+	this.Stdout = NewThrottledLogger(this.NewLogWriter("stdout"))
+	this.Stderr = NewThrottledLogger(this.NewLogWriter("stderr"))
+
+	go CopyReaderToLog(outStream, this.Stdout.Logger, done)
+	go CopyReaderToLog(errStream, this.Stderr.Logger, done)
+
+	return nil
+}
+
+// WaitFinish blocks until Docker reports that the container has exited,
+// records the exit code, waits for both log copiers to finish and stops
+// the stdout/stderr loggers. If Docker reports an error, that error is
+// returned and the exit code is left unset.
+//
+// Run calls this even when AttachLogs failed. In that case loggingDone
+// and the loggers may never have been created, so each is checked before
+// use; receiving from a nil channel would otherwise block forever.
+func (this *ContainerRunner) WaitFinish() error {
+	wr := <-this.Docker.Wait(this.ContainerId)
+	if wr.Error != nil {
+		return wr.Error
+	}
+	this.ExitCode = &wr.ExitCode
+
+	// drain stdout/stderr
+	if this.loggingDone != nil {
+		<-this.loggingDone
+		<-this.loggingDone
+	}
+
+	if this.Stdout != nil {
+		this.Stdout.Stop()
+	}
+	if this.Stderr != nil {
+		this.Stderr.Stop()
+	}
+
+	return nil
+}
+
+// CommitLogs writes the final status line, stops the crunch logger and
+// replaces it with one that has no collection file behind it, then saves
+// the accumulated log collection through the API and stores its portable
+// data hash in LogsPDH.
+func (this *ContainerRunner) CommitLogs() error {
+	status := "Complete"
+	if this.Cancelled {
+		status = "Cancelled"
+	}
+	this.CrunchLog.Print(status)
+
+	this.CrunchLog.Stop()
+	this.CrunchLog = NewThrottledLogger(&ArvLogWriter{this.Api, this.ContainerRecord.Uuid,
+		"crunchexec", nil})
+
+	manifestText, err := this.LogCollection.ManifestText()
+	if err != nil {
+		return err
+	}
+
+	created := make(map[string]string)
+	params := arvadosclient.Dict{
+		"name":          "logs for " + this.ContainerRecord.Uuid,
+		"manifest_text": manifestText,
+	}
+	if err = this.Api.Create("collections", params, created); err != nil {
+		return err
+	}
+
+	pdh := created["portable_data_hash"]
+	this.LogsPDH = &pdh
+
+	return nil
+}
+
+// UpdateContainerRecordRunning sets the container's state to "Running"
+// on the API server.
+func (this *ContainerRunner) UpdateContainerRecordRunning() error {
+	return this.Api.Update("containers", this.ContainerRecord.Uuid,
+		arvadosclient.Dict{"state": "Running"}, nil)
+}
+
+// UpdateContainerRecordComplete writes the final state ("Cancelled" or
+// "Complete") to the API server, together with the log collection hash
+// and exit code when those are known.
+func (this *ContainerRunner) UpdateContainerRecordComplete() error {
+	finalState := "Complete"
+	if this.Cancelled {
+		finalState = "Cancelled"
+	}
+
+	fields := arvadosclient.Dict{"state": finalState}
+	if pdh := this.LogsPDH; pdh != nil {
+		fields["log"] = *pdh
+	}
+	if code := this.ExitCode; code != nil {
+		fields["exit_code"] = *code
+	}
+
+	return this.Api.Update("containers", this.ContainerRecord.Uuid, fields, nil)
+}
+
+// NewArvLogWriter is the default NewLogWriter: it returns an ArvLogWriter
+// for the named stream that is backed by the file "<name>.txt" opened in
+// the runner's log collection.
+func (this *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+	return &ArvLogWriter{this.Api, this.ContainerRecord.Uuid, name, this.LogCollection.Open(name + ".txt")}
+}
+
+// Run executes the container identified by containerUuid from start to
+// finish: fetch the record, install signal handlers, load the image,
+// start the container, mark it running, attach logs and wait for exit.
+//
+// Whatever happens, the deferred block then commits the logs and writes
+// the final container state. The returned error is the first failure in
+// this order: a fatal setup error, the log-attach error, the wait error,
+// the log-commit error, the record-update error.
+//
+// A failure from WaitFinish is now actually returned; previously that
+// branch assigned runerr (nil at that point), so wait errors were lost.
+func (this *ContainerRunner) Run(containerUuid string) (err error) {
+	this.CrunchLog.Printf("Executing container '%s'", containerUuid)
+
+	var runerr, waiterr error
+
+	defer func() {
+		if err != nil {
+			this.CrunchLog.Print(err)
+		}
+
+		// (6) write logs
+		logerr := this.CommitLogs()
+		if logerr != nil {
+			this.CrunchLog.Print(logerr)
+		}
+
+		// (7) update container record with results
+		updateerr := this.UpdateContainerRecordComplete()
+		if updateerr != nil {
+			this.CrunchLog.Print(updateerr)
+		}
+
+		this.CrunchLog.Stop()
+
+		// Surface the earliest non-fatal error if nothing fatal happened.
+		if err == nil {
+			if runerr != nil {
+				err = runerr
+			} else if waiterr != nil {
+				err = waiterr
+			} else if logerr != nil {
+				err = logerr
+			} else if updateerr != nil {
+				err = updateerr
+			}
+		}
+	}()
+
+	err = this.Api.Get("containers", containerUuid, nil, &this.ContainerRecord)
+	if err != nil {
+		return
+	}
+
+	// (0) setup signal handling
+	err = this.SetupSignals()
+	if err != nil {
+		return
+	}
+
+	// (1) check for and/or load image
+	err = this.LoadImage()
+	if err != nil {
+		return
+	}
+
+	// (2) start container
+	err = this.StartContainer()
+	if err != nil {
+		// Cancellation before start is not reported as a failure.
+		if err.Error() == "Cancelled" {
+			err = nil
+		}
+		return
+	}
+
+	// (3) update container record state
+	err = this.UpdateContainerRecordRunning()
+	if err != nil {
+		this.CrunchLog.Print(err)
+	}
+
+	// (4) attach container logs
+	runerr = this.AttachLogs()
+	if runerr != nil {
+		this.CrunchLog.Print(runerr)
+	}
+
+	// (5) wait for container to finish
+	waiterr = this.WaitFinish()
+
+	return
+}
+
+// NewContainerRunner returns a ContainerRunner wired to the given API,
+// Keep and Docker clients, using NewArvLogWriter as its log writer
+// factory.
+func NewContainerRunner(api IArvadosClient,
+	kc IKeepClient,
+	docker *dockerclient.DockerClient) *ContainerRunner {
+
+	cr := &ContainerRunner{Api: api, Kc: kc, Docker: docker}
+	// Order matters: NewLogWriter and LogCollection must both be set
+	// before the crunch logger is created, because NewArvLogWriter opens
+	// a file in LogCollection.
+	cr.NewLogWriter = cr.NewArvLogWriter
+	cr.LogCollection = &CollectionWriter{kc, nil}
+	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunchexec"))
+	return cr
+}
+
+func main() {
+	flag.Parse()
+
+	api, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatal(err)
+	}
+	api.Retries = 8
+
+	var kc *keepclient.KeepClient
+	kc, err = keepclient.MakeKeepClient(&api)
+	if err != nil {
+		log.Fatal(err)
+	}
+	kc.Retries = 4
+
+	var docker *dockerclient.DockerClient
+	docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	cr := NewContainerRunner(api, kc, docker)
+
+	err = cr.Run(flag.Arg(0))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+}
diff --git a/services/crunch-exec/crunchexec_test.go b/services/crunch-exec/crunchexec_test.go
new file mode 100644
index 0000000..0c8f2c1
--- /dev/null
+++ b/services/crunch-exec/crunchexec_test.go
@@ -0,0 +1,537 @@
+package main
+
+import (
+	"bytes"
+	"crypto/md5"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"github.com/curoverse/dockerclient"
+	. "gopkg.in/check.v1"
+	"io"
+	"os"
+	"strings"
+	"syscall"
+	"testing"
+	"time"
+)
+
+// Test is the gocheck boilerplate entry point: it lets "go test" run
+// every suite registered with Suite().
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+// TestSuite groups the ContainerRunner tests in this file.
+type TestSuite struct{}
+
+// Gocheck boilerplate: register the suite so TestingT runs it.
+var _ = Suite(&TestSuite{})
+
+// ArvTestClient is a stub API client that records what the code under
+// test sends to it.
+type ArvTestClient struct {
+	Total   int64
+	// Calls counts invocations of Create.
+	Calls   int
+	// Content holds the parameters of the most recent Create or Update.
+	Content arvadosclient.Dict
+	// ContainerRecord is what Get returns for the "containers" resource.
+	ContainerRecord
+	// Logs accumulates the text of created "logs" records per event type.
+	Logs          map[string]*bytes.Buffer
+	// WasSetRunning is set once a container update with state "Running"
+	// has been seen.
+	WasSetRunning bool
+}
+
+// KeepTestClient is a stub Keep client. Called records whether the image
+// tarball was requested through ManifestFileReader; Content holds the
+// last buffer passed to PutHB.
+type KeepTestClient struct {
+	Called  bool
+	Content []byte
+}
+
+// Fixtures for the "hello world" image collection: ArvTestClient.Get
+// returns hwManifest for hwPDH.
+// NOTE(review): hwImageId carries a ".tar" suffix here, while
+// ContainerRunner.LoadImage strips that suffix from the image id; confirm
+// which form the tests should compare against.
+var hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0 at 569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
+var hwPDH = "a45557269dcb65a6b78f9ac061c0850b+120"
+var hwImageId = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar"
+
+// Fixtures for a collection whose first file is not a ".tar" image:
+// ArvTestClient.Get returns otherManifest for otherPDH.
+var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d at 5693216f 0:46:md5sum.txt\n"
+var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
+
+// Create records the call and its parameters. For "logs" it appends the
+// record's text to the buffer for its event type; for "collections" with
+// a non-nil output it fills in a portable data hash computed from the
+// manifest text. It always returns nil.
+func (this *ArvTestClient) Create(resourceType string,
+	parameters arvadosclient.Dict,
+	output interface{}) error {
+
+	this.Calls++
+	this.Content = parameters
+
+	switch resourceType {
+	case "logs":
+		eventType := parameters["event_type"].(string)
+		if this.Logs == nil {
+			this.Logs = make(map[string]*bytes.Buffer)
+		}
+		buf := this.Logs[eventType]
+		if buf == nil {
+			buf = &bytes.Buffer{}
+			this.Logs[eventType] = buf
+		}
+		props := parameters["properties"].(map[string]string)
+		buf.WriteString(props["text"])
+
+	case "collections":
+		if output != nil {
+			text := parameters["manifest_text"].(string)
+			result := output.(map[string]string)
+			result["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(text)), len(text))
+		}
+	}
+
+	return nil
+}
+
+// Get serves canned data: the fixture manifest for a known collection
+// hash, or the stub's ContainerRecord for "containers". Anything else
+// leaves output untouched. It always returns nil.
+func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	switch resourceType {
+	case "collections":
+		switch uuid {
+		case hwPDH:
+			output.(*Collection).ManifestText = hwManifest
+		case otherPDH:
+			output.(*Collection).ManifestText = otherManifest
+		}
+	case "containers":
+		*output.(*ContainerRecord) = this.ContainerRecord
+	}
+	return nil
+}
+
+// Update records the parameters it was given and notes when a container
+// is moved to the "Running" state. It always returns nil.
+func (this *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	this.Content = parameters
+	if resourceType == "containers" && parameters["state"] == "Running" {
+		this.WasSetRunning = true
+	}
+	return nil
+}
+
+// PutHB remembers buf in Content and returns a locator of the form
+// "<hash>+<len(buf)>", with len(buf) as the second result and a nil error.
+func (this *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	this.Content = buf
+	return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+}
+
+// FileWrapper pairs an io.ReadCloser with a fixed length so that it can be
+// returned where a keepclient.ReadCloserWithLen is expected.
+type FileWrapper struct {
+	io.ReadCloser
+	len uint64
+}
+
+// Len returns the length the wrapper was constructed with; it is not
+// derived from the underlying reader.
+func (this FileWrapper) Len() uint64 {
+	return this.len
+}
+
+func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	if filename == "fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar" {
+		rdr, err := os.Open("fc0db02f30724abc777d7ae2b2404c6d074f1e2ceca19912352aea30a42f50b7.tar")
+		this.Called = true
+		return FileWrapper{rdr, 1321984}, err
+	}
+	return nil, nil
+}
+
+// TestLoadImage checks both paths of ContainerRunner.LoadImage: loading
+// an image from Keep when Docker does not have it, and reusing an image
+// that is already loaded.
+func (s *TestSuite) TestLoadImage(c *C) {
+	kc := &KeepTestClient{}
+	docker, err := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
+
+	// Start from a state where the image is absent; the result of the
+	// removal itself is not checked.
+	_, err = cr.Docker.RemoveImage(hwImageId, true)
+
+	_, err = cr.Docker.InspectImage(hwImageId)
+	c.Check(err, NotNil)
+
+	cr.ContainerRecord.ContainerImage = hwPDH
+
+	// (1) Test loading image from keep
+	c.Check(kc.Called, Equals, false)
+	c.Check(cr.ContainerConfig.Image, Equals, "")
+
+	err = cr.LoadImage()
+
+	c.Check(err, IsNil)
+	// Clean up the image when the test ends.
+	defer func() {
+		cr.Docker.RemoveImage(hwImageId, true)
+	}()
+
+	c.Check(kc.Called, Equals, true)
+	c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
+
+	_, err = cr.Docker.InspectImage(hwImageId)
+	c.Check(err, IsNil)
+
+	// (2) Test using image that's already loaded
+	kc.Called = false
+	cr.ContainerConfig.Image = ""
+
+	err = cr.LoadImage()
+	c.Check(err, IsNil)
+	// Keep must not be consulted when the image is already present.
+	c.Check(kc.Called, Equals, false)
+	c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
+
+}
+
+// ArvErrorTestClient is an API client stub whose Get always fails.
+type ArvErrorTestClient struct{}
+// KeepErrorTestClient is a Keep client stub whose ManifestFileReader
+// always fails.
+type KeepErrorTestClient struct{}
+// KeepReadErrorTestClient is a Keep client stub whose ManifestFileReader
+// returns a reader that fails on every Read.
+type KeepReadErrorTestClient struct{}
+
+// Create does nothing and returns nil.
+func (this ArvErrorTestClient) Create(resourceType string,
+	parameters arvadosclient.Dict,
+	output interface{}) error {
+	return nil
+}
+
+// Get always fails with the error "ArvError".
+func (this ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
+	return errors.New("ArvError")
+}
+
+// Update does nothing and returns nil.
+func (this ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+	return nil
+}
+
+// PutHB stores nothing and returns zero values with a nil error.
+func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+// ManifestFileReader always fails with the error "KeepError".
+func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	return nil, errors.New("KeepError")
+}
+
+// PutHB stores nothing and returns zero values with a nil error.
+func (this KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+	return "", 0, nil
+}
+
+// ErrorReader is a reader whose Read always fails. It also provides Close
+// and Len so it can be returned as a keepclient.ReadCloserWithLen.
+type ErrorReader struct{}
+
+// Read reads nothing and always returns the error "ErrorReader".
+func (this ErrorReader) Read(p []byte) (n int, err error) {
+	return 0, errors.New("ErrorReader")
+}
+
+// Close does nothing and returns nil.
+func (this ErrorReader) Close() error {
+	return nil
+}
+
+// Len always reports a length of zero.
+func (this ErrorReader) Len() uint64 {
+	return 0
+}
+
+// ManifestFileReader succeeds but hands back an ErrorReader, so the
+// failure only shows up when the caller tries to read the file.
+func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
+	return ErrorReader{}, nil
+}
+
+// TestLoadImageArvError checks that LoadImage returns the API server's
+// error when the collection lookup fails. The Docker client is nil here.
+func (s *TestSuite) TestLoadImageArvError(c *C) {
+	// (1) Arvados error
+	cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil)
+	cr.ContainerRecord.ContainerImage = hwPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "ArvError")
+}
+
+// TestLoadImageKeepError checks that LoadImage returns Keep's error when
+// the image file cannot be opened.
+func (s *TestSuite) TestLoadImageKeepError(c *C) {
+	// (2) Keep error
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
+	cr.ContainerRecord.ContainerImage = hwPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "KeepError")
+}
+
+func (s *TestSuite) TestLoadImageCollectionError(c *C) {
+	// (3) Collection doesn't contain image
+	cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil)
+	cr.ContainerRecord.ContainerImage = otherPDH
+
+	err := cr.LoadImage()
+	c.Check(err.Error(), Equals, "First file in the collection does not end in .tar")
+}
+
+func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
+	// (4) Collection doesn't contain image
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
+	cr.ContainerRecord.ContainerImage = hwPDH
+
+	err := cr.LoadImage()
+	c.Check(err, NotNil)
+}
+
+type ClosableBuffer struct {
+	bytes.Buffer
+}
+
+type TestLogs struct {
+	Stdout ClosableBuffer
+	Stderr ClosableBuffer
+}
+
+func (this *ClosableBuffer) Write(p []byte) (n int, err error) {
+	return this.Buffer.Write(p)
+}
+
+func (this *ClosableBuffer) Close() error {
+	return nil
+}
+
+func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+	if logstr == "stdout" {
+		return &this.Stdout
+	}
+	if logstr == "stderr" {
+		return &this.Stderr
+	}
+	return nil
+}
+
+func (s *TestSuite) TestRunContainer(c *C) {
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
+
+	var logs TestLogs
+	cr.NewLogWriter = logs.NewTestLoggingWriter
+	cr.ContainerRecord.ContainerImage = hwPDH
+	cr.ContainerRecord.Command = []string{"./hw"}
+	err := cr.LoadImage()
+	c.Check(err, IsNil)
+
+	err = cr.StartContainer()
+	c.Check(err, IsNil)
+
+	err = cr.AttachLogs()
+	c.Check(err, IsNil)
+
+	err = cr.WaitFinish()
+	c.Check(err, IsNil)
+
+	c.Check(strings.HasSuffix(logs.Stdout.String(), "Hello world\n"), Equals, true)
+	c.Check(logs.Stderr.String(), Equals, "")
+}
+
+func (s *LoggingTestSuite) TestCommitLogs(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	cr.CrunchLog.Print("Goodbye")
+
+	err := cr.CommitLogs()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunchexec.txt\n")
+	c.Check(*cr.LogsPDH, Equals, "d3a229d2fe3690c2c3e75a71a153c6a3+60")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordRunning(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+	err := cr.UpdateContainerRecordRunning()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["state"], Equals, "Running")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordComplete(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+
+	cr.LogsPDH = new(string)
+	*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
+
+	cr.ExitCode = new(int)
+	*cr.ExitCode = 42
+
+	err := cr.UpdateContainerRecordComplete()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], Equals, *cr.LogsPDH)
+	c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
+	c.Check(api.Content["state"], Equals, "Complete")
+}
+
+func (s *LoggingTestSuite) TestUpdateContainerRecordCancelled(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.ContainerRecord.Uuid = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+	cr.Cancelled = true
+
+	err := cr.UpdateContainerRecordComplete()
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], IsNil)
+	c.Check(api.Content["exit_code"], IsNil)
+	c.Check(api.Content["state"], Equals, "Cancelled")
+}
+
+func FullRunHelper(c *C, record string) (api *ArvTestClient, cr *ContainerRunner) {
+	rec := ContainerRecord{}
+	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	c.Check(err, IsNil)
+
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker.RemoveImage(hwImageId, true)
+
+	api = &ArvTestClient{ContainerRecord: rec}
+	cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+
+	err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+	c.Check(err, IsNil)
+	c.Check(api.WasSetRunning, Equals, true)
+
+	c.Check(api.Content["log"], NotNil)
+
+	if err != nil {
+		for k, v := range api.Logs {
+			c.Log(k)
+			c.Log(v.String())
+		}
+	}
+
+	return
+}
+
+func (s *LoggingTestSuite) TestFullRunHello(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunStderr(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["log"], NotNil)
+	c.Check(api.Content["exit_code"], Equals, 1)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
+	c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunDefaultCwd(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["pwd"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestFullRunSetCwd(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["pwd"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
+}
+
+func (s *LoggingTestSuite) TestCancel(c *C) {
+	record := `{
+    "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`
+
+	rec := ContainerRecord{}
+	err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
+	c.Check(err, IsNil)
+
+	docker, _ := dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+	docker.RemoveImage(hwImageId, true)
+
+	api := &ArvTestClient{ContainerRecord: rec}
+	cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+
+	go func() {
+		for cr.ContainerId == "" {
+			time.Sleep(1 * time.Second)
+		}
+		cr.SigChan <- syscall.SIGINT
+	}()
+
+	err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+
+	c.Check(err, IsNil)
+
+	c.Check(api.Content["log"], NotNil)
+
+	if err != nil {
+		for k, v := range api.Logs {
+			c.Log(k)
+			c.Log(v.String())
+		}
+	}
+
+	c.Check(api.Content["state"], Equals, "Cancelled")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
+
+}
+
+func (s *LoggingTestSuite) TestFullRunSetEnv(c *C) {
+	api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {"FROBIZ": "bilbo"},
+    "mounts": {},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`)
+
+	c.Check(api.Content["exit_code"], Equals, 0)
+	c.Check(api.Content["state"], Equals, "Complete")
+
+	c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
+}
diff --git a/services/crunch-exec/logging.go b/services/crunch-exec/logging.go
new file mode 100644
index 0000000..01b6548
--- /dev/null
+++ b/services/crunch-exec/logging.go
@@ -0,0 +1,159 @@
+package main
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"io"
+	"log"
+	"sync"
+	"time"
+)
+
+type Timestamper func(t time.Time) string
+
+type ThrottledLogger struct {
+	*log.Logger
+	buf *bytes.Buffer
+	sync.Mutex
+	writer      io.WriteCloser
+	stop        bool
+	flusherDone chan bool
+	Timestamper
+}
+
+func RFC3339Timestamp(now time.Time) string {
+	// return now.Format(time.RFC3339Nano)
+	// Builtin RFC3339Nano format isn't fixed width so
+	// provide our own.
+
+	return fmt.Sprintf("%04d-%02d-%02dT%02d:%02d:%02d.%09dZ",
+		now.Year(), now.Month(), now.Day(),
+		now.Hour(), now.Minute(), now.Second(),
+		now.Nanosecond())
+
+}
+
+func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
+	this.Mutex.Lock()
+	if this.buf == nil {
+		this.buf = &bytes.Buffer{}
+	}
+	defer this.Mutex.Unlock()
+
+	now := time.Now().UTC()
+	_, err = fmt.Fprintf(this.buf, "%s %s", this.Timestamper(now), p)
+	return len(p), err
+}
+
+func (this *ThrottledLogger) Stop() {
+	this.stop = true
+	<-this.flusherDone
+	this.writer.Close()
+}
+
+func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
+	for b := range c {
+		writer.Write(b.Bytes())
+	}
+	t <- true
+}
+
+func (this *ThrottledLogger) flusher() {
+	bufchan := make(chan *bytes.Buffer)
+	bufterm := make(chan bool)
+	go goWriter(this.writer, bufchan, bufterm)
+	for {
+		if !this.stop {
+			time.Sleep(1 * time.Second)
+		}
+		this.Mutex.Lock()
+		if this.buf != nil && this.buf.Len() > 0 {
+			bufchan <- this.buf
+			this.buf = nil
+		} else if this.stop {
+			this.Mutex.Unlock()
+			break
+		}
+		this.Mutex.Unlock()
+	}
+	close(bufchan)
+	<-bufterm
+	this.flusherDone <- true
+}
+
+const (
+	MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
+)
+
+func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
+	reader := bufio.NewReaderSize(in, MaxLogLine)
+	var prefix string
+	for {
+		line, isPrefix, err := reader.ReadLine()
+		if err == io.EOF {
+			break
+		} else if err != nil {
+			logger.Print("error reading container log:", err)
+		}
+		var suffix string
+		if isPrefix {
+			suffix = "[...]"
+		}
+		logger.Print(prefix, string(line), suffix)
+		// Set up prefix for following line
+		if isPrefix {
+			prefix = "[...]"
+		} else {
+			prefix = ""
+		}
+	}
+	done <- true
+}
+
+func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
+	alw := &ThrottledLogger{}
+	alw.flusherDone = make(chan bool)
+	alw.writer = writer
+	alw.Logger = log.New(alw, "", 0)
+	alw.Timestamper = RFC3339Timestamp
+	go alw.flusher()
+	return alw
+}
+
+type ArvLogWriter struct {
+	Api           IArvadosClient
+	Uuid          string
+	loggingStream string
+	io.WriteCloser
+}
+
+func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
+	var err1 error
+	if this.WriteCloser != nil {
+		_, err1 = this.WriteCloser.Write(p)
+	}
+
+	// write to API
+	lr := arvadosclient.Dict{"object_uuid": this.Uuid,
+		"event_type": this.loggingStream,
+		"properties": map[string]string{"text": string(p)}}
+	err2 := this.Api.Create("logs", lr, nil)
+
+	if err1 != nil || err2 != nil {
+		return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
+	} else {
+		return len(p), nil
+	}
+
+}
+
+func (this *ArvLogWriter) Close() (err error) {
+	if this.WriteCloser != nil {
+		err = this.WriteCloser.Close()
+		this.WriteCloser = nil
+	}
+	return err
+}
diff --git a/services/crunch-exec/logging_test.go b/services/crunch-exec/logging_test.go
new file mode 100644
index 0000000..27d066a
--- /dev/null
+++ b/services/crunch-exec/logging_test.go
@@ -0,0 +1,104 @@
+package main
+
+import (
+	"fmt"
+	. "gopkg.in/check.v1"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func Test2(t *testing.T) {
+	TestingT(t)
+}
+
+type LoggingTestSuite struct{}
+
+type TestTimestamper struct {
+	count int
+}
+
+func (this *TestTimestamper) Timestamp(t time.Time) string {
+	this.count += 1
+	return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&LoggingTestSuite{})
+
+func (s *LoggingTestSuite) TestWriteLogs(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	cr.CrunchLog.Print("Goodbye")
+	cr.CrunchLog.Stop()
+
+	c.Check(api.Calls, Equals, 1)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunchexec.txt\n")
+
+	logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+		"2015-12-29T15:51:45.000000002Z Goodbye\n"
+
+	c.Check(api.Content["event_type"], Equals, "crunchexec")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext)
+	c.Check(string(kc.Content), Equals, logtext)
+}
+
+func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+	for i := 0; i < 2000000; i += 1 {
+		cr.CrunchLog.Printf("Hello %d", i)
+	}
+	cr.CrunchLog.Print("Goodbye")
+	cr.CrunchLog.Stop()
+
+	c.Check(api.Calls > 1, Equals, true)
+	c.Check(api.Calls < 2000000, Equals, true)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunchexec.txt\n")
+}
+
+func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
+	api := &ArvTestClient{}
+	kc := &KeepTestClient{}
+	cr := NewContainerRunner(api, kc, nil)
+	ts := &TestTimestamper{}
+	cr.CrunchLog.Timestamper = ts.Timestamp
+	stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+	stdout.Timestamper = ts.Timestamp
+
+	cr.CrunchLog.Print("Hello world!")
+	stdout.Print("Doing stuff")
+	cr.CrunchLog.Print("Goodbye")
+	stdout.Print("Blurb")
+
+	cr.CrunchLog.Stop()
+	logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+		"2015-12-29T15:51:45.000000003Z Goodbye\n"
+	c.Check(api.Content["event_type"], Equals, "crunchexec")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
+
+	stdout.Stop()
+	logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
+		"2015-12-29T15:51:45.000000004Z Blurb\n"
+	c.Check(api.Content["event_type"], Equals, "stdout")
+	c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext2)
+
+	mt, err := cr.LogCollection.ManifestText()
+	c.Check(err, IsNil)
+	c.Check(mt, Equals, ""+
+		". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunchexec.txt\n"+
+		". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+}
diff --git a/services/crunch-exec/upload.go b/services/crunch-exec/upload.go
new file mode 100644
index 0000000..e85d636
--- /dev/null
+++ b/services/crunch-exec/upload.go
@@ -0,0 +1,174 @@
+package main
+
+import (
+	"bytes"
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"io"
+	"strings"
+)
+
+type Block struct {
+	data   []byte
+	offset int64
+}
+
+type CollectionFileWriter struct {
+	IKeepClient
+	*manifest.ManifestStream
+	offset uint64
+	length uint64
+	*Block
+	uploader chan *Block
+	finish   chan []error
+	fn       string
+}
+
+func (m *CollectionFileWriter) Write(p []byte) (int, error) {
+	n, err := m.ReadFrom(bytes.NewReader(p))
+	return int(n), err
+}
+
+func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	var total int64
+	var count int
+
+	for err == nil {
+		if m.Block == nil {
+			m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+		}
+		count, err = r.Read(m.Block.data[m.Block.offset:])
+		total += int64(count)
+		m.Block.offset += int64(count)
+		if m.Block.offset == keepclient.BLOCKSIZE {
+			m.uploader <- m.Block
+			m.Block = nil
+		}
+	}
+
+	m.length += uint64(total)
+
+	if err == io.EOF {
+		return total, nil
+	} else {
+		return total, err
+	}
+}
+
+func (m *CollectionFileWriter) Close() error {
+	m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
+		manifest.FileStreamSegment{m.offset, m.length, m.fn})
+	return nil
+}
+
+func (m *CollectionFileWriter) goUpload() {
+	var errors []error
+	uploader := m.uploader
+	finish := m.finish
+	for block := range uploader {
+		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+		signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+		if err != nil {
+			errors = append(errors, err)
+		} else {
+			m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+		}
+	}
+	finish <- errors
+}
+
+type CollectionWriter struct {
+	IKeepClient
+	Streams []*CollectionFileWriter
+}
+
+func (m *CollectionWriter) Open(path string) io.WriteCloser {
+	var dir string
+	var fn string
+
+	i := strings.Index(path, "/")
+	if i > -1 {
+		dir = "./" + path[0:i]
+		fn = path[i+1:]
+	} else {
+		dir = "."
+		fn = path
+	}
+
+	fw := &CollectionFileWriter{
+		m.IKeepClient,
+		&manifest.ManifestStream{StreamName: dir},
+		0,
+		0,
+		nil,
+		make(chan *Block),
+		make(chan []error),
+		fn}
+	go fw.goUpload()
+
+	m.Streams = append(m.Streams, fw)
+
+	return fw
+}
+
+func (m *CollectionWriter) Finish() error {
+	var errstring string
+	for _, stream := range m.Streams {
+		if stream.uploader == nil {
+			continue
+		}
+		if stream.Block != nil {
+			stream.uploader <- stream.Block
+		}
+		close(stream.uploader)
+		stream.uploader = nil
+
+		errors := <-stream.finish
+		close(stream.finish)
+		stream.finish = nil
+
+		for _, r := range errors {
+			errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+		}
+	}
+	if errstring != "" {
+		return errors.New(errstring)
+	} else {
+		return nil
+	}
+}
+
+func (m *CollectionWriter) ManifestText() (mt string, err error) {
+	err = m.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	var buf bytes.Buffer
+
+	for _, v := range m.Streams {
+		k := v.StreamName
+		if k == "." {
+			buf.WriteString(".")
+		} else {
+			k = strings.Replace(k, " ", "\\040", -1)
+			k = strings.Replace(k, "\n", "", -1)
+			buf.WriteString("./" + k)
+		}
+		for _, b := range v.Blocks {
+			buf.WriteString(" ")
+			buf.WriteString(b)
+		}
+		for _, f := range v.FileStreamSegments {
+			buf.WriteString(" ")
+			name := strings.Replace(f.Name, " ", "\\040", -1)
+			name = strings.Replace(name, "\n", "", -1)
+			buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
+		}
+		buf.WriteString("\n")
+	}
+	return buf.String(), nil
+}

commit 0a46b75ac1e96a6f5ce3ae797eb4306b352fab36
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jan 6 10:51:28 2016 -0500

    7816: Update Go SDK to use correct block locator pattern.  Change FileToken
    string in manifest to FileStreamSegment struct.

diff --git a/sdk/go/blockdigest/blockdigest.go b/sdk/go/blockdigest/blockdigest.go
index a5a668f..cfc510c 100644
--- a/sdk/go/blockdigest/blockdigest.go
+++ b/sdk/go/blockdigest/blockdigest.go
@@ -9,7 +9,7 @@ import (
 )
 
 var LocatorPattern = regexp.MustCompile(
-	"^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
+	"^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]*)*$")
 
 // Stores a Block Locator Digest compactly, up to 128 bits.
 // Can be used as a map key.
diff --git a/sdk/go/blockdigest/blockdigest_test.go b/sdk/go/blockdigest/blockdigest_test.go
index e520dee..395e2f8 100644
--- a/sdk/go/blockdigest/blockdigest_test.go
+++ b/sdk/go/blockdigest/blockdigest_test.go
@@ -143,6 +143,9 @@ func TestLocatorPatternBasic(t *testing.T) {
 		"12345678901234567890123456789012+12345+A1+B123wxyz@_-")
 	expectLocatorPatternMatch(t,
 		"12345678901234567890123456789012+12345+A1+B123wxyz@_-+C@")
+	expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A")
+	expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A1+B")
+	expectLocatorPatternMatch(t, "12345678901234567890123456789012+12345+A+B2")
 
 	expectLocatorPatternFail(t, "12345678901234567890123456789012")
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+")
@@ -153,11 +156,9 @@ func TestLocatorPatternBasic(t *testing.T) {
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345 ")
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+1")
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+1A")
-	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A")
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+a1")
 	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A1+")
-	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A1+B")
-	expectLocatorPatternFail(t, "12345678901234567890123456789012+12345+A+B2")
+
 }
 
 func TestParseBlockLocatorSimple(t *testing.T) {
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index 06a6678..a3dc3d5 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -130,8 +130,8 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
 
 	stream.offset += count
 
-	stream.ManifestStream.FileTokens = append(stream.ManifestStream.FileTokens,
-		fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+	stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+		manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
 
 	return nil
 }
@@ -189,11 +189,11 @@ func (m *ManifestWriter) ManifestText() string {
 			buf.WriteString(" ")
 			buf.WriteString(b)
 		}
-		for _, f := range v.FileTokens {
+		for _, f := range v.FileStreamSegments {
 			buf.WriteString(" ")
-			f = strings.Replace(f, " ", "\\040", -1)
-			f = strings.Replace(f, "\n", "", -1)
-			buf.WriteString(f)
+			name := strings.Replace(f.Name, " ", "\\040", -1)
+			name = strings.Replace(name, "\n", "", -1)
+			buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
 		}
 		buf.WriteString("\n")
 	}
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index b532a16..d2c171d 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -40,6 +40,10 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
 		return nil, ErrNoManifest
 	}
 	m := manifest.Manifest{Text: mText}
+	return kc.ManifestFileReader(m, filename)
+}
+
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (ReadCloserWithLen, error) {
 	rdrChan := make(chan *cfReader)
 	go kc.queueSegmentsToGet(m, filename, rdrChan)
 	r, ok := <-rdrChan
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index d4b9830..cf0ae85 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -44,12 +44,19 @@ type FileSegment struct {
 	Len    int
 }
 
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+	SegPos uint64
+	SegLen uint64
+	Name   string
+}
+
 // Represents a single line from a manifest.
 type ManifestStream struct {
-	StreamName string
-	Blocks     []string
-	FileTokens []string
-	Err        error
+	StreamName         string
+	Blocks             []string
+	FileStreamSegments []FileStreamSegment
+	Err                error
 }
 
 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
@@ -97,21 +104,21 @@ func ParseBlockLocator(s string) (b BlockLocator, err error) {
 	return
 }
 
-func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) {
+func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
 	parts := strings.SplitN(tok, ":", 3)
 	if len(parts) != 3 {
 		err = ErrInvalidToken
 		return
 	}
-	segPos, err = strconv.ParseUint(parts[0], 10, 64)
+	ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
 	if err != nil {
 		return
 	}
-	segLen, err = strconv.ParseUint(parts[1], 10, 64)
+	ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
 	if err != nil {
 		return
 	}
-	name = UnescapeName(parts[2])
+	ft.Name = UnescapeName(parts[2])
 	return
 }
 
@@ -128,12 +135,11 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
 	blockLens := make([]int, 0, len(s.Blocks))
 	// This is what streamName+"/"+fileName will look like:
 	target := "./" + filepath
-	for _, fTok := range s.FileTokens {
-		wantPos, wantLen, name, err := parseFileToken(fTok)
-		if err != nil {
-			// Skip (!) invalid file tokens.
-			continue
-		}
+	for _, fTok := range s.FileStreamSegments {
+		wantPos := fTok.SegPos
+		wantLen := fTok.SegLen
+		name := fTok.Name
+
 		if s.StreamName+"/"+name != target {
 			continue
 		}
@@ -199,24 +205,25 @@ func parseManifestStream(s string) (m ManifestStream) {
 		}
 	}
 	m.Blocks = tokens[:i]
-	m.FileTokens = tokens[i:]
+	fileTokens := tokens[i:]
 
 	if len(m.Blocks) == 0 {
 		m.Err = fmt.Errorf("No block locators found")
 		return
 	}
 
-	if len(m.FileTokens) == 0 {
+	if len(fileTokens) == 0 {
 		m.Err = fmt.Errorf("No file tokens found")
 		return
 	}
 
-	for _, ft := range m.FileTokens {
-		_, _, _, err := parseFileToken(ft)
+	for _, ft := range fileTokens {
+		pft, err := parseFileStreamSegment(ft)
 		if err != nil {
 			m.Err = fmt.Errorf("Invalid file token: %s", ft)
 			break
 		}
+		m.FileStreamSegments = append(m.FileStreamSegments, pft)
 	}
 
 	return
diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go
index 43c3bd3..f52d564 100644
--- a/sdk/go/manifest/manifest_test.go
+++ b/sdk/go/manifest/manifest_test.go
@@ -61,10 +61,21 @@ func expectStringSlicesEqual(t *testing.T, actual []string, expected []string) {
 	}
 }
 
+func expectFileStreamSegmentsEqual(t *testing.T, actual []FileStreamSegment, expected []FileStreamSegment) {
+	if len(actual) != len(expected) {
+		t.Fatalf("Expected %v (length %d), but received %v (length %d) instead. %s", expected, len(expected), actual, len(actual), getStackTrace())
+	}
+	for i := range actual {
+		if actual[i] != expected[i] {
+			t.Fatalf("Expected %v but received %v instead (first disagreement at position %d). %s", expected, actual, i, getStackTrace())
+		}
+	}
+}
+
 func expectManifestStream(t *testing.T, actual ManifestStream, expected ManifestStream) {
 	expectEqual(t, actual.StreamName, expected.StreamName)
 	expectStringSlicesEqual(t, actual.Blocks, expected.Blocks)
-	expectStringSlicesEqual(t, actual.FileTokens, expected.FileTokens)
+	expectFileStreamSegmentsEqual(t, actual.FileStreamSegments, expected.FileStreamSegments)
 }
 
 func expectBlockLocator(t *testing.T, actual blockdigest.BlockLocator, expected blockdigest.BlockLocator) {
@@ -76,8 +87,8 @@ func expectBlockLocator(t *testing.T, actual blockdigest.BlockLocator, expected
 func TestParseManifestStreamSimple(t *testing.T) {
 	m := parseManifestStream(". 365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf 0:2310:qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt")
 	expectManifestStream(t, m, ManifestStream{StreamName: ".",
-		Blocks:     []string{"365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf"},
-		FileTokens: []string{"0:2310:qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt"}})
+		Blocks:             []string{"365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf"},
+		FileStreamSegments: []FileStreamSegment{{0, 2310, "qr1hi-8i9sb-ienvmpve1a0vpoi.log.txt"}}})
 }
 
 func TestParseBlockLocatorSimple(t *testing.T) {
@@ -108,8 +119,8 @@ func TestStreamIterShortManifestWithBlankStreams(t *testing.T) {
 	expectManifestStream(t,
 		firstStream,
 		ManifestStream{StreamName: ".",
-			Blocks:     []string{"b746e3d2104645f2f64cd3cc69dd895d+15693477+E2866e643690156651c03d876e638e674dcd79475@5441920c"},
-			FileTokens: []string{"0:15893477:chr10_band0_s0_e3000000.fj"}})
+			Blocks:             []string{"b746e3d2104645f2f64cd3cc69dd895d+15693477+E2866e643690156651c03d876e638e674dcd79475@5441920c"},
+			FileStreamSegments: []FileStreamSegment{{0, 15893477, "chr10_band0_s0_e3000000.fj"}}})
 
 	received, ok := <-streamIter
 	if ok {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list